我有三个过程。一个进程从磁盘读取数据。另外两个进程根据第一个进程读取的数据进行一些计算。 以下代码是我的草图:
def read(pathList, q):
for path in pathList:
q.put(readFunc(path))
q.put(None)
return
def calc0(src_q, des_q):
while True:
data = src_q.get()
if data is None:
break
else:
des_q.put(calcFunc0(data))
return
def calc1(src_q, des_q):
while True:
data = src_q.get()
if data is None:
break
else:
des_q.put(calcFunc1(data))
return
if __name__ == '__main__':
with Manager() as m:
dataQueue = m.queue()
res0 = m.queue()
res1 = m.queue()
readProcess = Process(target=read, args=(readPathList, dataQueue))
readProcess.start()
calcProcess0 = Process(target=calc0, args=(dataQueue, res0))
calcProcess0.start()
calcProcess1 = Process(target=calc1, args=(dataQueue, res1))
calcProcess1.start()
readProcess.join()
calcProcess0.join()
calcProcess1.join()
但是,上面的代码有一个严重的问题:我无法从队列中获取两次数据!那么,如何将队列中的数据共享给三个或更多进程
HALF9000(使用
multiprocessing.Queue
)提供的评论是对管理的队列的改进,对于Mark Setchell关于走Redis路线的评论,有很多话要说,如果您将要做大量此类发布/订阅工作,并且您想要一些真正健壮的东西。但对于一个可能是一次性的情况来说,这是一个相当大的挑战我相信性能最好的解决方案使用未充分利用的
multiprocessing.Pipe
来构建multiprocessing.Queue
。它不像队列那样灵活,因为它实际上只支持一个生产者和一个消费者,但这正是您所需要的,而且它的性能要高得多调用函数
Pipe([*duplex*])
时,它返回一对表示管道末端的conn1
、conn2
对象。如果双工是False
,则管道是单向的:conn1
只能用于接收消息,conn2
只能用于发送消息。对于此应用程序,您只需要单向连接。其思想是将连接列表作为第二个参数传递给函数read
,它应该在该列表上向需要处理它的各种进程广播它读取的数据印刷品:
更新
如果所有不同的连接都使代码有点难以理解,那么我们可以将数据隐藏在类
Efficient_Queue
中,这可能会导致代码更容易破译:印刷品:
当
Efficient_Queue
实例替换为multiprocessing.Queue
实例时,我们得到:当
multiprocessing.Queue
实例被管理的队列替换时(即m.Queue()
,其中m
是Manager()
),我们得到:正如我在评论中所建议的那样,我在下面使用了RedisPubSub
请注意,安装Redis是一个非常简单和轻量级的过程,也可以使用
docker
轻松运行,将容器端口6379映射到主机的6379:还请注意,由于Redis已联网,您可以在不同的计算机上运行发布者和订阅者
还要注意的是,您可以运行任意数量的订阅者,实际上也可以运行任意数量的发布者,而无需更改其中任何一个的代码
以下是出版商:
这是订户:
请注意,还有一个异步订户选项,它允许您执行其他操作(如运行GUI),并在收到消息时给您回电话
示例输出-Publisher
样本输出-订阅者
相关问题 更多 >
编程相关推荐