重用Luigi中的通用任务

2024-09-28 22:23:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我很难理解如何在Luigi中创建可重用的任务,然后在具体情况下使用它们。在

例如。我有两个通用任务,对文件执行某些操作,然后输出结果:

class GffFilter(luigi.Task):
    "Filters a GFF file to only one feature"
    feature = luigi.Parameter()
    out_file = luigi.Parameter()
    in_file = luigi.Parameter()
    ...

class BgZip(luigi.Task):
    "bgZips a file"
    out_file = luigi.Parameter()
    in_file = luigi.Parameter()
    ...

现在,我需要一个工作流,它首先过滤,然后使用以下任务bgzip特定文件:

^{pr2}$

但这很尴尬。在第一个任务中,我没有run方法,我只是使用依赖关系来使用通用任务。这是对的吗?我应该在这里使用继承吗?在

然后在第二个任务中,我不能使用依赖项,因为我需要FilterSomeFile的输出才能使用BgZip。但是使用动态依赖关系似乎是错误的,因为luigi无法构建一个正确的依赖关系图。在

我应该如何从我的一般任务中创建一个Luigi工作流?在


Tags: 文件intaskparameter关系outfiltersfeature
1条回答
网友
1楼 · 发布于 2024-09-28 22:23:03

But this is awkward. In the first task I have no run method, and I'm just using dependencies to use the generic task. Is this correct?

是的,根据this pageWrapperTask是一个虚拟任务,其目的是定义一个任务工作流,因此它本身不执行任何操作。相反,通过定义几个需求,这个任务将在requires方法中列出的每个需求都已完成时完成。这个WrapperTask与常规的Task的主要区别是,您不需要定义一个输出方法来表示这个任务成功了,如here所示。在

Then in the second task, I can't use dependencies, because I need the output from FilterSomeFile in order to use BgZip. But using dynamic dependencies seems wrong, because luigi can't build a proper dependency graph.

理论上,您可以使FilterSomeFile具有与GffFilter相同的输出,使BgZipSomeFile需要{},然后使用BgZipSomeFile.run中的FilterSomeFile.output()来访问压缩文件。然而,这种解决方案有些奇怪,因为:

  • 包装器任务只“运行”1个其他任务,因此可以直接使用包装好的任务,而不必创建WrapperTask。更好地使用WrapperTaskBgZipSomeFile和{}合并到WrapperTask

    的单个子类中
  • 正在run方法中实例化Task。但在这个问题中不需要动态的结果。

  • 最后,GffFilter的输入被硬编码在FilterSomeFile任务中,这使得工作流不那么有用。这可以通过使WrapperClass仍然接收参数并将这些参数传递给其需求来避免。

更好的解决方案是:

import luigi as lg

class A(lg.Task):
    inFile = lg.Parameter()
    outFile = lg.Parameter()

    def run(self,):
        with open(self.inFile, "r") as oldFile:
            text = oldFile.read()

        text  += "*" * 10 + "\n" + "This text was added by task A.\n" + "*" * 10 + "\n"
        print(text)
        with open(self.outFile, "w") as newFile:
            newFile.write(text)

    def output(self,):
        return lg.LocalTarget(self.outFile)

class B(lg.Task):
    inFile = lg.Parameter()
    outFile = lg.Parameter()

    def run(self,):
        with open(self.inFile, "r") as oldFile:
            text = oldFile.read()

        text  += "*" * 10 + "\n" + "This text was added by task B.\n" + "*" * 10 + "\n"

        with open(self.outFile, "w") as newFile:
            newFile.write(text)

    def output(self,):
        return lg.LocalTarget(self.outFile)

class CustomWorkflow(lg.WrapperTask):
    mainOutFile = lg.Parameter()
    mainInFile = lg.Parameter()
    tempFile = "/tmp/myTempFile.txt"
    def requires(self,):
        return [    A(inFile = self.mainInFile, outFile = self.tempFile),
                    B(inFile = self.tempFile, outFile = self.mainOutFile)
                ]

此代码可以在命令行中运行:

^{pr2}$

相关问题 更多 >