线程和多处理池,通过异步处理任务
atasker的Python项目详细描述
ATasker
用于现代线程/多处理池和任务处理的python库 通过异步。
无论代码是如何编写的,atasker都会自动检测阻塞 函数和协同程序,并以适当的方式在线程中启动它们, 异步循环或在多处理池中。
任务被分组到池中。如果池中没有空间,则正在放置任务 按照他们的优先顺序排队。游泳池还为 优先级为“正常”或更高的任务。具有“关键”优先级的任务是 总是立即执行。
如果您有一个具有许多类似任务的项目 产生近似相等的CPU /内存负载,例如API响应 资源状态更新等。
安装
pip3 install atasker
来源:https://github.com/alttch/atasker
文档:https://atasker.readthedocs.io/
为什么
异步编程是使代码快速可靠的完美方法
多线程编程是在 背景
atasker结合了这两种方法的优点:atasker任务在单独的 但是,线程任务主管和工作线程是完全异步的。但是 他们所有的公共方法都是线程安全的。
为什么不使用标准的python线程池呢?
- 标准池中的线程没有优先级
- 工人
为什么不使用标准的异步循环?
- 与阻塞函数的兼容性
- 异步工作者
为什么不同时期货?
concurrent.futures是一个很棒的标准python库,它允许您 在工人池中执行指定的任务。
atasker方法background\u task解决了同样的问题,但是 以不同的方式,向任务添加优先级,而atasker工作人员执行 完全不同的工作:
在concurrent.futures中,worker是执行单个 指定的任务。
在atasker中,worker是一个对象,它连续地生成新任务 在指定的时间间隔或外部事件上,并在线程中执行它们 或多处理池。
代码示例
启动/停止
fromataskerimporttask_supervisor# set pool sizetask_supervisor.set_thread_pool(pool_size=20,reserve_normal=5,reserve_high=5)task_supervisor.start()# ...# start workers, other threads etc.# ...# optionally block current threadtask_supervisor.block()# stop from any threadtask_supervisor.stop()
背景任务
fromataskerimportbackgroundtask,TASK_LOW,TASK_HIGH# with annotation@background_taskdefmytask():print('I am working in the background!')mytask()# with manual decorationdefmytask2():print('I am working in the background too!')background_task(mytask2,priority=TASK_HIGH)()
异步任务
# new asyncio loop is automatically created in own threada1=task_supervisor.create_aloop('myaloop',default=True)asyncdefcalc(a):print(a)awaitasyncio.sleep(1)print(a*2)returna*3# call from sync code# put coroutine and forgetbackground_task(calc)(1)# get coroutine resultresult=a1.run(calc(1))
工人示例
fromataskerimportbackground_worker,TASK_HIGH@background_workerdefworker1(**kwargs):print('I am a simple background worker')@background_workerasyncdefworker_async(**kwargs):print('I am async background worker')@background_worker(interval=1)defworker2(**kwargs):print('I run every second!')@background_worker(queue=True)defworker3(task,**kwargs):print('I run when there is a task in my queue')@background_worker(event=True,priority=TASK_HIGH)defworker4(**kwargs):print('I run when triggered with high priority')worker1.start()worker_async.start()worker2.start()worker3.start()worker4.start()worker3.put('todo1')worker4.trigger()fromataskerimportBackgroundIntervalWorkerclassMyWorker(BackgroundIntervalWorker):defrun(self,**kwargs):print('I am custom worker class')worker5=MyWorker(interval=0.1,name='worker5')worker5.start()