Python多处理。队列不接收来自分叉进程的PUT

2024-04-26 11:58:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在创建固定数量的分叉子进程,并试图让它们通过multiprocessing.Queue返回结果。这导致了一些意想不到的行为

import multiprocessing
import os

def main():
    n_workers = 4

    q = multiprocessing.Queue(n_workers)

    for i in range(n_workers):
        if os.fork() == 0:
            print(f"child {i} put {i}")
            q.put(i)
            print(f"child {i} exiting")
            os._exit(0)

    for i in range(n_workers):
        res = q.get()
        print(f"parent got {res}")

    print("parent exiting")


if __name__ == "__main__":
    main()

当我运行此操作时,所有子进程将其结果排队并终止,但父进程挂起:

child 0 put 0                                                                              │
child 1 put 1                                                                              │
child 2 put 2                                                                              │
child 3 put 3                                                                              │
child 0 exiting                                                                            │
child 1 exiting                                                                            │
child 2 exiting                                                                            │
child 3 exiting                                                                            │
parent got 0

Tags: inimportchildforqueueput进程os
1条回答
网友
1楼 · 发布于 2024-04-26 11:58:47

问题是将数据放入队列后立即调用os._exit(0)

multiprocessing docs解释如何将数据添加到队列:

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.

因为进程是分叉的,所以对os._exit(0)的调用是必要的(与sys.exit(0)相反),但它是does not do any cleanup。如果后台线程尚未刷新数据,它将丢失

解决方案是调用close(),后跟join_thread()

import multiprocessing
import os

def main():
    n_workers = 4

    q = multiprocessing.Queue(n_workers)

    for i in range(n_workers):
        if os.fork() == 0:
            print(f"child {i} put {i}")
            q.put(i)
            print(f"child {i} exiting")

            q.close()  # indicate nothing else will be queued by this process
            q.join_thread()  # wait for the background thread to flush the data

            os._exit(0)

    for i in range(n_workers):
        res = q.get()
        print(f"parent got {res}")

    print("parent exiting")


if __name__ == "__main__":
    main()

相关问题 更多 >