How to share a queue among three or more processes?

Published 2024-10-05 14:28:36


I have three processes. One process reads data from disk; the other two perform calculations on the data the first one reads. The code below is my sketch:

from multiprocessing import Manager, Process

def read(pathList, q):
    for path in pathList:
        q.put(readFunc(path))
    q.put(None)
    return


def calc0(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        else:
            des_q.put(calcFunc0(data))
    return


def calc1(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        else:
            des_q.put(calcFunc1(data))
    return

if __name__ == '__main__':
    with Manager() as m:
        dataQueue = m.Queue()
        res0 = m.Queue()
        res1 = m.Queue()
        readProcess = Process(target=read, args=(readPathList, dataQueue))
        readProcess.start()
        calcProcess0 = Process(target=calc0, args=(dataQueue, res0))
        calcProcess0.start()
        calcProcess1 = Process(target=calc1, args=(dataQueue, res1))
        calcProcess1.start()
        readProcess.join()
        calcProcess0.join()
        calcProcess1.join()

However, the code above has a serious problem: I cannot get the same data from the queue twice! So how can the data in one queue be shared among three or more processes?
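A minimal sketch of the failure mode (using a plain `queue.Queue` for brevity, but `multiprocessing.Queue` behaves the same way): `get()` removes each item, so two consumers draining the same queue each see only part of the stream, and only one of them ever receives the single `None` sentinel.

```python
import queue

# A single queue with one end-of-stream sentinel, as in the sketch above.
q = queue.Queue()
for item in ['a', 'b', 'c', 'd']:
    q.put(item)
q.put(None)

# Two "consumers" taking turns: get() removes the item it returns, so
# neither consumer sees the full stream, and only one of them would
# ever receive the single None sentinel.
seen0, seen1 = [], []
while True:
    item = q.get()
    if item is None:
        break
    seen0.append(item)
    item = q.get()
    if item is None:
        break
    seen1.append(item)

print(seen0)  # ['a', 'c']
print(seen1)  # ['b', 'd']
```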


Tags: src, none, target, data, return, if, queue, put
2 Answers

HALF9000's comment (use multiprocessing.Queue) is an improvement over a managed queue, and there is a lot to be said for Mark Setchell's comment about going the Redis route if you will be doing a great deal of this type of publish/subscribe work and want something really robust. But for what may be a one-off situation it is rather heavyweight.

I believe the most performant solution uses the under-utilized multiprocessing.Pipe, on which multiprocessing.Queue is built. A pipe is not as flexible as a queue because it really only supports a single producer and a single consumer, but that is all you need here, and it performs far better.

When you call Pipe([duplex]), it returns a pair of Connection objects, conn1 and conn2, representing the two ends of the pipe. If duplex is False, the pipe is unidirectional: conn1 can only be used to receive messages and conn2 can only be used to send messages. For this application you only need unidirectional connections. The idea is to pass a list of connections as the second argument to read, which should broadcast the data it reads over that list to each process that needs to process it.
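As a minimal illustration of the API just described, a one-way pipe buffers whatever is sent into one end until it is received from the other:

```python
from multiprocessing import Pipe

# duplex=False makes a one-way pipe: recv_conn can only receive,
# send_conn can only send.
recv_conn, send_conn = Pipe(False)

send_conn.send('hello')   # any picklable object may be sent
print(recv_conn.recv())   # 'hello'
```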

from multiprocessing import Process, Pipe
from threading import Thread
import time

def read(pathList, conn_list):
    for path in pathList:
        value = readFunc(path)
        # simulate lots of data:
        for _ in range(1_000):
            for conn in conn_list:
                conn.send(value)
    for conn in conn_list:
        conn.send(None)

def calc0(src_conn, des_conn):
    while True:
        data = src_conn.recv()
        if data is None:
            break
        des_conn.send(calcFunc0(data))
    des_conn.send(None)

def calc1(src_conn, des_conn):
    while True:
        data = src_conn.recv()
        if data is None:
            break
        des_conn.send(calcFunc1(data))
    des_conn.send(None)

# dummy functions for testing

def readFunc(path):
    return path

def calcFunc0(data):
    return data.upper()

def calcFunc1(data):
    return data.lower()

def process_results(results, conn):
    while True:
        data = conn.recv()
        if data is None:
            break
        results.append(data)

if __name__ == '__main__':
    t = time.time()
    readPathList = ['Aa', 'Bb', 'Cc', 'Dd', 'Ee']

    res0_recv, res0_send = Pipe(False)
    data0_recv, data0_send = Pipe(False)
    res1_recv, res1_send = Pipe(False)
    data1_recv, data1_send = Pipe(False)
    results0 = []
    results1 = []
    # start threads to process results
    t0 = Thread(target=process_results, args=(results0, res0_recv))
    t1 = Thread(target=process_results, args=(results1, res1_recv))
    t0.start()
    t1.start()
    readProcess = Process(target=read, args=(readPathList, [data0_send, data1_send]))
    readProcess.start()
    calcProcess0 = Process(target=calc0, args=(data0_recv, res0_send))
    calcProcess0.start()
    calcProcess1 = Process(target=calc1, args=(data1_recv, res1_send))
    calcProcess1.start()
    readProcess.join()
    calcProcess0.join()
    calcProcess1.join()
    t0.join()
    t1.join()
    elapsed = time.time() - t
    print(len(results0), results0[0], results1[0], elapsed)

This prints:

5000 AA aa 0.34799909591674805

Update

If all the different connections make the code a bit hard to follow, we can hide them in a class, Efficient_Queue, which may make the code easier to decipher:

from multiprocessing import Process, Pipe
from threading import Thread
import time

class Efficient_Queue:
    def __init__(self):
        self._recv_conn, self._send_conn = Pipe(False)

    def put(self, obj):
        self._send_conn.send(obj)
        return self

    def get(self):
        return self._recv_conn.recv()

def read(pathList, q_list):
    for path in pathList:
        value = readFunc(path)
        # simulate lots of data:
        for _ in range(1_000):
            for q in q_list:
                q.put(value)
    for q in q_list:
        q.put(None)

def calc0(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        des_q.put(calcFunc0(data))
    des_q.put(None)

def calc1(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        des_q.put(calcFunc1(data))
    des_q.put(None)

# dummy functions for testing

def readFunc(path):
    return path

def calcFunc0(data):
    return data.upper()

def calcFunc1(data):
    return data.lower()

def process_results(results, q):
    while True:
        data = q.get()
        if data is None:
            break
        results.append(data)

if __name__ == '__main__':
    t = time.time()
    readPathList = ['Aa', 'Bb', 'Cc', 'Dd', 'Ee']

    res0_q = Efficient_Queue()
    res1_q = Efficient_Queue()
    data0_q = Efficient_Queue()
    data1_q = Efficient_Queue()
    results0 = []
    results1 = []
    # start threads to process results
    t0 = Thread(target=process_results, args=(results0, res0_q))
    t1 = Thread(target=process_results, args=(results1, res1_q))
    t0.start()
    t1.start()
    readProcess = Process(target=read, args=(readPathList, [data0_q, data1_q]))
    readProcess.start()
    calcProcess0 = Process(target=calc0, args=(data0_q, res0_q))
    calcProcess0.start()
    calcProcess1 = Process(target=calc1, args=(data1_q, res1_q))
    calcProcess1.start()
    readProcess.join()
    calcProcess0.join()
    calcProcess1.join()
    t0.join()
    t1.join()
    elapsed = time.time() - t
    print(len(results0), results0[0], results1[0], elapsed)

This prints:

5000 AA aa 0.3409993648529053

When the Efficient_Queue instances are replaced by multiprocessing.Queue instances, we get:

5000 AA aa 0.576676607131958

When the multiprocessing.Queue instances are replaced by managed queues (i.e. m.Queue(), where m is a Manager() instance), we get:

5000 AA aa 2.8409862518310547

As I suggested in the comments, I have used Redis Pub/Sub below.

Note that installing Redis is a very simple and lightweight process, and it can also easily be run with docker, mapping container port 6379 to the host's port 6379:

docker run --name Redis -p 6379:6379 redis:latest

Note also that, because Redis is networked, you can run the publishers and subscribers on different machines.

Note also that you can run any number of subscribers, and indeed any number of publishers, without changing the code of any of them.

Here is the publisher:

#!/usr/bin/env python3

import sys
import redis
import logging

if __name__ == '__main__':

    # Configurable settings
    host, port = 'localhost', 6379
    topic = 'stuff'
    nMsgs = 20

    logging.basicConfig(level=logging.DEBUG, format='[Redis:Publisher] %(levelname)s:%(message)s')

    # Redis connection
    logging.info(f'Running with host={host}, port={port}')
    r = redis.Redis(host=host, port=port, db=0)

    # Test Redis is running
    if not r.ping():
        logging.critical('Redis is not responding')
        sys.exit(1)

    logging.info('Redis is responding')

    # Publish a bunch of messages
    for i in range(nMsgs):
        message = f'Test message {i+1}/{nMsgs}'
        logging.info(f'Published: {message}')
        r.publish(topic, message)

    # Tell subscribers the show is over
    r.publish(topic, 'quit')

    logging.info('Done')

Here is the subscriber:

#!/usr/bin/env python3

import sys
import redis
import logging

if __name__ == '__main__':

    # Configurable settings
    host, port = 'localhost', 6379
    topic = 'stuff'

    logging.basicConfig(level=logging.DEBUG, format='[Redis:Subscriber] %(levelname)s:%(message)s')

    # Redis connection
    logging.info(f'Running with host={host}, port={port}')
    r = redis.Redis(host=host, port=port, db=0)

    # Test Redis is running
    if not r.ping():
        logging.critical('Redis is not responding')
        sys.exit(1)

    logging.info('Redis is responding')
    logging.info(f'Subscribing to topic: {topic}')
    sub = r.pubsub()
    sub.subscribe(topic)

    for message in sub.listen():
        if message['data'] == b'quit':
            logging.info('Teardown requested')
            break
        logging.info(f'Received: {message}')

    logging.info('Done')

Note that there is also an asynchronous subscriber option, which lets you get on with other things (such as running a GUI) and calls you back when a message is received.

Sample output - Publisher

[Redis:Publisher] INFO:Running with host=localhost, port=6379
[Redis:Publisher] INFO:Redis is responding
[Redis:Publisher] INFO:Published: Test message 1/20
[Redis:Publisher] INFO:Published: Test message 2/20
[Redis:Publisher] INFO:Published: Test message 3/20
[Redis:Publisher] INFO:Published: Test message 4/20
[Redis:Publisher] INFO:Published: Test message 5/20
[Redis:Publisher] INFO:Published: Test message 6/20
[Redis:Publisher] INFO:Published: Test message 7/20
[Redis:Publisher] INFO:Published: Test message 8/20
[Redis:Publisher] INFO:Published: Test message 9/20
[Redis:Publisher] INFO:Published: Test message 10/20
[Redis:Publisher] INFO:Published: Test message 11/20
[Redis:Publisher] INFO:Published: Test message 12/20
[Redis:Publisher] INFO:Published: Test message 13/20
[Redis:Publisher] INFO:Published: Test message 14/20
[Redis:Publisher] INFO:Published: Test message 15/20
[Redis:Publisher] INFO:Published: Test message 16/20
[Redis:Publisher] INFO:Published: Test message 17/20
[Redis:Publisher] INFO:Published: Test message 18/20
[Redis:Publisher] INFO:Published: Test message 19/20
[Redis:Publisher] INFO:Published: Test message 20/20
[Redis:Publisher] INFO:Done

Sample output - Subscriber

[Redis:Subscriber] INFO:Running with host=localhost, port=6379
[Redis:Subscriber] INFO:Redis is responding
[Redis:Subscriber] INFO:Subscribing to topic: stuff
[Redis:Subscriber] INFO:Received: {'type': 'subscribe', 'pattern': None, 'channel': b'stuff', 'data': 1}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 1/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 2/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 3/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 4/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 5/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 6/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 7/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 8/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 9/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 10/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 11/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 12/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 13/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 14/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 15/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 16/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 17/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 18/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 19/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 20/20'}
[Redis:Subscriber] INFO:Teardown requested
[Redis:Subscriber] INFO:Done
