Error "unknown delivery tag" when sending an ack to RabbitMQ with pika (Python)

Posted 2024-05-10 06:18:01


I want to process messages in several threads, but I get an error when I run this code:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()

    except Exception, e:
        traceback.print_exc()
        continue

Error output:

Traceback (most recent call last):
  File "C:\work\projects\mq\start.py", line 43, in <module>
    method_frame, header_frame, body = channel.basic_get(queue="test_queue")
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
  File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get
    no_ack=no_ack))
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method
    self.connection.process_data_events()
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events
    self._handle_read()
  File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read
    self._on_data_available(data)
  File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available
    frame)                 # Args
  File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process
    callback(*args, **keywords)
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close
    frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204')

Versions: pika 0.9.5, RabbitMQ 2.6.1


3 answers

In my case, I had simply told the queue that I was not going to acknowledge messages, and then I acknowledged them anyway.

For example, this is wrong:

channel.basic_consume(callback, queue=queue_name, no_ack=True)

and then in my callback:

def callback(ch, method, properties, body):
  # do stuff
  ch.basic_ack(delivery_tag = method.delivery_tag)

This is right:

channel.basic_consume(callback, queue=queue_name, no_ack=False)

Bottom line: if you want to acknowledge manually, set no_ack=False.

From the docs:

no_ack: (bool) if set to True, automatic acknowledgement mode will be used (see http://www.rabbitmq.com/confirms.html)
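To illustrate why the ack is rejected, here is a minimal sketch of the bookkeeping involved. ToyChannel is a hypothetical stand-in written for this example; it is not part of pika and only approximates what the broker does: a delivery tag is remembered only when the delivery was made in manual-ack mode.

```python
class ToyChannel:
    """Hypothetical stand-in for a broker channel; not part of pika."""

    def __init__(self):
        self._next_tag = 0
        self._unacked = set()

    def deliver(self, no_ack):
        """Simulate one delivery and return its delivery tag."""
        self._next_tag += 1
        if not no_ack:
            # Manual mode: the tag stays outstanding until it is acked.
            self._unacked.add(self._next_tag)
        # Automatic mode: the message counts as acknowledged on delivery,
        # so nothing is recorded for it.
        return self._next_tag

    def basic_ack(self, delivery_tag):
        """Acknowledge a delivery; reject tags that are not outstanding."""
        if delivery_tag not in self._unacked:
            raise RuntimeError(
                "PRECONDITION_FAILED - unknown delivery tag %d" % delivery_tag
            )
        self._unacked.remove(delivery_tag)


manual = ToyChannel()
tag = manual.deliver(no_ack=False)
manual.basic_ack(tag)  # accepted: the tag was outstanding

auto = ToyChannel()
tag = auto.deliver(no_ack=True)
try:
    auto.basic_ack(tag)  # rejected: nothing is outstanding in automatic mode
except RuntimeError as exc:
    print(exc)  # PRECONDITION_FAILED - unknown delivery tag 1
```

The same model also shows why acknowledging one tag twice fails: after the first ack the tag is no longer outstanding.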

The problem is probably that you are setting no_ack=True, like this:

consumer_tag = channel.basic_consume(
    message_delivery_event,
    no_ack=True,
    queue=queue,
)

and then acknowledging the messages:

channel.basic_ack(delivery_tag=args.delivery_tag)

You have to choose whether you want to acknowledge messages or not, and set the consume parameter accordingly.
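For readers on a newer pika than the 0.9.5 used in the question: as far as I know, pika 1.0 and later renamed this flag to auto_ack and made the queue the first argument of basic_consume. The sketch below assumes that newer signature; start_manual_ack_consumer and on_message are hypothetical names chosen for illustration, and channel is assumed to behave like a pika 1.x BlockingChannel.

```python
def start_manual_ack_consumer(channel, queue_name):
    """Register a consumer whose callback acknowledges each message itself.

    Assumption: `channel` follows the pika >= 1.0 basic_consume signature.
    """

    def on_message(ch, method, properties, body):
        # ... process body here ...
        # Ack on the same channel that delivered the message.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # auto_ack=False selects manual acknowledgement, which is the only
    # mode in which the basic_ack call above is valid.
    return channel.basic_consume(
        queue=queue_name,
        on_message_callback=on_message,
        auto_ack=False,
    )
```

With auto_ack=True the callback would have to drop the basic_ack call, mirroring the no_ack choice described above.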

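Your code has a bug: you share a channel across threads. pika does not support this (see the FAQ). You have two options:

  1. Set the no_ack=True flag in basic_get(...) and do not use the channel object in the thread function doWork(...).
  2. If you need to acknowledge a message only after the work is finished, let the main thread (the while True: loop) handle the acks instead of the worker threads. Below is a modified version of your code that does this.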

    from __future__ import with_statement
    import pika
    import sys
    from pika.adapters.blocking_connection import BlockingConnection
    from pika import connection, credentials
    import time
    import threading
    import random
    from pika.adapters.select_connection import SelectConnection
    from pika.connection import Connection
    import traceback
    from Queue import Queue, Empty
    
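    def doWork(body, args, channel, ack_queue):
        # Worker thread: never touch the channel here; only hand the
        # delivery tag back to the main thread once the work is done.
        time.sleep(random.random())
        ack_queue.put(args.delivery_tag)
    
    def doAck(channel):
        # Main thread: ack every delivery tag the workers have queued so far.
        while True:
            try:
                r = ack_queue.get_nowait()
            except Empty:
                break
            try:
                channel.basic_ack(delivery_tag=r)
            except:
                traceback.print_exc()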
    
    auth = credentials.PlainCredentials(username="guest", password="guest")
    params = connection.ConnectionParameters(host="localhost", credentials=auth)
    conn = BlockingConnection(params)
    channel = conn.channel()
    # Create a queue for the messages that should be ACKed by main thread
    ack_queue = Queue()
    
    while True:
        time.sleep(0.03)    
        try:
            doAck(channel)
            method_frame, header_frame, body = channel.basic_get(queue="test_queue")
            if method_frame.NAME == 'Basic.GetEmpty':
                continue        
            t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue])
            t.setDaemon(True)
            t.start()
        except Exception, e:
            traceback.print_exc()
            continue
    
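The same hand-off pattern can be sketched in Python 3 without a broker. This is only an illustration under stated assumptions: RecordingChannel is a hypothetical stand-in for a pika channel, and do_work and drain_acks are names chosen for this example. The point it demonstrates is that worker threads only put delivery tags on a queue.Queue, while the thread that owns the channel is the only one that calls basic_ack.

```python
import queue
import random
import threading
import time


def do_work(body, delivery_tag, ack_queue):
    """Worker thread: process the message, then hand the tag back."""
    time.sleep(random.random() * 0.01)  # placeholder for real work
    ack_queue.put(delivery_tag)


def drain_acks(channel, ack_queue):
    """Main thread only: ack every tag the workers have finished with."""
    acked = 0
    while True:
        try:
            tag = ack_queue.get_nowait()
        except queue.Empty:
            return acked
        channel.basic_ack(delivery_tag=tag)
        acked += 1


class RecordingChannel:
    """Hypothetical stand-in for a pika channel (no broker needed)."""

    def __init__(self):
        self.acked = []
        self.owner = threading.get_ident()

    def basic_ack(self, delivery_tag):
        # Guard: the channel must only be used from the thread that owns it.
        assert threading.get_ident() == self.owner
        self.acked.append(delivery_tag)


channel = RecordingChannel()
ack_queue = queue.Queue()
workers = [
    threading.Thread(target=do_work, args=(b"payload", tag, ack_queue))
    for tag in range(1, 6)
]
for w in workers:
    w.start()
for w in workers:
    w.join()

drain_acks(channel, ack_queue)
print(sorted(channel.acked))  # [1, 2, 3, 4, 5]
```

In a real consumer the main loop would call drain_acks between basic_get calls, as the modified code above does with doAck.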
