Python(3.7+)多处理:用asyncio替换master和worker之间的管道连接以实现IO并发

2024-09-22 18:29:49 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我们有一个主-工作管道的玩具版本来并行数据收集

# pip install gym
import gym
import numpy as np
from multiprocessing import Process, Pipe

def worker(master_conn, worker_conn):
    master_conn.close()

    env = gym.make('Pendulum-v0')
    env.reset()

    while True:
        cmd, data = worker_conn.recv()

        if cmd == 'close':
            worker_conn.close()
            break
        elif cmd == 'step':
            results = env.step(data)
            worker_conn.send(results)

class Master(object):
    def __init__(self):
        self.master_conns, self.worker_conns = zip(*[Pipe() for _ in range(10)])
        self.list_process = [Process(target=worker, args=[master_conn, worker_conn], daemon=True) 
                             for master_conn, worker_conn in zip(self.master_conns, self.worker_conns)]
        [p.start() for p in self.list_process]
        [worker_conn.close() for worker_conn in self.worker_conns]

    def go(self, actions):
        [master_conn.send(['step', action]) for master_conn, action in zip(self.master_conns, actions)]
        results = [master_conn.recv() for master_conn in self.master_conns]

        return results

    def close(self):
        [master_conn.send(['close', None]) for master_conn in self.master_conns]
        [p.join() for p in self.list_process]

master = Master()
l = []
T = 1000
for t in range(T):
    actions = np.random.rand(10, 1)
    results = master.go(actions)
    l.append(len(results))

sum(l)

由于master和每个worker之间的管道连接,对于每个时间步,我们必须通过管道向worker发送一个命令,worker将结果发送回来。我们需要从长远考虑。由于频繁的通信,这有时会有点慢。在

所以,我想知道,如果我正确地理解了它的功能,使用最新的Python特性asyncio和Process来替换管道,是否会因为IO并发而潜在地加速。在


Tags: inimportselfmasteractionsforclose管道
1条回答
网友
1楼 · 发布于 2024-09-22 18:29:49

多处理模块已经有一个并行任务处理的解决方案:^{}

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

您可以使用^{}实现相同的效果。我相信这就是pool.map()在内部实现的方式。在

那么,^{}和{a4}有什么区别?Queue只是一个Pipe加上一些锁定机制。因此,多个工作进程可以只共享一个Queue(或者更确切地说是2个-一个用于命令,一个用于结果),但是使用Pipe每个进程都需要它自己的Pipe(或一对,或一个双工进程),这正是您现在的做法。在

Queue的唯一缺点是性能-因为所有进程共享一个队列互斥锁,因此对于许多进程来说,它不能很好地伸缩。为了确保它可以处理成千上万个项目,我会选择Pipe,但是对于典型的并行任务处理用例,我认为Queue或者仅仅是{}都可以,因为它们更易于使用。(管理流程可能会很棘手,而asyncio也不会让它变得更简单。)

希望有帮助,我知道我回答的问题和你问的有点不同:)

相关问题 更多 >