RabbitMQ (aio_pika) ConnectionResetError: [Errno 104] Connection reset by peer

Posted 2024-09-23 10:22:45


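We have a WebSocket server that does the following:

  • Use case: streaming audio data is sent from the front end to this server over a WebSocket
  • The WebSocket server connects to RabbitMQ from a Python client using aio_pika and publishes the audio data
  • A RabbitMQ connection is opened for each new WebSocket connection

WebSocket server code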

# Imports this handler needs; app, connections_ws, API_URL_TRANS_SERVER and
# RABBITMQ_URL are presumably defined elsewhere in the same module.
import requests
from aio_pika import Message, connect
from fastapi import WebSocket


@app.websocket('/ws')
async def websocket(websocket: WebSocket, meeting_id: str, user_id: str):
    global connections_ws, counter, audio
    conn_id = meeting_id
    connection = None  # set once the RabbitMQ connection is open
    print('before try')
    try:
        if not conn_id or conn_id == "-1":
            await websocket.close()
            return  # nothing more to do for an invalid meeting id
        print('before')
        await connections_ws.connect(websocket, conn_id)
        print('after')
        # hard coded user-id for now
        payload = {'user_id': user_id,
                   'meeting_id': meeting_id}

        print('http://{}/speechx'.format(API_URL_TRANS_SERVER))
        # NOTE: requests.post is synchronous, so it blocks the event loop until it returns
        resp = requests.post('http://{}/speechx'.format(API_URL_TRANS_SERVER), json=payload)

        print("amqp://user:user123@{}/".format(RABBITMQ_URL))
        # Perform connection
        connection = await connect(
            "amqp://guest:guest@{}/".format(RABBITMQ_URL)
        )
        # Creating a channel
        channel = await connection.channel()
        exchange = await channel.declare_exchange("my_exchange", durable=True)

        while True:
            receive_audio = await websocket.receive_bytes()

            # Sending the message
            await exchange.publish(
                Message(receive_audio),
                routing_key=meeting_id,
            )

    except Exception as err:
        # connect() itself may have failed, so only close what was opened;
        # closing the connection also closes its channels.
        if connection is not None:
            await connection.close()
        connections_ws.disconnect(websocket, conn_id)
        print(f"Disconnect: {err}")

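Problem

The problem is that when we have one long-running WebSocket/RabbitMQ connection and, in the meantime, 3-4 other short-lived connections from different clients, the WebSocket server sometimes fails to establish a new connection to RabbitMQ. The failure occurs while we are awaiting the connect() call.

WebSocket server logs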
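Because the handler opens a new RabbitMQ connection for every WebSocket, one thing worth trying is a single connection shared by all handlers, with each handler opening only its own channel on it. The following is a minimal sketch under assumptions, not a confirmed fix: SharedConnection is a hypothetical helper, and connect_fn stands for whatever coroutine function opens the connection (for example aio_pika.connect or aio_pika.connect_robust).

```python
import asyncio


class SharedConnection:
    """Lazily opens one connection and returns the same object to every caller.

    Hypothetical helper: connect_fn is any coroutine function that takes a URL
    and returns a connection object.
    """

    def __init__(self, connect_fn, url):
        self._connect_fn = connect_fn
        self._url = url
        self._connection = None
        self._lock = None  # created lazily so it belongs to the running loop

    async def get(self):
        if self._lock is None:
            self._lock = asyncio.Lock()
        # The lock ensures concurrent callers do not each open a connection.
        async with self._lock:
            if self._connection is None:
                self._connection = await self._connect_fn(self._url)
            return self._connection
```

In the handler, the connect(...) call would then become something like connection = await shared.get(), followed by the existing connection.channel() call; the shared connection should not be closed when a single WebSocket disconnects.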

ERROR:  Exception in ASGI application
Traceback (most recent call last):
  File "ws_server_fastapi.py", line 73, in websocket
    connection = await connect(
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 333, in connect
    await connection.connect(
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 120, in connect
    self.connection = await asyncio.wait_for(
  File "/opt/conda/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
    return await fut
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 105, in _make_connection
    connection = await aiormq.connect(self.url, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 542, in connect
    await connection.connect(client_properties or {})
  File "/opt/conda/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/opt/conda/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 249, in connect
    self.connection_tune = await self.__rpc(
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 200, in __rpc
    _, _, frame = await self.__receive_frame()
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/opt/conda/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/opt/conda/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/opt/conda/lib/python3.8/asyncio/selector_events.py", line 848, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
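
Since the traceback ends inside connect(), one way to tolerate an occasional reset is to retry the connection attempt a few times before giving up. This is only a sketch and does not address the underlying cause: connect_with_retry, its parameters, and the backoff values are hypothetical, and connect_fn stands for any coroutine function that opens a connection (aio_pika's connect would be passed in the real handler).

```python
import asyncio


async def connect_with_retry(connect_fn, url, attempts=5, base_delay=0.5):
    """Call connect_fn(url), retrying connection errors with exponential backoff.

    Hypothetical helper: the attempt count and delays are illustrative defaults.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await connect_fn(url)
        except (ConnectionError, asyncio.TimeoutError) as err:
            # ConnectionResetError is a subclass of ConnectionError.
            if attempt == attempts:
                raise  # out of retries: let the caller handle the failure
            delay = base_delay * 2 ** (attempt - 1)
            print(f"connect attempt {attempt} failed ({err!r}); retrying in {delay}s")
            await asyncio.sleep(delay)
```

With this in place, the handler would call something like connection = await connect_with_retry(connect, url) instead of calling connect directly.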

RabbitMQ logs

2021-09-13 14:12:43.007265+00:00 [info] <0.18038.21> accepting AMQP connection <0.18038.21> (172.31.32.146:60714 -> 172.31.22.88:5672) 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> crasher: 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> initial call: aten_detector:init/1
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> pid: <0.18031.21>
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> registered_name: aten_detector
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> exception exit: {timeout,
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> {gen_server,call,
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> [aten_sink,get_failure_probabilities]}}
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> in function gen_server:call/2 (gen_server.erl, line 239)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> in call from aten_detector:handle_info/2 (src/aten_detector.erl, line 103)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> in call from gen_server:try_dispatch/4 (gen_server.erl, line 695)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> in call from gen_server:handle_msg/6 (gen_server.erl, line 771)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> ancestors: [aten_sup,<0.179.0>]
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> message_queue_len: 1
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> messages: [poll] 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> links: [<0.180.0>] 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> dictionary: [] 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> trap_exit: false 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> status: running 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> heap_size: 6772 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> stack_size: 29 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> reductions: 11485 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> neighbours: 
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> 
2021-09-13 14:12:48.145257+00:00 [erro] <0.18032.21> closing AMQP connection <0.18032.21> (172.31.17.69:46020 -> 172.31.22.88:5672): 
2021-09-13 14:12:48.145257+00:00 [erro] <0.18032.21> {handshake_timeout,frame_header}
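
The {handshake_timeout,frame_header} entry means the broker closed a connection whose AMQP handshake did not complete in time. One possible, unconfirmed contributor on the client side is a blocked event loop: a synchronous call such as requests.post in one handler stalls every other coroutine, including a handshake in progress. The sketch below shows one way to keep the loop free by running the blocking call in a thread pool. The names blocking_post, post_without_blocking, and demo are hypothetical, and blocking_post only simulates an HTTP request with time.sleep.

```python
import asyncio
import functools
import time


def blocking_post(url, payload):
    """Stand-in for a synchronous HTTP call such as requests.post (hypothetical)."""
    time.sleep(0.2)  # simulates network latency; blocks the calling thread
    return {"url": url, "payload": payload}


async def post_without_blocking(url, payload):
    """Run the blocking call in the default thread pool so the loop stays free."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, functools.partial(blocking_post, url, payload)
    )


async def demo():
    ticks = 0

    async def ticker():
        # Represents other coroutines (e.g. an AMQP handshake) that need the loop.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    task = asyncio.create_task(ticker())
    result = await post_without_blocking(
        "http://example.invalid/speechx", {"user_id": "u1"}
    )
    task.cancel()
    # ticks keeps increasing while the blocking call runs in another thread.
    return result, ticks
```

loop.run_in_executor is used rather than asyncio.to_thread because the traceback shows Python 3.8, where to_thread is not available.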

Can you help us resolve this issue?

