We have a WebSocket server that does the following:
WebSocket code
import requests
from aio_pika import Message, connect
from fastapi import WebSocket

@app.websocket('/ws')
async def websocket(websocket: WebSocket, meeting_id: str, user_id: str):
    global connections_ws, counter, audio
    conn_id = meeting_id
    # Defined up front so the except block can tell whether they were opened
    connection = None
    channel = None
    print('before try')
    try:
        if not conn_id or conn_id == "-1":
            await websocket.close()
            return
        print('before')
        await connections_ws.connect(websocket, conn_id)
        print('after')
        # hard coded user-id for now
        payload = {'user_id': user_id,
                   'meeting_id': meeting_id}
        print('http://{}/speechx'.format(API_URL_TRANS_SERVER))
        # Note: requests.post is synchronous and blocks the event loop while it runs
        resp = requests.post('http://{}/speechx'.format(API_URL_TRANS_SERVER), json=payload)
        print("amqp://user:user123@{}/".format(RABBITMQ_URL))
        # Perform connection
        connection = await connect(
            "amqp://guest:guest@{}/".format(RABBITMQ_URL)
        )
        # Creating a channel
        channel = await connection.channel()
        exchange = await channel.declare_exchange("my_exchange", durable=True)
        while True:
            receive_audio = await websocket.receive_bytes()
            # Sending the message
            await exchange.publish(
                Message(receive_audio),
                routing_key=meeting_id,
            )
    except Exception as err:
        # Close only what was actually opened
        if channel is not None:
            await channel.close()
        if connection is not None:
            await connection.close()
        connections_ws.disconnect(websocket, conn_id)
        print(f"Disconnect: {err}")
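Problem
The problem is that when we have one long-running WebSocket/RabbitMQ connection and, in the meantime, 3-4 other short-lived connections from different clients, the WebSocket server sometimes cannot open a new connection to RabbitMQ: it hangs at the connect call and then fails.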
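The handler opens a fresh RabbitMQ connection for every WebSocket client. One option that may be worth trying (an assumption on my part, not something the logs confirm) is to open a single connection per process and give each client its own channel. The sketch below shows only the sharing logic. `SharedConnection`, `fake_connect` and `demo` are hypothetical names; in the real server the factory would be something like `lambda: aio_pika.connect_robust(url)`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Tuple


class SharedConnection:
    """Lazily open one connection and hand the same object to every caller."""

    def __init__(self, factory: Callable[[], Awaitable[Any]], timeout: float = 10.0):
        # factory is an assumption: any zero-argument coroutine function
        # that returns an open connection.
        self._factory = factory
        self._timeout = timeout
        self._conn: Optional[Any] = None
        self._lock: Optional[asyncio.Lock] = None

    async def get(self) -> Any:
        # Create the lock lazily so it is bound to the running event loop.
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            if self._conn is None:
                # Fail after a bounded wait instead of hanging in the handshake.
                self._conn = await asyncio.wait_for(self._factory(), self._timeout)
            return self._conn


async def demo() -> Tuple[int, bool]:
    """Five concurrent callers should trigger exactly one connect."""
    opened = 0

    async def fake_connect() -> object:
        # Stand-in for a real broker connection; counts how often it is called.
        nonlocal opened
        opened += 1
        await asyncio.sleep(0.01)
        return object()

    shared = SharedConnection(fake_connect)
    conns = await asyncio.gather(*(shared.get() for _ in range(5)))
    return opened, all(c is conns[0] for c in conns)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the handler this would replace the per-client `connect(...)` call with `connection = await shared.get()`, followed by `channel = await connection.channel()` as before. On disconnect only the channel would be closed, not the shared connection.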
WebSocket server log
ERROR: Exception in ASGI application
Traceback (most recent call last):
  File "ws_server_fastapi.py", line 73, in websocket
    connection = await connect(
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 333, in connect
    await connection.connect(
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 120, in connect
    self.connection = await asyncio.wait_for(
  File "/opt/conda/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
    return await fut
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 105, in _make_connection
    connection = await aiormq.connect(self.url, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 542, in connect
    await connection.connect(client_properties or {})
  File "/opt/conda/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/opt/conda/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 249, in connect
    self.connection_tune = await self.__rpc(
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 200, in __rpc
    _, _, frame = await self.__receive_frame()
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/opt/conda/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/opt/conda/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/opt/conda/lib/python3.8/asyncio/selector_events.py", line 848, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
RabbitMQ log
2021-09-13 14:12:43.007265+00:00 [info] <0.18038.21> accepting AMQP connection <0.18038.21> (172.31.32.146:60714 -> 172.31.22.88:5672)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> crasher:
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> initial call: aten_detector:init/1
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> pid: <0.18031.21>
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> registered_name: aten_detector
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> exception exit: {timeout,
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> {gen_server,call,
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> [aten_sink,get_failure_probabilities]}}
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> in function gen_server:call/2 (gen_server.erl, line 239)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> in call from aten_detector:handle_info/2 (src/aten_detector.erl, line 103)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> in call from gen_server:try_dispatch/4 (gen_server.erl, line 695)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> in call from gen_server:handle_msg/6 (gen_server.erl, line 771)
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> ancestors: [aten_sup,<0.179.0>]
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> message_queue_len: 1
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> messages: [poll]
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> links: [<0.180.0>]
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> dictionary: []
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> trap_exit: false
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> status: running
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> heap_size: 6772
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> stack_size: 29
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> reductions: 11485
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21> neighbours:
2021-09-13 14:12:21.309372+00:00 [erro] <0.18031.21>
2021-09-13 14:12:48.145257+00:00 [erro] <0.18032.21> closing AMQP connection <0.18032.21> (172.31.17.69:46020 -> 172.31.22.88:5672):
2021-09-13 14:12:48.145257+00:00 [erro] <0.18032.21> {handshake_timeout,frame_header}
Can you help us solve this problem?
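One thing that might be worth ruling out: the handler calls the synchronous `requests.post` inside an `async def`, which stalls the event loop for the duration of the request. If another client's AMQP handshake is in flight at that moment, its frames are not processed until the loop is free again, which could plausibly end in the `handshake_timeout` seen in the RabbitMQ log. This is a guess, not a diagnosis. The sketch below uses only the standard library to show the effect; `blocking_call` is a hypothetical stand-in for the HTTP request, and `measure_ticker_delay` is a hypothetical helper.

```python
import asyncio
import time


def blocking_call(seconds: float) -> None:
    """Stand-in for a synchronous HTTP request such as requests.post."""
    time.sleep(seconds)


async def measure_ticker_delay(offload: bool) -> float:
    """Return how long a 10 ms timer takes to fire while the blocking call runs."""
    loop = asyncio.get_running_loop()

    async def ticker() -> float:
        start = loop.time()
        await asyncio.sleep(0.01)
        return loop.time() - start

    tick = asyncio.ensure_future(ticker())
    await asyncio.sleep(0)  # yield once so the ticker starts its timer first
    if offload:
        # Run the blocking call in the default thread pool; the loop stays free.
        await loop.run_in_executor(None, blocking_call, 0.3)
    else:
        # Run the blocking call directly; nothing else on the loop can progress.
        blocking_call(0.3)
    return await tick


if __name__ == "__main__":
    print("blocked  :", asyncio.run(measure_ticker_delay(offload=False)))
    print("offloaded:", asyncio.run(measure_ticker_delay(offload=True)))
```

With the direct call, the timer cannot fire until the blocking call returns; with `run_in_executor` it fires on schedule. `loop.run_in_executor` is used here rather than `asyncio.to_thread` because the traceback shows Python 3.8.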