如何使用pyamqplib在多个队列上等待消息

2024-09-29 19:37:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用pyamqplib访问Python中的RabbitMQ。应用程序会不时接收侦听某些MQ主题的请求。在

当它第一次收到这样的请求时,它会创建一个AMQP连接和一个通道,并启动一个新线程来监听消息:

    connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
    channel = connection.channel()

    listener = AMQPListener(channel)
    listener.start()

amqplister非常简单:

^{pr2}$

创建连接后,它订阅感兴趣的主题,如下所示:

channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)

def receive_callback(msg):
    self.queue.put(msg.body)

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)

第一次这一切都很好。但是,在随后请求订阅另一个主题时失败。在随后的请求中,我重用AMQP连接和amqplister线程(因为我不想为每个主题启动一个新线程),并且当我调用上面的代码块时channel.queue_声明()方法调用从不返回。我也尝试在那时创建一个新的频道,连接.通道()调用也不会返回。在

我能让它工作的唯一方法是为每个主题创建一个新的连接、通道和侦听器线程(即路由密钥),但这并不理想。我怀疑是wait()方法在某种程度上阻塞了整个连接,但我不知道该怎么做。当然,我应该能够使用一个侦听线程接收带有多个路由键(甚至在多个通道上)的消息吗?在

一个相关的问题是:当侦听器线程不再感兴趣时,如何停止该侦听器线程?频道。等等()如果没有消息,则呼叫将永远阻塞。我能想到的唯一方法是向队列发送一个虚拟消息,它会“毒害”它,即被侦听器解释为停止的信号。在


Tags: 方法falsehost消息amqp主题exchangequeue
1条回答
网友
1楼 · 发布于 2024-09-29 19:37:06

如果每个通道需要多个comsumer,只需使用basic\u consume()附加另一个,然后使用频道。等等()之后。它将监听通过basic\u consume()连接的所有队列。确保为每个basic\u consume()定义不同的消费者标签。在

使用channel.basic_取消(consumer_tag)如果要取消队列上的特定使用者(取消收听特定主题)。在

相关问题 更多 >

    热门问题