擅长:python、mysql、java
<p>您正在寻找的简单解决方案是在调用协同程序时使用<code>asyncio.create_task</code>:</p>
<pre><code>def on_close(self):
logger.info(f"Client {id(self)} has left the scene")
self._conns.remove(self)
asyncio.create_task(self._broadcast("something"))
</code></pre>
<p>(此函数的旧Tornado版本为<code>tornado.gen.convert_yielded</code>,但现在Tornado和asyncio已经集成,没有理由不将asyncio版本用于本机协同路由)</p>
<p>但是对于这个特殊的问题,在<code>_broadcast</code>函数中使用<code>await</code>并不理想。等待<code>write_message</code>用于提供流量控制,但是<code>create_task</code>对<code>await</code>提供的背压没有任何用处。(<code>write_message</code>是相当不寻常的,因为完全支持使用和不使用<code>await</code>来调用它)。事实上,它对错误的事情施加了反压力——一个缓慢的连接将减缓对所有其他连接的通知</p>
<p>因此,在这种情况下,我建议将<code>_broadcast</code>设为常规同步函数:</p>
<pre><code>def _broadcast(self, msg):
for conn in self._conns:
try:
conn.write_message(msg)
except websocket.WebSocketClosedError:
pass
</code></pre>
<p>如果您想更好地控制内存使用(通过<code>await write_message</code>提供的流控制),您需要一个更复杂的解决方案,可能需要为每个连接使用一个有界队列(在<code>on_close</code>中,使用<code>put_nowait</code>将消息添加到每个连接的队列中,然后使用一个任务从队列中读取并写入消息<code>await write_message</code>)</p>