芹菜的复杂工作流

2024-10-03 06:23:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试建立一个基于芹菜的工作流。我用组和弦。在

在下面的例子中,有独立的组([mytask1,mytask1,mytask1,…]->;myfinaltask1),其中mytask1可以并行执行,但是myfinaltask1应该在每个组之后调用。在

代码:

def func1(date):
    subtasks = []
    for filepath in all_files:
        kwargs = {'date': date, 'hfile': filepath}
        subtask = mytask1.subtask(kwargs=kwargs)
        subtasks.append(subtask)

    chrd = chord(subtasks)
    chrdr = chrd(myfinaltask1.s(kwargs={'date': date}))
    return chrdr


def main(all_dates):
    subtasks = []
    for ad in all_dates:
        subtasks.append(func1(ad))

    g = group(subtasks)
    gr = g.apply_async()
    results = gr.get(propagate=False)  # sync wait!


main([2014, 2015, 2016])

引发异常:

^{pr2}$

我做错了什么?在


Tags: infordatedefallkwargsappendfilepath
1条回答
网友
1楼 · 发布于 2024-10-03 06:23:14

似乎您忘了将subtasks包装到group。在

def func1(date):
    subtasks = []
    for filepath in all_files:
        kwargs = {'date': date, 'hfile': filepath}
        subtask = mytask1.subtask(kwargs=kwargs)
        subtasks.append(subtask)

    chrd = chord(header=group(subtasks), body=myfinaltask1.subtask(kwargs={'date': date}))
    return chrdr

相关问题 更多 >