一种自适应线程池实现
atp的Python项目详细描述
一个自适应线程池实现,为您提供可配置的最小值 应用程序生存期内始终可用的线程数。 此实现通过具有池卷来适应线程消耗率 监视线程增加工作线程数以适应任务速率 提交到池中。由于 任务不足将被终止
安装
安装它很简单,您所要做的就是:
pip install atp
用法
在使用任何helper函数之前,需要初始化线程池 或传递要由工人处理的任务。有两种使用方法 库中,可以初始化全局线程池或初始化本地线程池 线程池:
import atp # The ``min_workers`` argument specifies the minimum number of workers in the # pool, the default is 2. # The ``stack_size`` argument specifies the stack size of each thread in KiB, # the default value is 512. # Initialize the global thread pool atp.GlobalThreadPool(min_workers = 2, stack_size = 512) # Or initialize a local thread pool tp = atp.ThreadPool(min_workers = 2, stack_size = 512)
有两个接口可将任务传递给工作人员。您可以创建一个Task 手动初始化并将其传递到线程池或使用帮助函数:
import atp # Create a task manually. task = atp.Task(target=<callable>, success=<callable>, args=(1,2,3,), kwargs={'a':1, 'b':2}) atp.GlobalThreadPool().run(task) # Run a task through helpers. Note that you can pass your own local thread-pool # to the helper function through the ``pool`` argument. The ``args`` and ``kwargs`` # are the arguments and keyword arguments passed to the target functions. task = atp.async_call(<callable>, success=<callable>, args=(1,2,3,), kwargs={'a':1, 'b':2}, pool=atp.GlobalThreadPool())
有两种类型的任务可以传递到线程池;一次性运行任务 一个无限的任务前者是当您需要一个工作人员只运行一次任务时 但后者是当您需要无限次地运行一个任务时:
import atp import time def print_string(string): print string def caps(string): return string.upper() task = atp.async_call(caps, 'hello world!', success=print_string) time.sleep(0.1) # 100ms
将无限期运行的任务将作为第一个参数传递给 如果你希望它停止,请查看:
import atp import time result = [1] def increase(kill, arg): while not kill.is_set() and arg[-1] < 100: arg.append(arg[-1] + 1) task = atp.async_call(increase, result, infinite=True) time.sleep(0.1) # 100ms task.stop() assert len(result) == 100
如果任务引发未处理的异常,则失败回调将捕获 异常并将其包装在一个Failure类中,在该类中可以访问 异常的详细信息。如果失败回调引发未处理的错误,则 捕获并记录:
import atp import time import logging logging.getLogger().addHandler(logging.StreamHandler()) def will_fail(): throw RuntimeError("fake error") def catch_fail(error): throw error.exception, error.message, error.traceback task = atp.async_call(will_fail, failure=catch_fail) time.sleep(0.1) # 100ms
黑客攻击
想在atp上看到什么吗?您可以访问issue tracker 检查是否以前报告过,如果没有,鼓励您创建 先讨论的问题或特征请求。当你准备好贡献 代码或文档从github处的code repository分叉。
要开始克隆您的fork并设置您的环境:
$ git clone git@github.com:<your username>/atp.git $ cd atp/ $ virtualenv venv $ source venv/bin/activate $ python setup.py develop
复制
本软件的免费使用是根据GNU公众条款授予的 许可证(GPLv3+)。有关详细信息,请参见此文件中包含的LICENSE文件 分配。