带消息传递的Tornado IO循环处理池
Looppool的Python项目详细描述
环池
looppool是一个python 3包,用于运行tornado io循环的工作进程池。它很有用 对于大型异步应用程序,由于 CPU使用率并遭受IO循环阻塞(请参见设置阻塞日志阈值 [1] )。
它是作为龙卷风数据提取应用程序性能优化的一部分而开发的。 应用程序混合了IO绑定和CPU绑定的任务。此外,CPU限制的任务非常高 再加上IO回路。因为这样的耦合, concurrent.futures [3] 没有帮助。
设计
价值一千字的图片。
一些观察和注意事项:
- 消息立即卸载到IO循环,但任务队列和结果队列 受信号量保护,不会在IO循环上卸载超过队列大小的任务。
- 事实上,有一个任务队列,每个工人一个队列。任务消息已分发 平均分配给工人。
- add_callback [2] 是从其他方传递控制的安全(也是唯一安全)方法 线程到IO循环的线程。
- 因为队列消息处理程序( fn1 和 fn2 )是从io循环调用的 它们可以是联程旅行。
- 池 通过向每个工人发送 毒丸 任务消息来停止其工人。
工人
有一个工人的基类。它表示运行自己的进程工作进程 龙卷风IO回路。它处理任务消息并将它们放入结果队列(或者只放入 接受任何东西)。它需要重写子类中的 \u process\u消息(self,task) 。 并要求任务完成后,调用 self.\u task\u done() (直接在 最后尝试 或完成任务 ,例如:
def _process_message(self, task): try: result = 'some processing' self._put_nowait(result) finally: self._task_done()
WorkerSubClass.\u process\u消息 可以是普通函数或协同程序。更多细节如下 可在软件包的单元测试模块中获得。 < div > 注
对于 pool.process\u消息,还需要调用 result\u done 参数 一旦消息完成。在简单的情况下,当 结果是在返回 process_消息时考虑摄取
这两个需求都绑定到限制运行任务和挂起结果的信号量。 它还影响池在等待运行任务完成时的停止方式。
有状态工作者
例如,如果要运行有状态的工作进程,请使用一些定期计算的查找表, 但不想在每个工作人员中计算它(例如,维护数据库连接的负担)。 您可以发送 n 任务,并且每个 n 工作人员都保证会收到它。
您还可以单独向员工发送消息。 pool.put_nowait 有可选参数 工人数量
< div > 注如果您的流程启动方法 [4] 是 fork (在*nix平台上是默认的),则可以共享 来自父进程的一些静态数据。