我想在Tornado框架中实现一个基于web套接字的服务。当用户关闭web套接字时,我想通知其他用户这一点。然而,on_close
显然是一个阻塞函数,而我的_broadcast(str) -> None
函数是异步的
我怎样才能调用这个函数呢
from tornado import websocket
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SocketHandler(websocket.WebSocketHandler):
async def open(self, *args, conns, **kwargs):
logger.info(f"Opened a new connection to client {id(self)}")
self._conns = conns
async def on_message(self, message):
logger.info(f"Client {id(self)} sent message: {message}")
await self._broadcast(message)
def on_close(self):
logger.info(f"Client {id(self)} has left the scene")
self._conns.remove(self)
self._broadcast("something") # TODO
async def _broadcast(self, msg):
for conn in self._conns:
try:
await conn.write_message(msg)
except websocket.WebSocketClosedError:
pass
app = web.Application([
(r'/ws', SocketHandler)
])
if __name__ == '__main__':
app.listen(9000)
ioloop.IOLoop.instance().start()
我认为使用
asyncio.Queue
的解决方案应该适合您我制作了一个小班作为一个模型来测试这一点:
从那里开始,您需要在从tornado获得的
ioloop
中运行monitor_on_close
。我从未使用过tornado,但我认为在__main__
块中添加类似的内容应该可以:您正在寻找的简单解决方案是在调用协同程序时使用
asyncio.create_task
:(此函数的旧Tornado版本为
tornado.gen.convert_yielded
,但现在Tornado和asyncio已经集成,没有理由不将asyncio版本用于本机协同路由)但是对于这个特殊的问题,在
_broadcast
函数中使用await
并不理想。等待write_message
用于提供流量控制,但是create_task
对await
提供的背压没有任何用处。(write_message
是相当不寻常的,因为完全支持使用和不使用await
来调用它)。事实上,它对错误的事情施加了反压力——一个缓慢的连接将减缓对所有其他连接的通知因此,在这种情况下,我建议将
_broadcast
设为常规同步函数:如果您想更好地控制内存使用(通过
await write_message
提供的流控制),您需要一个更复杂的解决方案,可能需要为每个连接使用一个有界队列(在on_close
中,使用put_nowait
将消息添加到每个连接的队列中,然后使用一个任务从队列中读取并写入消息await write_message
)相关问题 更多 >
编程相关推荐