在多处理python脚本中使用Asyncio和Websockets时,我一直遇到一个错误。每次发生此错误时,都会导致程序基本挂起。我的其他进程似乎继续工作,但websocket进程不再通信。对我的问题是什么以及如何解决有什么建议吗
日志中的错误如下所示:
Nov 29 15:28:34 fbcradiocast1 python3[22482]: Task exception was never retrieved
Nov 29 15:28:34 fbcradiocast1 python3[22482]: future: <Task finished coro=<websocket_server.<locals>.broadcast() done, defined at /home/wruser/radio/python/fbcradiocast.py:329> exception=Connectio$
Nov 29 15:28:34 fbcradiocast1 python3[22482]: Traceback (most recent call last):
Nov 29 15:28:34 fbcradiocast1 python3[22482]: File "/home/wruser/radio/python/fbcradiocast.py", line 342, in broadcast
Nov 29 15:28:34 fbcradiocast1 python3[22482]: return_exceptions=False,)
Nov 29 15:28:34 fbcradiocast1 python3[22482]: File "/usr/local/lib/python3.7/dist-packages/websockets/protocol.py", line 555, in send
Nov 29 15:28:34 fbcradiocast1 python3[22482]: await self.ensure_open()
Nov 29 15:28:34 fbcradiocast1 python3[22482]: File "/usr/local/lib/python3.7/dist-packages/websockets/protocol.py", line 803, in ensure_open
Nov 29 15:28:34 fbcradiocast1 python3[22482]: raise self.connection_closed_exc()
Nov 29 15:28:34 fbcradiocast1 python3[22482]: websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason
运行WebSocket的功能如下所示:
def websocket_server(event, q):
# imports needed only by this function
import asyncio
import websockets
import queue
# we will ignore signal and let main process terminate child processes
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Get PID of process
pid = os.getpid()
mylog.logger.info(f"Starting process with PID {pid}")
CLIENTS = set()
workQueue = queue.Queue(10)
async def broadcast():
while True:
# when event is set, we'll stop loop
if event.is_set():
loop.stop()
elif not workQueue.empty():
data = workQueue.get()
if (data.find("update_airplay") != -1):
q.put(data);
mylog.logger.info(f"Processing {data}")
await asyncio.gather(
*[ws.send(data) for ws in CLIENTS],
return_exceptions=False,)
await asyncio.sleep(0.01)
async def handler(websocket, path):
CLIENTS.add(websocket)
#mylog.logger.info("[WEBSOCKET_SERVER] Add client")
try:
async for msg in websocket:
workQueue.put(msg)
except websockets.ConnectionClosedError:
mylog.logger.warning("Connection closed")
finally:
CLIENTS.remove(websocket)
#mylog.logger.info("Remove client")
loop = asyncio.get_event_loop()
loop.create_task(broadcast())
# read config file to get server and port and then assign variable the command
config = configparser.ConfigParser()
config.read('config.ini')
server = config['WEBSOCKET']['server']
port = config['WEBSOCKET']['port']
start_server = websockets.serve(handler, server, port)
try:
# start the server and looping it forever
loop.run_until_complete(start_server)
loop.run_forever()
except Exception as err: # catch *all* exceptions
mylog.logger.error(f"{err}")
radio_lib.sendmail("websocket_server function in fbcradiocast", f"Error in websocket_server: {err}")
目前没有回答
相关问题 更多 >
编程相关推荐