如何忽略在另一个任务的run()中触发的Luigi任务失败

2024-10-01 19:25:22 发布

您现在位置:Python中文网/ 问答频道 /正文

考虑以下任务:

import luigi


class YieldFailTaskInBatches(luigi.Task):
    def run(self):
        for i in range(5):
            yield [
                FailTask(i, j)
                for j in range(2)
            ]


class YieldAllFailTasksAtOnce(luigi.Task):
    def run(self):
        yield [
            FailTask(i, j)
            for j in range(2)
            for i in range(5)
        ]

class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))

如果j > 0,则{}失败。YieldFailTaskInBatches在for循环中多次生成FailTask,而YieldAllFailTasksAtOnce生成数组中的所有任务。在

如果我运行YieldFailTaskInBatches,Luigi将运行第一个循环中生成的任务,当其中一个失败时(i = 0, j = 1),Luigi不会生成其余的任务。如果我运行YieldAllFailTasksAtOnce,那么Luigi将按预期运行所有任务。在

我的问题是:我如何告诉Luigi继续运行YieldFailTasksInBatches上剩余的任务,即使有些任务失败了?有可能吗?在

我问的原因是我有大约40万个任务要触发。我不想一次触发它们,因为这会让Luigi花费太多时间构建每个任务的需求(它们可能有1到400个需求)。我目前的解决方案是成批生成它们,一次只生成几个,但是如果其中任何一个失败,任务就会停止,其余的也不会产生。在

如果实现了this issue似乎可以解决这个问题,但我想知道是否有其他方法。在


Tags: runinselffortaskdefrangeclass
1条回答
网友
1楼 · 发布于 2024-10-01 19:25:22

这很老套,但它应该做你想做的:

class YieldAll(luigi.Task):
    def run(self):
        errors = list()
        for i in range(5):
            for j in range(2):
                try:
                    FailTask(i, j).run()
                except Exception as e:
                    errors.append(e)

        if errors:
            raise ValueError(f' all traceback: {errors}')

class FailTask(luigi.Task):
    i = luigi.IntParameter()
    j = luigi.IntParameter()

    def run(self):
        print("i: %d, j: %d" % (self.i, self.j))
        if self.j > 0:
            raise Exception("i: %d, j: %d" % (self.i, self.j))

所以基本上,您在luigi上下文之外运行任务。除非输出目标,否则luigi永远不会知道任务是否已运行。在

luigi唯一知道的任务是YieldAll。如果任何YieldAll创建错误,代码将捕获它并将YieldAll任务设置为fail状态。在

相关问题 更多 >

    热门问题