芹菜任务树模块

celery-tasktree的Python项目详细描述


芹菜任务树是一个帮助执行芹菜任务树的模块。 以特定顺序异步地。当 任务和依赖项的数量增长,以及基于回调的简单方法 变得难以理解和维护。

使用示例

from celery_tasktree import task_with_callbacks, TaskTree

@task_with_callbacks
def some_action(...):
    ...

def execute_actions():
    tree = TaskTree()
    task0 = tree.add_task(some_action, args=[...], kwargs={...})
    task1 = tree.add_task(some_action, args=[...], kwargs={...})
    task10 = task1.add_task(some_action, args=[...], kwargs={...})
    task11 = task1.add_task(some_action, args=[...], kwargs={...})
    task110 = task11.add_task(some_action, args=[...], kwargs={...})
    async_result = tree.apply_async()
    return async_result

应该使用名为task_with_callbacks的装饰符,而不是简单的芹菜 task装饰器。

根据代码:

  • task0和task1同时执行
  • task10和task11在task1之后同时执行
  • task110在task11之后执行

注意事项:

  • 无法阻止执行的传播,也无法 从祖先向子任务传递额外的参数。简而言之,只有一个 任务之间的依赖关系:执行顺序的依赖关系。

  • 如果子任务(函数)返回值是对象,则名为 “异步结果”将被添加到该对象中,以便 使用join()收集有序的任务结果。扩展前面的示例:

    async_result = execute_actions()
    task0_result, task1_result = async_result.join()
    task10_result, task11_result = task1_result.async_result.join()
    task110_result = task11_result.async_result.join()
    

子类化带有回调的芹菜.task.task

@taskdecorator装饰函数是最简单的,但不是唯一的 创建新的Task子类的一种方法。有时更方便 将泛型celery.task.Task类划分为子类,并重新定义其run()方法。 为了使这样的类与tasktree兼容,run应该用 celery_tasktree.run_with_callbacks装饰器。下面的例子 说明了这种方法:

from celery.task import Task
from celery_tasktree import run_with_callbacks, TaskTree

class SomeActionTask(Task):

    @run_with_callbacks
    def run(self, ...):
        ...

def execute_actions():
    tree = TaskTree()
    task0 = tree.add_task(SomeActionTask, args=[...], kwargs={...})
    task01 = task0.add_task(SomeActionTask, args=[...], kwargs={...})
    tree.apply_async()

将tasktree用作简单队列

在很多情况下,一个完全成熟的任务树对你来说是过分的。你们所有人 需要将两个或多个任务添加到队列中,以确保它们将 按顺序执行。允许此任务树有push()pop() 方法实际上只是add_task()周围的包装器。 push()方法将新任务作为子任务添加到已创建的任务中。 而pop()从任务堆栈的尾部移除并返回任务。 用法示例如下:

# create the tree
tree = TaskTree()
# push a number of tasks into it
tree.push(action1, args=[...], kwargs={...})
tree.push(action2, args=[...], kwargs={...})
tree.push(actionX, args=[...], kwargs={...})
tree.pop() # get back action X from the queue
tree.push(action3, args=[...], kwargs={...})
# apply asynchronously
tree.apply_async()

操作将按action1 -> action2 -> action3顺序执行。

任务树外具有回调的任务

task_with_callbacksdecorator本身可能很有用。它装饰着 功能与普通的task芹菜装饰器相同,但是 添加可选的callback参数。

回调可以是子任务或子任务列表(而不是任务集)。背后 场景中,当调用带有回调的任务时,它执行函数的主代码, 然后构建一个任务集,异步调用它并附加 TaskSetResut作为函数返回名为async_result的属性 价值。

下面提供了一个简单的示例:

from celery_tasktree import task_with_callbacks

@task_with_callbacks
def some_action(...):
    ...

cb1 = some_action.subtask(...)
cb2 = some_action.subtask(...)
async_result = some_action.delay(..., callback=[cb1, cb2])
main_result = async_result.wait()
cb1_result, cb2_result = main_result.async_result.join()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java 2D数组字符串或字符管理   java构建循环   java在S3桶上迭代   java通过数字id获取枚举的最佳方法   java设置水平回收视图   java使用从完整Maven项目生成的JAR安装一个模块   java背压不适用于groupBy运算符   java a4j:带有大型mp3文件的媒体输出   性能—在困难的场景中高效地对Java集合进行排序   spring Java泛型和枚举,模板参数丢失   java JavaFX HTMLEditor超链接   java Android Store变量及其在webview链接中的使用   如何在Java中实现Scala apply方法   java Lombok@Data annotation项目是否创建了任何类型的构造函数?