假设我们有一个主-工作管道的玩具版本来并行数据收集
# pip install gym
import gym
import numpy as np
from multiprocessing import Process, Pipe
def worker(master_conn, worker_conn):
master_conn.close()
env = gym.make('Pendulum-v0')
env.reset()
while True:
cmd, data = worker_conn.recv()
if cmd == 'close':
worker_conn.close()
break
elif cmd == 'step':
results = env.step(data)
worker_conn.send(results)
class Master(object):
def __init__(self):
self.master_conns, self.worker_conns = zip(*[Pipe() for _ in range(10)])
self.list_process = [Process(target=worker, args=[master_conn, worker_conn], daemon=True)
for master_conn, worker_conn in zip(self.master_conns, self.worker_conns)]
[p.start() for p in self.list_process]
[worker_conn.close() for worker_conn in self.worker_conns]
def go(self, actions):
[master_conn.send(['step', action]) for master_conn, action in zip(self.master_conns, actions)]
results = [master_conn.recv() for master_conn in self.master_conns]
return results
def close(self):
[master_conn.send(['close', None]) for master_conn in self.master_conns]
[p.join() for p in self.list_process]
master = Master()
l = []
T = 1000
for t in range(T):
actions = np.random.rand(10, 1)
results = master.go(actions)
l.append(len(results))
sum(l)
由于master和每个worker之间的管道连接,对于每个时间步,我们必须通过管道向worker发送一个命令,worker将结果发送回来。我们需要从长远考虑。由于频繁的通信,这有时会有点慢。在
所以,我想知道,如果我正确地理解了它的功能,使用最新的Python特性asyncio和Process来替换管道,是否会因为IO并发而潜在地加速。在
多处理模块已经有一个并行任务处理的解决方案:^{}
您可以使用^{} 实现相同的效果。我相信这就是
pool.map()
在内部实现的方式。在那么,^{} 和{a4}有什么区别?
Queue
只是一个Pipe
加上一些锁定机制。因此,多个工作进程可以只共享一个Queue
(或者更确切地说是2个-一个用于命令,一个用于结果),但是使用Pipe
每个进程都需要它自己的Pipe
(或一对,或一个双工进程),这正是您现在的做法。在Queue
的唯一缺点是性能-因为所有进程共享一个队列互斥锁,因此对于许多进程来说,它不能很好地伸缩。为了确保它可以处理成千上万个项目,我会选择Pipe
,但是对于典型的并行任务处理用例,我认为Queue
或者仅仅是{希望有帮助,我知道我回答的问题和你问的有点不同:)
相关问题 更多 >
编程相关推荐