在所有任务完成后运行任务

2024-10-06 12:25:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在编写一个应用程序,它需要并行运行一系列任务,然后运行一个包含所有任务结果的任务:

@celery.task
def power(value, expo):
    return value ** expo

@celery.task
def amass(values):
    print str(values)

这是一个非常做作和过于简单化的例子,但希望这一点得到很好的理解。基本上,我有很多需要运行power的项,但我只想运行amass所有任务的结果。所有这些都应该异步发生,我不需要amass方法返回任何内容。

有没有人知道如何在芹菜中设置它,以便所有的操作都异步执行,并且在所有操作完成后调用一个带有结果列表的回调?

我已经将这个示例设置为使用Alexander Afanasiev推荐的chord运行:

from time import sleep

import random

tasks = []

for i in xrange(10):
    tasks.append(power.s((i, 2)))
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)

不幸的是,在上面的例子中,tasks中的所有任务都是在调用chord方法时启动的。有没有一种方法可以让每个任务单独启动,然后我可以向组中添加一个回调,以便在所有任务完成后运行?


Tags: 方法importtaskvaluedefsleeprandom例子
3条回答

以下是一个对我有用的解决方案:

任务.py

from time import sleep

import random

@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms
    return value ** expo

@celery.task
def amass(results, tasks):
    completed_tasks = []
    for task in tasks:
        if task.ready():
            completed_tasks.append(task)
            results.append(task.get())

    # remove completed tasks
    tasks = list(set(tasks) - set(completed_tasks))

    if len(tasks) > 0:
        # resend the task to execute at least 1 second from now
        amass.delay(results, tasks, countdown=1)
    else:
        # we done
        print results

用例:

tasks = []

for i in xrange(10):
    tasks.append(power.delay(i, 2))

amass.delay([], tasks)

这个应该做的是尽快异步启动所有任务。一旦它们都被发布到队列中,amass任务也将被发布到队列中。在所有其他任务完成之前,amass任务将一直重新部署自己。

芹菜有plenty of tools的大多数工作流程,你可以想象。

似乎你需要使用chord。以下是来自docs的报价:

A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.

从你的问题来看,这段代码看起来像是传递了一个list作为和弦头,而不是group

from time import sleep
import random

tasks = []

for i in xrange(10):
    tasks.append(power.s((i, 2)))
    sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms

callback = amass.s()

r = chord(tasks)(callback)

list转换为group应该会导致预期的行为:

...

callback = amass.s()

tasks = group(tasks)

r = chord(tasks)(callback)

相关问题 更多 >