Luigi能否传播异常或返回任何结果?

2024-10-02 14:21:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我在用Luigi来启动管道。 让我们举一个简单的例子

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()

现在假设myTask在执行期间引发异常。我所能得到的只是一个来自luigi的日志显示了这个异常。在

luigi有什么方法可以传播它或者至少返回一个failure状态吗?在

然后我就可以让我的程序在这个状态的函数中做出反应。在

谢谢。在

编辑 我在存储结果时忘了指定luigi的输出是针对数据库的。如果引发异常,则不会存储任何结果,但不会将异常传播到luigi。我想知道路易吉有没有这个选择。在


Tags: 方法runaddtask管道状态scheduler例子
2条回答

你能做的就是在文件中写一个错误。例如,在可能失败的任务中(我们称之为TaskA):

x=""
try:
    do stuff
except:
    x="error!"
with open('errorfile.log','w') as f:
    f.write(x)

然后,在依赖于该错误的任务中,该任务将需要TaskA。你可以这样做:

^{2}$

来自docs

Luigi has a built-in event system that allows you to register callbacks to events and trigger them from your own tasks. You can both hook into some pre-defined events and create your own. Each event handle is tied to a Task class and will be triggered only from that class or a subclass of it. This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).

示例:

import luigi

from my_tasks import MyTask


@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """

    do_something()


luigi.run()

路易吉有一个lot of events you can choose from。你也可以看看this tests来学习如何倾听和回应其他事件。在

相关问题 更多 >