如何重新启动使用者rabbitmq pika python

2024-10-01 09:27:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我在使用pika rabbitmq的pika库设置了一些用户退出。与pika一起,我使用twisted实现来设置异步消费者。我不知道为什么会发生这种情况,但我希望在消费者退出时实现重新连接,并且不确定如何继续这样做。这是我当前的实现

class Consumer(object):
def __init__(self, queue, exchange, routingKey, medium, signalRcallbackFunc):
    self._queue_name = queue
    self.exchange = exchange
    self.routingKey = routingKey
    self.medium = medium
    print "client on"
    self.channel = None
    self.medium.client.on(signalRcallbackFunc, self.callback)

def on_connected(self, connection):
    d = connection.channel()
    d.addCallback(self.got_channel)
    d.addCallback(self.queue_declared)
    d.addCallback(self.queue_bound)
    d.addCallback(self.handle_deliveries)
    d.addErrback(log.err)

def got_channel(self, channel):
    self.channel = channel
    self.channel.basic_qos(prefetch_count=500)
    return self.channel.queue_declare(queue=self._queue_name, durable=True)

def queue_declared(self, queue):
    self.channel.queue_bind(queue=self._queue_name,
                            exchange=self.exchange,
                            routing_key=self.routingKey)

def queue_bound(self, ignored):
    return self.channel.basic_consume(queue=self._queue_name)

def handle_deliveries(self, queue_and_consumer_tag):
    queue, consumer_tag = queue_and_consumer_tag
    self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

    return self.looping_call.start(0)

def consume_from_queue(self, queue):
    d = queue.get()
    return d.addCallback(lambda result: self.handle_payload(*result))

def handle_payload(self, channel, method, properties, body):
    print(body)
    print(properties.headers)
    channel.basic_ack(method.delivery_tag)
    print "#####################################" + method.delivery_tag + "###################################"

def callback(self, data):
    #self.channel.basic_ack(data, multiple=True)
    pass

Tags: nameselfreturnexchangebasicqueuedeftag
2条回答

有什么原因不能直接关闭连接然后重新打开?在

@contextmanager
def with_pika_connection():
    credentials = pika.PlainCredentials(worker_config.username, worker_config.password)
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=worker_config.host,
        credentials=credentials,
        port=worker_config.port,
    ))

    try:
        yield connection
    finally:
        connection.close()


@contextmanager
def with_pika_channel(queuename):
    with with_pika_connection() as connection:
        channel = connection.channel()


while True:
    while not stopping:
         try:
                with with_pika_channel(queuename) as (connection, channel):
                    consumer_tag = channel.basic_consume(
                        callback,
                        queue=queuename,
                    )
                    channel.start_consuming()
         except Exception as e:
              reportException(e) 
              # Continue 

您可以使用on-unconnected回调中的连接注册“on-close”处理程序。当连接丢失时将调用此函数。在这里,您可以重新建立一个新的连接。在

下面的例子是相对有用的,这是一个策略,我用了很好的效果。。。 http://pika.readthedocs.io/en/latest/examples/asynchronous_consumer_example.html

对于twisted pika库,add_on_close_callback方法可能会让您走得更远(尽管我还没有测试过)。https://pika.readthedocs.io/en/0.10.0/modules/adapters/twisted.html

相关问题 更多 >