在Tornado中的阻塞上下文中调用异步函数

2024-09-22 18:25:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在Tornado框架中实现一个基于web套接字的服务。当用户关闭web套接字时,我想通知其他用户这一点。然而,on_close显然是一个阻塞函数,而我的_broadcast(str) -> None函数是异步的

我怎样才能调用这个函数呢

from tornado import websocket

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SocketHandler(websocket.WebSocketHandler):
    async def open(self, *args, conns, **kwargs):
        logger.info(f"Opened a new connection to client {id(self)}")
        self._conns = conns

    async def on_message(self, message):
        logger.info(f"Client {id(self)} sent message: {message}")
        await self._broadcast(message)

    def on_close(self):
        logger.info(f"Client {id(self)} has left the scene")
        self._conns.remove(self)
        self._broadcast("something")  # TODO

    async def _broadcast(self, msg): 
        for conn in self._conns: 
            try:
                await conn.write_message(msg)
            except websocket.WebSocketClosedError:
                pass


app = web.Application([
    (r'/ws', SocketHandler)
])

if __name__ == '__main__':
    app.listen(9000)
    ioloop.IOLoop.instance().start()

Tags: 函数用户selfinfowebidmessageasync
2条回答

我认为使用asyncio.Queue的解决方案应该适合您

我制作了一个小班作为一个模型来测试这一点:

import asyncio
import time


class Thing:
    on_close_q = asyncio.Queue()

    def __init__(self):
        self.conns = range(3)

    def on_close(self, id):
        time.sleep(id)
        print(f'closing {id}')
        self.on_close_q.put_nowait((self, id))

    async def notify(self, msg):
        print('in notify')
        for conn in range(3):
            print(f'notifying {conn} {msg}')


async def monitor_on_close():
    print('monitoring')
    while True:
        instance, id = await Thing.on_close_q.get()
        await instance.notify(f'{id} is closed')

从那里开始,您需要在从tornado获得的ioloop中运行monitor_on_close。我从未使用过tornado,但我认为在__main__块中添加类似的内容应该可以:

    ioloop.IOLoop.current().add_callback(monitor_on_close) 

您正在寻找的简单解决方案是在调用协同程序时使用asyncio.create_task

def on_close(self):
    logger.info(f"Client {id(self)} has left the scene")
    self._conns.remove(self)
    asyncio.create_task(self._broadcast("something"))

(此函数的旧Tornado版本为tornado.gen.convert_yielded,但现在Tornado和asyncio已经集成,没有理由不将asyncio版本用于本机协同路由)

但是对于这个特殊的问题,在_broadcast函数中使用await并不理想。等待write_message用于提供流量控制,但是create_taskawait提供的背压没有任何用处。(write_message是相当不寻常的,因为完全支持使用和不使用await来调用它)。事实上,它对错误的事情施加了反压力——一个缓慢的连接将减缓对所有其他连接的通知

因此,在这种情况下,我建议将_broadcast设为常规同步函数:

def _broadcast(self, msg): 
    for conn in self._conns: 
        try:
            conn.write_message(msg)
        except websocket.WebSocketClosedError:
            pass

如果您想更好地控制内存使用(通过await write_message提供的流控制),您需要一个更复杂的解决方案,可能需要为每个连接使用一个有界队列(在on_close中,使用put_nowait将消息添加到每个连接的队列中,然后使用一个任务从队列中读取并写入消息await write_message

相关问题 更多 >