多个工作线程使用来自RabbitMQ queu的相同消息

2024-10-01 13:45:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用pyamqp模块和python3.4 当我运行多个监听器并启动一个监听器来发布消息时,监听器接收一条消息并开始同时处理它。我不需要这种行为,因为消息只应该写入DB一次。所以最快的工作线程将消息写入数据库,所有其他工作线程都会说该消息已经存在。在

制作人:

import json
import amqp
import random
from application.settings import RMQ_PASSWORD, RMQ_USER, RMQ_HOST, RMQ_EXCHANGE

def main():
    conn = amqp.Connection(RMQ_HOST, RMQ_USER,
                           RMQ_PASSWORD, ssl=False)
    ch = conn.channel()
    ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
    req = {"request": {"transaction_number": random.randint(100000, 9999999999)}}
    message = json.dumps(req)    
    msg = amqp.Message(message)    
    ch.basic_publish(msg, RMQ_EXCHANGE)    
    ch.close()
    conn.close()

if __name__ == '__main__':
    for x in range(100):
        main()

工人:

^{pr2}$

同时:

user@RabbitMQ:~$ sudo rabbitmqctl list_bindings
Listing bindings ...
        exchange        amq.gen--crTjfeSlue6gw0LRwW7pQ  queue   amq.gen--crTjfeSlue6gw0LRwW7pQ  []
        exchange        amq.gen-1X3vwGF5OKn_gcnofpJKFg  queue   amq.gen-1X3vwGF5OKn_gcnofpJKFg  []
...
        exchange        amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  queue   amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  []
        exchange        entryapi.test   queue   entryapi.test   []
entryapi        exchange        entryapi.test   queue           []
azaza   exchange        amq.gen--crTjfeSlue6gw0LRwW7pQ  queue           []
azaza   exchange        amq.gen-1X3vwGF5OKn_gcnofpJKFg  queue           []
...
azaza   exchange        amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  queue           []
azaza   exchange        entryapi.test   queue           []
...done.

Tags: testimport消息amqpexchangequeuemainch
1条回答
网友
1楼 · 发布于 2024-10-01 13:45:02

我认为你为你的用例使用了错误的设置类型。您有一个发布服务器发布到exchange,您希望读取消息并将其写入数据库。您希望在许多用户向数据库写入数据的情况下这样做,以便提高吞吐量。扇形分叉交换复制消息,因此多个队列和使用者将导致同一数据多次写入数据库。您需要使用“工作队列”。每个交换将是一个默认的(无类型,或使用相同路由密钥的所有邮件的直接交换)交换。发送到exchange的所有邮件都将定向到一个队列。每个队列将有多个使用者。每个消息将从队列中读取一次,并且只有一个消费者从您的消费者组中读取一次,然后只将一次写入数据库。在

在这里阅读更多 http://www.rabbitmq.com/tutorials/tutorial-two-python.html

相关问题 更多 >