<p>你的代码有个错误。您可以跨线程共享通道。pika不支持此功能(请参见<a href="http://pika.readthedocs.org/en/latest/faq.html" rel="nofollow">FAQ</a>)。你有两个选择:</p>
<ol>
<li>在<code>basic_get(...)</code>中定义<code>no_ack=True</code>标志,不要在线程函数<code>doWork(...)</code>中使用通道对象</li>
<li><p>如果只在完成工作后需要确认消息,那么让主线程(the<code>while True:</code>循环)处理消息ACK(而不是工作线程)。下面是执行此操作的代码的修改版本。</p>
<pre><code>from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback
from Queue import Queue, Empty
def doWork(body, args, channel, ack_queue):
time.sleep(random.random())
ack_queue.put(args.delivery_tag)
def doAck(channel):
while True:
try:
r = ack_queue.get_nowait()
except Empty:
r = None
if r is None:
break
try:
channel.basic_ack(delivery_tag=r)
except:
traceback.print_exc()
auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()
# Create a queue for the messages that should be ACKed by main thread
ack_queue = Queue()
while True:
time.sleep(0.03)
try:
doAck(channel)
method_frame, header_frame, body = channel.basic_get(queue="test_queue")
if method_frame.NAME == 'Basic.GetEmpty':
continue
t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue])
t.setDaemon(True)
t.start()
except Exception, e:
traceback.print_exc()
continue
</code></pre></li>
</ol>