带消息传递的Tornado IO循环处理池

Looppool的Python项目详细描述


环池

looppool是一个python 3包,用于运行tornado io循环的工作进程池。它很有用 对于大型异步应用程序,由于 CPU使用率并遭受IO循环阻塞(请参见设置阻塞日志阈值 [1] )。

它是作为龙卷风数据提取应用程序性能优化的一部分而开发的。 应用程序混合了IO绑定和CPU绑定的任务。此外,CPU限制的任务非常高 再加上IO回路。因为这样的耦合, concurrent.futures [3] 没有帮助。

设计

价值一千字的图片。

https://bytebucket.org/saaj/looppool/raw/default/manual/overview.png

一些观察和注意事项:

  1. 消息立即卸载到IO循环,但任务队列和结果队列 受信号量保护,不会在IO循环上卸载超过队列大小的任务。
  2. 事实上,有一个任务队列,每个工人一个队列。任务消息已分发 平均分配给工人。
  3. add_callback [2] 是从其他方传递控制的安全(也是唯一安全)方法 线程到IO循环的线程。
  4. 因为队列消息处理程序( fn1 fn2 )是从io循环调用的 它们可以是联程旅行。
  5. 通过向每个工人发送 毒丸 任务消息来停止其工人。

工人

有一个工人的基类。它表示运行自己的进程工作进程 龙卷风IO回路。它处理任务消息并将它们放入结果队列(或者只放入 接受任何东西)。它需要重写子类中的 \u process\u消息(self,task) 。 并要求任务完成后,调用 self.\u task\u done() (直接在 最后尝试 或完成任务 ,例如:

def _process_message(self, task):
    try:
        result = 'some processing'
        self._put_nowait(result)
    finally:
        self._task_done()

WorkerSubClass.\u process\u消息 可以是普通函数或协同程序。更多细节如下 可在软件包的单元测试模块中获得。 < div > 注

对于 pool.process\u消息,还需要调用 result\u done 参数 一旦消息完成。在简单的情况下,当 结果是在返回 process_消息时考虑摄取

这两个需求都绑定到限制运行任务和挂起结果的信号量。 它还影响池在等待运行任务完成时的停止方式。

有状态工作者

例如,如果要运行有状态的工作进程,请使用一些定期计算的查找表, 但不想在每个工作人员中计算它(例如,维护数据库连接的负担)。 您可以发送 n 任务,并且每个 n 工作人员都保证会收到它。

您还可以单独向员工发送消息。 pool.put_nowait 有可选参数 工人数量

< div > 注

如果您的流程启动方法 [4] fork (在*nix平台上是默认的),则可以共享 来自父进程的一些静态数据。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
virtualbox无法从java移动共享文件夹中的文件   java如何连接Android 4.3.5(GA)的apache HttpClient库?   片段中的java Recyclerview未立即显示警报对话框结果   javac(n,r)计算器程序不工作   java使用BooleanQuery还是编写更多索引?   如何在java中设置y/n循环?   java不兼容的通用通配符捕获   java如何在安卓xml中编写数据绑定时的三元操作条件   java如何使用FileDialog?   java如何创建单元测试来检测是否有人使用错误的编码编辑了文件?   java如何从唯一的字符串生成唯一的int?   java gradletomcatplugin:log4j:WARN找不到记录器的附加程序   java我的动态编程解决方案(Kefa和第一步)在codeforces中有什么问题?   java每天更新两个数据库,使它们都包含相同的有效数据集   java如何检查给定的时间是否在时间限制之间   java在单个json POST上保存父级和子级   java如何获取Solr字段类型