如何在对象列表中使用“with”

2024-10-03 02:32:56 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我有一个类,它将生成一个线程并实现.__enter__.__exit__,因此我可以这样使用它:

with SomeAsyncTask(params) as t:
    # do stuff with `t`
    t.thread.start()
    t.thread.join()

.__exit__可能出于清理目的执行某些操作(即删除临时文件等)

在我有一个SomeAsyncTask列表之前,这一切都很好。你知道吗

list_of_async_task_params = [params1, params2, ...]

我应该如何在列表中使用with?我希望这样:

with [SomeAsyncTask(params) for params in list_of_async_task_params] as tasks:
    # do stuff with `tasks`
    for task in tasks:
        task.thread.start()
    for task in tasks:
        task.thread.join()

Tags: in列表fortaskaswithexitparams
3条回答

注意:不知何故,我忽略了这样一个事实,即您的Thread子类本身也是一个上下文管理器,因此下面的代码不会做出这种假设。然而,当使用更“通用”类型的线程时(使用contextlib.ExitStack之类的线程将不是一个选项),它可能会有所帮助。你知道吗

你的问题是有点轻的细节,所以我编了一些,但这可能是接近你想要的。它定义了一个AsyncTaskListContextManager类,该类具有支持上下文管理器协议所需的__enter__()__exit__()方法(以及相关的with语句)。你知道吗

import threading
from time import sleep

class SomeAsyncTask(threading.Thread):
    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.status_lock = threading.Lock()
        self.running = False

    def run(self):
        with self.status_lock:
            self.running = True

        while True:
            with self.status_lock:
                if not self.running:
                    break
            print('task {!r} running'.format(self.name))
            sleep(.1)

        print('task {!r} stopped'.format(self.name))

    def stop(self):
        with self.status_lock:
            self.running = False


class AsyncTaskListContextManager:
    def __init__(self, params_list):
        self.threads = [SomeAsyncTask(params) for params in params_list]

    def __enter__(self):
        for thread in self.threads:
            thread.start()
        return self

    def __exit__(self, *args):
        for thread in self.threads:
            if thread.is_alive():
                thread.stop()
                thread.join()  # wait for it to terminate
        return None  # allows exceptions to be processed normally

params = ['Fee', 'Fie', 'Foe']
with AsyncTaskListContextManager(params) as task_list:
    for _ in range(5):
        sleep(1)
    print('leaving task list context')

print('end-of-script')

输出:

task 'Fee' running
task 'Fie' running
task 'Foe' running
task 'Foe' running
task 'Fee' running
task 'Fie' running
... etc
task 'Fie' running
task 'Fee' running
task 'Foe' running
leaving task list context
task 'Foe' stopped
task 'Fie' stopped
task 'Fee' stopped
end-of-script

我想^{}正是你要找的。这是一种将不确定数量的上下文管理器安全地组合到一个上下文管理器中的方法(这样在输入一个上下文管理器时出现异常不会导致它跳过已经成功输入的上下文管理器)。你知道吗

文件中的例子很有启发性:

with ExitStack() as stack:
    files = [stack.enter_context(open(fname)) for fname in filenames]
    # All opened files will automatically be closed at the end of
    # the with statement, even if attempts to open files later
    # in the list raise an exception

这可以很容易地适应您的“希望”代码:

import contextlib

with contextlib.ExitStack() as stack:
    tasks = [stack.enter_context(SomeAsyncTask(params))
             for params in list_of_async_task_params]
    for task in tasks:
        task.thread.start()
    for task in tasks:
        task.thread.join()

@martineau答案应该有用。这里有一个更通用的方法,应该适用于其他情况。请注意,__exit__()中不处理异常。如果一个__exit__()函数失败,其余的函数将不会被调用。通用解决方案可能会抛出聚合异常并允许您处理它。另一种情况是当您的第二个管理器的__enter__()方法抛出异常时。在这种情况下,不会调用第一个管理器的__exit__()。你知道吗

class list_context_manager:
  def __init__(self, managers):
    this.managers = managers

  def __enter__(self):
    for m in self.managers:
      m.__enter__()
    return self.managers

  def __exit__(self):
    for m in self.managers:
      m.__exit__()

然后可以像在你的问题中那样使用:

with list_context_manager([SomeAsyncTask(params) for params in list_of_async_task_params]) as tasks:
    # do stuff with `tasks`
    for task in tasks:
        task.thread.start()
    for task in tasks:
        task.thread.join()

相关问题 更多 >