我有一段代码,我正在以以下方式并行化:-
def stats_wrapper(a,b,c,d):
q = mp.Queue()
# b here is a dictionary
processes = [mp.Process(target = stats,args = (a,{b1:b[b1]},c,d,q) ) for b1 in b]
for p in processes:
p.start()
results = []
for p in processes:
results.append(q.get())
return(results)
在我执行这个块之后,我看到了很多僵尸进程。我试图在return(result)
行之前以以下方式使用.join()
方法:-
for p in processes:
p.join()
但这无助于摆脱僵尸进程。有人能帮我确定我的代码到底哪里出了问题吗
编辑:-我在代码的其他地方使用了另一种并行化方法,它再次给了我许多僵尸进程,只是我不知道如何重构代码来添加连接
q = mp.Queue()
jobs = (func1,func2,func3)
args = ((arg1,arg2,arg3),(arg2,arg3),(arg1,arg4))
for job,arg in zip(jobs,args):
mp.Process(target = job,args = arg,name = str(job.__name__)).start()
result = []
for _ in range(len(job)):
result.append(q.get())
如果您愿意尝试更高级别的
multiprocessing.Pool()
应该是等效的(除了
q
没有传递给stats
;该函数可以简单地返回结果)如果您不需要按顺序排列结果,请使用
p.imap_unordered()
,这样可以更有效地使用进程池您可以尝试以下代码以从队列中获取结果:
相关问题 更多 >
编程相关推荐