Python:让concurrent.futures执行器等待done_回调完成

2024-09-27 21:23:22 发布

您现在位置:Python中文网/ 问答频道 /正文

是否可以让ThreadPoolExecutor等待其所有未来及其add_done_callback()函数完成,而不必调用.shutdown(wait=True)?下面的代码片段说明了我试图实现的本质,即在外部循环的迭代之间重用线程池

from concurrent.futures import ThreadPoolExecutor, wait
import time

def proc_func(n):
    return n + 1


def create_callback_func(fid, sleep_time):
    def callback(future):
        time.sleep(sleep_time)
        fid.write(str(future.result()))
        return

    return callback


num_workers = 4
num_files_write = 3
num_tasks = 8
sleep_time = 1

pool = ThreadPoolExecutor(max_workers=num_workers)

for n in range(num_files_write):
    fid = open(f'test{n}.txt', 'w')
    futs = []

    callback_func = create_callback_func(fid, sleep_time)

    for t in range(num_tasks):
        fut = pool.submit(proc_func, n)
        fut.add_done_callback(callback_func)
        futs.append(fut)

    wait(futs)
    fid.close()

pool.shutdown(wait=True)

运行此代码会抛出一堆ValueError: I/O operation on closed file.,并且写入的三个文件都有内容:
test0.txt->1111
test1.txt->2222
test3.txt->3333

显然这是错误的,每个数字应该有八个。如果我为每个文件创建并关闭一个单独的ThreadPoolExecutor,那么就会得到正确的结果。所以我知道Executor能够正确地等待所有回调完成,但是我能告诉它这样做而不关闭它吗


Tags: txtreturntimedefcallbacksleepnumwrite
1条回答
网友
1楼 · 发布于 2024-09-27 21:23:22

恐怕这是不可能的,你“误用”了回调

回调的主要目的是通知计划的工作已经完成

内部未来状态待定->;运行->;已完成(为简洁起见,不考虑取消)。当达到完成状态时,将调用回调,但在回调完成时没有下一个状态。这就是为什么无法与该事件同步

在一个可用线程中执行提交函数的核心是(简化):

try:
    result = self.fn(*self.args, **self.kwargs)
except BaseException as exc:
     self.future.set_exception(exc)
else:
    self.future.set_result(result)

其中set_exceptionset_result看起来像这样(非常简化):

... save the result/exception
self._state = FINISHED
... wakeup all waiters
self._invoke_callbacks() # this is the last statement

当调用“done”回调时,future处于FINISHED,即“done”状态。在标记完成之前通知工作已完成是没有意义的

正如您已经注意到的,在代码中:

wait(futs)
fid.close()

wait返回,文件关闭,但回调尚未完成,无法尝试写入关闭的文件


第二个问题是shutdown(wait=True)为什么有效?只是因为它等待所有线程:

if wait:
    for t in self._threads:
        t.join()

这些线程还执行回调(请参阅上面的代码片段)。这就是线程完成时回调执行必须完成的原因

相关问题 更多 >

    热门问题