当消息数大于n时,我的使用者如何继续读取来自RabbitMQ的所有消息,然后回复生产者(RPC)?

2024-06-14 12:09:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个rabbitmq的使用者,它从特定队列中读取数据并执行一些操作。 现在我想批量执行这个操作。我无法找到一种方法,消费者只需将整个上下文保存在一个列表中,即(ch、method、props、body),然后当大小大于“n”(即所需的最小批量大小)时,我就可以执行批处理操作并执行以下操作:

if len(no_of_messages) > 10:
    responses = batch_operation(messages)
    for response in responses:
        ch, method, props, body = response
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=props.correlation_id),
                         body=str(body))

你知道怎么做吗?我是这样启动消费者的:

channel.basic_qos(prefetch_count=1)
channel.basic_consume(some_function_that_does_above_stuffs, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

Tags: idbasicqueueresponsechannelrabbitmq消费者body