是否可以让ThreadPoolExecutor
等待其所有未来及其add_done_callback()
函数完成,而不必调用.shutdown(wait=True)
?下面的代码片段说明了我试图实现的本质,即在外部循环的迭代之间重用线程池
from concurrent.futures import ThreadPoolExecutor, wait
import time
def proc_func(n):
return n + 1
def create_callback_func(fid, sleep_time):
def callback(future):
time.sleep(sleep_time)
fid.write(str(future.result()))
return
return callback
num_workers = 4
num_files_write = 3
num_tasks = 8
sleep_time = 1
pool = ThreadPoolExecutor(max_workers=num_workers)
for n in range(num_files_write):
fid = open(f'test{n}.txt', 'w')
futs = []
callback_func = create_callback_func(fid, sleep_time)
for t in range(num_tasks):
fut = pool.submit(proc_func, n)
fut.add_done_callback(callback_func)
futs.append(fut)
wait(futs)
fid.close()
pool.shutdown(wait=True)
运行此代码会抛出一堆ValueError: I/O operation on closed file.
,并且写入的三个文件都有内容:
test0.txt->1111
test1.txt->2222
test3.txt->3333
显然这是错误的,每个数字应该有八个。如果我为每个文件创建并关闭一个单独的ThreadPoolExecutor
,那么就会得到正确的结果。所以我知道Executor
能够正确地等待所有回调完成,但是我能告诉它这样做而不关闭它吗
恐怕这是不可能的,你“误用”了回调
回调的主要目的是通知计划的工作已经完成
内部未来状态待定->;运行->;已完成(为简洁起见,不考虑取消)。当达到完成状态时,将调用回调,但在回调完成时没有下一个状态。这就是为什么无法与该事件同步
在一个可用线程中执行提交函数的核心是(简化):
其中
set_exception
和set_result
看起来像这样(非常简化):当调用“done”回调时,future处于FINISHED,即“done”状态。在标记完成之前通知工作已完成是没有意义的
正如您已经注意到的,在代码中:
wait
返回,文件关闭,但回调尚未完成,无法尝试写入关闭的文件第二个问题是
shutdown(wait=True)
为什么有效?只是因为它等待所有线程:这些线程还执行回调(请参阅上面的代码片段)。这就是线程完成时回调执行必须完成的原因
相关问题 更多 >
编程相关推荐