如何确保在关闭池之前执行所有python**池.apply\u async()**调用?

2024-09-27 00:14:55 发布

您现在位置:Python中文网/ 问答频道 /正文

如何确保在过早调用pool.close()pool.join()之前执行所有pool.apply_async()调用,并通过回调累积其结果

numofProcesses = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=numofProcesses)

jobs=[]

for arg1, arg2 in arg1_arg2_tuples:
    jobs.append(pool.apply_async(function1,
                     args=(arg1,
                           arg2,
                           arg3,),
                     callback=accumulate_apply_async_result))

pool.close()
pool.join()

Tags: forcloseasynccountjobscpumultiprocessingprocesses
3条回答

如果您担心调用closejoin会以某种方式取消正在进行的作业,这就是您不应该“过早”调用它们的原因,那么您不必担心这一点close告诉工人在完成所有排队的工作后退出,然后join等待他们退出。总之,它们是等待所有工作完成的最简单方式,因此在调用它们之前等待工作完成是多余的

在退出池之前,需要等待附加的^{}对象。那是

for job in jobs:
    job.wait()

pool.close()之前

但你可能在这里工作太辛苦了。你可以

with multiprocessing.Pool() as pool:
    for result in pool.starmap(function1, 
            (arg_1, arg_2, arg_3) for arg_1, arg_2 in sim_chr_tuples)):
        accumulate_apply_async_result(result)
        
  • 池的默认值为cpu_count(),因此无需添加它
  • with是否关闭/加入
  • starmap等待您的结果

一个完整的工作示例是

import multiprocessing

result_list = []

def accumulate_apply_async_result(result):
    result_list.append(result)

def function1(arg1, arg2, arg3):
    return arg1, arg2, arg3

sim_chr_tuples = [(1,2), (3,4), (5,6), (7,8)]
arg_3 = "third arg"

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        for result in pool.starmap(function1,
                ((arg_1, arg_2, arg_3) 
                for arg_1, arg_2 in sim_chr_tuples)):
            accumulate_apply_async_result(result)

    for r in result_list:
        print(r)

在这种情况下,更好的选择可能是ProcessPoolExecutor

这是concurrent.futures模块的一部分

模块有一个函数concurrent.futures.as_completed,它可以帮助您了解状态

相关问题 更多 >

    热门问题