一个任务运行N个任务,等待它们并处理结果

2024-09-24 22:31:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要以下工作流程:

  • ParentTask首先运行
  • 在某个时刻,它会产生ChildTask的N个实例,这些实例并行运行
  • ParentTask等待这些完成,收集结果,以某种方式处理它们并完成

这似乎很容易。不幸的是,从任务中调用Task().delay()(我用它来调用任务)似乎完全被忽略了。我在这里完全迷路了。在

如果您更喜欢代码方法,我也包括它。在

from celery.task import Task
from celery.result import AsyncResult

class ParentTask(Task):
    def run(self, *args, **kwargs):
        # do some stuff
        ids = [ChildTask().delay().id for _ in range(N)] # this seems to do nothing here
        results = [AsyncResult(t) for t in ids]
        while not all([r.ready() for r in results]): # wait for child tasks to finish
            sleep(.100)
        # do some stuff again
        # return results

class ChildTask(Task):
    def run(self, *args, **kwargs):
        # do some child stuff
        # return child results

ParentTask().delay() # this delay works fine

谢谢你的任何线索!在


Tags: 实例infromimportchildfortasksome
1条回答
网友
1楼 · 发布于 2024-09-24 22:31:22

好的,我知道了。工作方法可以是这样的(当然,任务可以做任何需要的东西):

from time import sleep
from celery.task import Task
from celery import chain, group

class PreTask(Task):
    def run(self, *args, **kwargs):
        x = 0
        for i in range(100000):
            x += 1
        return x


class MidTask(Task):
    def run(self, *args, **kwargs):
        sleep(5)
        return 42


class PostTask(Task):
    def run(self, *args, **kwargs):
        return args


# call it like this
res = chain(PreTask().s() | group(MidTask().s() for _ in range(5)) | PostTask().s()).apply_async()

# and get the result for example like this
while(True):
    if res.ready():
        print(res.get())
    sleep(1)

希望它能帮助别人。在

相关问题 更多 >