我有运行在Docker中的RabbitMQ服务器和两个python客户端,它们连接到服务器并使用交换头相互发送消息。消息速率约为10/s。经过一段时间后(大部分时间是在交换了300-500条消息之后),其中一条交换将变得无响应。channel.basic_发布呼叫无异常通过,但接收方未收到任何消息。此外,在rabbitmq仪表板上,此exchange上没有任何活动rabbitmq dashboard screenshot
下面是代码示例:
import pika
import threading
import time
import sys
class Test:
def __init__(
self,
p_username,
p_password,
p_host,
p_port,
p_virtualHost,
p_outgoingExchange,
p_incomingExchange
):
self.__outgoingExch = p_outgoingExchange
self.__incomingExch = p_incomingExchange
self.__headers = {'topic': 'test'}
self.__queueName = ''
self.__channelConsumer = None
self.__channelProducer = None
self.__isRun = False
l_credentials = pika.PlainCredentials(p_username, p_password)
l_parameters = pika.ConnectionParameters(
host=p_host,
port=p_port,
virtual_host=p_virtualHost,
credentials=l_credentials,
socket_timeout=30,
connection_attempts=5,
)
self.__connection = pika.SelectConnection(
parameters=l_parameters,
on_open_callback=self.__on_connection_open,
on_open_error_callback=self.__on_connection_open_error,
on_close_callback=self.__on_connection_closed
)
def __on_connection_open(self, _conn):
print("Connection opened")
self.__connection.channel(on_open_callback=self.__on_consume_channel_open)
self.__connection.channel(on_open_callback=self.__on_produce_channel_open)
def __on_connection_open_error(self, _conn, _exception):
print("Failed to open connection")
def __on_connection_closed(self, _conn, p_exception):
print("Connection closed: {}".format(p_exception))
def __on_consume_channel_open(self, p_ch):
print("Consumer channel opened")
self.__channelConsumer = p_ch
self.__channelConsumer.exchange_declare(
exchange=self.__incomingExch,
exchange_type="headers",
callback=self.__on_consume_exchange_declared
)
def __on_consume_exchange_declared(self, p_method):
print("Consumer exchange declared")
self.__channelConsumer.queue_declare(
queue='',
callback=self.__on_queue_declare
)
def __on_queue_declare(self, p_method):
print("Consumer queue declared")
self.__queueName = p_method.method.queue
self.__channelConsumer.queue_bind(
queue=self.__queueName,
exchange=self.__incomingExch,
arguments=self.__headers,
)
self.__channelConsumer.basic_consume(self.__queueName, self.__onMessageReceived)
def __on_produce_channel_open(self, p_ch):
print("Producer channel opened")
self.__channelProducer = p_ch
self.__channelProducer.exchange_declare(
exchange=self.__outgoingExch,
exchange_type="headers",
callback=self.__on_produce_exchange_declared
)
def __on_produce_exchange_declared(self, p_method):
print("Producer exchange declared")
l_publisher = threading.Thread(target=self.__publishProcedure)
l_publisher.start()
def __onMessageReceived(self, p_channel, p_method, p_properties, p_body):
p_channel.basic_ack(p_method.delivery_tag)
print("Message received: {}".format(p_body))
def __publishProcedure(self):
print("Start publishing")
l_msgCounter = 0
while self.__isRun:
l_msgCounter += 1
self.__publish(l_msgCounter)
time.sleep(0.1)
def __publish(self, p_msgCounter):
self.__channelProducer.basic_publish(
exchange=self.__outgoingExch,
routing_key="#",
body=str(p_msgCounter),
properties=pika.BasicProperties(headers=self.__headers)
)
def run(self):
self.__isRun = True
try:
self.__connection.ioloop.start()
except KeyboardInterrupt:
self.__isRun = False
self.__connection.close()
print("Exit...")
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Provide node name [node1 | node2]")
exit(-1)
l_outgoingExch = ''
l_incomingExch = ''
if sys.argv[1] == 'node1':
l_outgoingExch = 'node2.headers'
l_incomingExch = 'node1.headers'
elif sys.argv[1] == 'node2':
l_outgoingExch = 'node1.headers'
l_incomingExch = 'node2.headers'
else:
print("Wrong node name")
exit(-1)
l_testInstance = Test(
p_username='admin',
p_password='admin',
p_host='localhost',
p_port=5672,
p_virtualHost='/',
p_incomingExchange=l_incomingExch,
p_outgoingExchange=l_outgoingExch
)
l_testInstance.run()
我将两个实例作为两个节点(node1和node2)运行,因此它们应该相互通信
有时我也会在这里描述这个问题: Stream connection lost: AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow', -275, 1),)
我发现我误用了皮卡。正如pikadocumentation所说,跨多个线程共享连接是不安全的。与其他线程的连接交互的唯一方法是使用add\u callback\u threadsafe函数。在我的示例中,它应该如下所示:
相关问题 更多 >
编程相关推荐