这是关于Python 3.
5中的multiprocessing
模块的另一个问题。我的问题是,我知道所有的forked processed都已经完成了它们的工作(我可以在Queue
中看到它们的结果)异步结果.result()返回True,表示作业已完成,但当我继续时PoolObj.join(),它需要永远。我知道我可以PoolObj.终止()继续我的生活,但我想知道为什么会发生这种事?在
我使用以下代码:
def worker(d):
queue.put(d)
def gen_data():
for i in range(int(1e6)):
yield i
if __name__ == "__main__":
queue = Queue(maxsize=-1)
pool = Pool(processes=12)
pool_obj_worker = pool.map_async(worker, gen_data(), chunksize=1)
pool.close()
print ('Lets run the workers...\n')
while True:
if pool_obj_worker.ready():
if pool_obj_worker.successful():
print ('\nAll processed successfully!') # I can see this quickly, so my jobs are done
else:
print ('\nAll processed. Errors encountered!')
sys.stdout.flush()
print (q.qsize()) # The size is right that means all workers have done their job
pool.join() # will get stuck here for long long time
queue.put('*')
break
print ('%d still to be processed' %
pool_obj_worker._number_left)
sys.stdout.flush()
time.sleep(0.5)
我做错了吗?请开导我。或者持有join()
的进程已经变成僵尸了?在
这里的问题是您在worker中使用了一个额外的
Queue
,而不是Pool
完成的四个。 当进程完成它们的工作时,它们都将加入multiprocessing.Queue
中使用的FeederThread
,这些调用将挂起(可能是因为所有线程同时调用join
,并且可能存在一些奇怪的争用条件,这不容易调查)。在添加
multiprocessing.util.log_to_stderr(10)
可以显示进程在加入队列馈送线程时挂起。在要解决您的问题,您可以使用
multiprocessing.SimpleQueue
而不是multiprocessing.Queue
(没有挂接连接,因为没有进料器线程),或者尝试使用方法pool.unordered_imap
,它提供了与您似乎要实现的相同的行为(返回一个无序的生成器,其中包含worker返回的结果)。在相关问题 更多 >
编程相关推荐