向concurrent.futures.ProcessPool中的所有进程提交执行代码

2024-05-01 07:32:52 发布

您现在位置:Python中文网/ 问答频道 /正文

背景:

  • 使用concurrent.futures.process.ProcessPool执行代码的Python应用程序服务器
  • 我们有时希望在不重新启动整个服务器进程的情况下热重新加载导入的代码

(是的,我知道importlib.reloadcaveats

要使其工作,我想我必须在由进程池管理的每个multiprocessing进程中执行importlib.reload

是否有方法将某些内容提交给流程池中的所有流程


Tags: 代码服务器应用程序进程情况流程importlibmultiprocessing
1条回答
网友
1楼 · 发布于 2024-05-01 07:32:52

我不知道你提到的热重新加载尝试会怎样,但是你真正问的一般问题是可以回答的

Is there a way to submit something to all processes in a process pool?

这里的挑战在于确保所有进程都能得到这个something一次,而且只有一次,并且在每个进程都得到它之前,不会再执行

您可以在^{}的帮助下获得这种必要的同步。屏障将阻止调用barrier.wait()的各方,直到各方都这样做,然后立即释放它们

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name}  - reloading {something} and waiting.")
    barrier.wait()
    print(f"{mp.current_process().name}  - released.")


def init_barrier(barrier):
    globals()['barrier'] = barrier


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # then something for all processes
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))

示例输出:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3  - reloading something and waiting.
ForkProcess-2  - reloading something and waiting.
ForkProcess-1  - reloading something and waiting.
ForkProcess-4  - reloading something and waiting.
ForkProcess-1  - released.
ForkProcess-4  - released.
ForkProcess-3  - released.
ForkProcess-2  - released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

如果您可以保留barrier一个全局变量,并且multiprocessing.get_context()._name返回"fork",则不需要使用initializer,因为全局变量将通过分叉继承和访问

相关问题 更多 >