即使在.join()之后,多处理也会创建僵尸进程

2024-05-18 10:08:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一段代码,我正在以以下方式并行化:-

def stats_wrapper(a,b,c,d):
    q = mp.Queue()
    # b here is a dictionary
    processes = [mp.Process(target = stats,args = (a,{b1:b[b1]},c,d,q) ) for b1 in b]
    for p in processes:
        p.start()
    results = []
    for p in processes:
        results.append(q.get())
    return(results)

在我执行这个块之后,我看到了很多僵尸进程。我试图在return(result)行之前以以下方式使用.join()方法:-

for p in processes:
   p.join()

但这无助于摆脱僵尸进程。有人能帮我确定我的代码到底哪里出了问题吗

编辑:-我在代码的其他地方使用了另一种并行化方法,它再次给了我许多僵尸进程,只是我不知道如何重构代码来添加连接

q = mp.Queue()
jobs = (func1,func2,func3)
args = ((arg1,arg2,arg3),(arg2,arg3),(arg1,arg4))
for job,arg in zip(jobs,args):
   mp.Process(target = job,args = arg,name = str(job.__name__)).start()
result = []
for _ in range(len(job)): 
   result.append(q.get())

Tags: 代码inforqueue进程stats方式args
2条回答

如果您愿意尝试更高级别的multiprocessing.Pool()

def stats_wrapper(a, b, c, d):
    with multiprocessing.Pool() as p:
        args = [(a, {b1: b[b1]}, c, d) for b1 in b]
        return list(p.starmap(stats, args))

应该是等效的(除了q没有传递给stats;该函数可以简单地返回结果)

如果您不需要按顺序排列结果,请使用p.imap_unordered(),这样可以更有效地使用进程池

您可以尝试以下代码以从队列中获取结果:

def stats_wrapper(a,b,c,d):
    q = mp.Queue()
    # b here is a dictionary
    processes = [mp.Process(target = stats,args = (a,{b1:b[b1]},c,d,q)) for b1 in b]
    for p in processes:
        p.start()
    results = []
    for p in processes:
        p.join()
    while not q.empty():
        results.append(q.get())
    return(results)

相关问题 更多 >