"Task exception was never retrieved" when using asyncio and WebSockets

Posted 2024-09-30 02:21:04


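I keep running into an error when using asyncio and websockets in a multiprocessing Python script. Every time the error occurs, the program essentially hangs. My other processes appear to keep working, but the websocket process stops communicating. Any suggestions as to what my problem is and how to fix it?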

The error in the log looks like this:

Nov 29 15:28:34 fbcradiocast1 python3[22482]: Task exception was never retrieved
Nov 29 15:28:34 fbcradiocast1 python3[22482]: future: <Task finished coro=<websocket_server.<locals>.broadcast() done, defined at /home/wruser/radio/python/fbcradiocast.py:329> exception=Connectio$
Nov 29 15:28:34 fbcradiocast1 python3[22482]: Traceback (most recent call last):
Nov 29 15:28:34 fbcradiocast1 python3[22482]:   File "/home/wruser/radio/python/fbcradiocast.py", line 342, in broadcast
Nov 29 15:28:34 fbcradiocast1 python3[22482]:     return_exceptions=False,)
Nov 29 15:28:34 fbcradiocast1 python3[22482]:   File "/usr/local/lib/python3.7/dist-packages/websockets/protocol.py", line 555, in send
Nov 29 15:28:34 fbcradiocast1 python3[22482]:     await self.ensure_open()
Nov 29 15:28:34 fbcradiocast1 python3[22482]:   File "/usr/local/lib/python3.7/dist-packages/websockets/protocol.py", line 803, in ensure_open
Nov 29 15:28:34 fbcradiocast1 python3[22482]:     raise self.connection_closed_exc()
Nov 29 15:28:34 fbcradiocast1 python3[22482]: websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason
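
For context, asyncio prints "Task exception was never retrieved" when a task created with `create_task()` ends with an exception that nothing ever awaits or inspects. The following is a minimal sketch of that mechanism, not the original program: `ConnectionClosedOK` here is a hypothetical stand-in class (so the example runs without the websockets package), and `log_task_result` is an illustrative name. Attaching a done callback that calls `task.exception()` is one way to make the failure visible as soon as the task dies.

```python
import asyncio


class ConnectionClosedOK(Exception):
    """Hypothetical stand-in for websockets.exceptions.ConnectionClosedOK."""


async def broadcast():
    # Simulates ws.send() raising because the peer already closed the connection.
    raise ConnectionClosedOK("code = 1000 (OK), no reason")


def log_task_result(task):
    # Calling task.exception() marks the exception as retrieved, so asyncio
    # does not report it later when the task object is garbage collected.
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        print(f"broadcast task stopped: {exc!r}")


async def main():
    task = asyncio.get_running_loop().create_task(broadcast())
    task.add_done_callback(log_task_result)
    await asyncio.sleep(0.01)  # give the task time to run and fail
    return task


task = asyncio.run(main())
```

Note that once the task has finished with an exception, its `while True` loop is gone for good; the event loop keeps running, which would be consistent with a process that stays alive but stops sending messages.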

The function that runs the WebSocket server looks like this:

def websocket_server(event, q):
    # imports needed only by this function
    import asyncio
    import websockets
    import queue

    # we will ignore signal and let main process terminate child processes
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    # Get PID of process
    pid = os.getpid()
    mylog.logger.info(f"Starting process with PID {pid}")

    CLIENTS = set()
    workQueue = queue.Queue(10)

    async def broadcast():
        while True:
            # when event is set, we'll stop loop
            if event.is_set():
                loop.stop()
            elif not workQueue.empty():
                data = workQueue.get()
                if "update_airplay" in data:
                    q.put(data)

                mylog.logger.info(f"Processing {data}")
                await asyncio.gather(
                    *[ws.send(data) for ws in CLIENTS],
                    return_exceptions=False,)
     
            await asyncio.sleep(0.01)

    async def handler(websocket, path):
        CLIENTS.add(websocket)
        #mylog.logger.info("[WEBSOCKET_SERVER] Add client")
        try:
            async for msg in websocket:
                workQueue.put(msg)
        except websockets.ConnectionClosedError:
            mylog.logger.warning("Connection closed")   
        finally:
            CLIENTS.remove(websocket)
            #mylog.logger.info("Remove client")

    loop = asyncio.get_event_loop()
    loop.create_task(broadcast())

    # read config file to get server and port and then assign variable the command
    config = configparser.ConfigParser()
    config.read('config.ini')
    server = config['WEBSOCKET']['server']
    port = config['WEBSOCKET']['port']
    start_server = websockets.serve(handler, server, port)

    try:
        # start the server and looping it forever
        loop.run_until_complete(start_server)
        loop.run_forever()
    except Exception as err: # catch *all* exceptions
        mylog.logger.error(f"{err}")
        radio_lib.sendmail("websocket_server function in fbcradiocast", f"Error in websocket_server: {err}")
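
One possible direction, sketched under assumptions rather than offered as a confirmed fix: with `return_exceptions=False`, a single failed `ws.send()` propagates out of `asyncio.gather()` and ends the whole `broadcast()` coroutine. Passing `return_exceptions=True` returns the failures as values instead, so the loop can carry on serving the remaining clients. `FakeClient`, `FakeClosed` and `broadcast_once` below are hypothetical names used only so the example runs without the websockets package.

```python
import asyncio


class FakeClosed(Exception):
    """Hypothetical stand-in for websockets.exceptions.ConnectionClosed."""


class FakeClient:
    """Hypothetical stand-in for a websocket connection, for illustration only."""

    def __init__(self, name, closed=False):
        self.name = name
        self.closed = closed
        self.received = []

    async def send(self, data):
        if self.closed:
            raise FakeClosed(f"{self.name} is closed")
        self.received.append(data)


async def broadcast_once(clients, data):
    # Snapshot the set so a handler adding or removing clients cannot
    # change it while the sends are being scheduled.
    targets = list(clients)
    # return_exceptions=True hands back a failed send as a value instead of
    # raising it, so one closed client does not end the caller.
    results = await asyncio.gather(
        *(ws.send(data) for ws in targets),
        return_exceptions=True,
    )
    # Report which clients failed; removal from the set is left to the handler.
    return [(ws, res) for ws, res in zip(targets, results)
            if isinstance(res, Exception)]


async def demo():
    alive, dead = FakeClient("alive"), FakeClient("dead", closed=True)
    failed = await broadcast_once({alive, dead}, "hello")
    return alive, dead, failed


alive, dead, failed = asyncio.run(demo())
```

In the function above, the same idea would mean changing `return_exceptions=False` to `True` inside `broadcast()` and logging any returned exceptions. Whether that fully resolves the hang has not been verified here.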
