为什么即使在对池的映射调用完成后,进程也要花费很长时间才能加入?

2024-10-02 00:44:36 发布

您现在位置:Python中文网/ 问答频道 /正文

这是关于Python 3.5中的multiprocessing模块的另一个问题。我的问题是,我知道所有的forked processed都已经完成了它们的工作(我可以在Queue中看到它们的结果)异步结果.result()返回True,表示作业已完成,但当我继续时PoolObj.join(),它需要永远。我知道我可以PoolObj.终止()继续我的生活,但我想知道为什么会发生这种事?在

我使用以下代码:

def worker(d):
    queue.put(d)

def gen_data():
    for i in range(int(1e6)):
        yield i

if __name__ == "__main__":
    queue = Queue(maxsize=-1)
    pool = Pool(processes=12)
    pool_obj_worker = pool.map_async(worker, gen_data(), chunksize=1)
    pool.close()

    print ('Lets run the workers...\n')
    while True:
        if pool_obj_worker.ready():
            if pool_obj_worker.successful():
                print ('\nAll processed successfully!') # I can see this quickly, so my jobs are done
            else:
                print ('\nAll processed. Errors encountered!')
            sys.stdout.flush()
            print (q.qsize()) # The size is right that means all workers have done their job
            pool.join() # will get stuck here for long long time
            queue.put('*')           
            break
    print ('%d still to be processed' %
           pool_obj_worker._number_left)
    sys.stdout.flush()
    time.sleep(0.5)

我做错了吗?请开导我。或者持有join()的进程已经变成僵尸了?在


Tags: trueobjdataifqueueputdefgen
1条回答
网友
1楼 · 发布于 2024-10-02 00:44:36

这里的问题是您在worker中使用了一个额外的Queue,而不是Pool完成的四个。 当进程完成它们的工作时,它们都将加入multiprocessing.Queue中使用的FeederThread,这些调用将挂起(可能是因为所有线程同时调用join,并且可能存在一些奇怪的争用条件,这不容易调查)。在

添加multiprocessing.util.log_to_stderr(10)可以显示进程在加入队列馈送线程时挂起。在

要解决您的问题,您可以使用multiprocessing.SimpleQueue而不是multiprocessing.Queue(没有挂接连接,因为没有进料器线程),或者尝试使用方法pool.unordered_imap,它提供了与您似乎要实现的相同的行为(返回一个无序的生成器,其中包含worker返回的结果)。在

相关问题 更多 >

    热门问题