实现luigi动态图配置

2024-09-29 17:19:17 发布

您现在位置:Python中文网/ 问答频道 /正文

我是luigi的新手,在为我们的ML工作设计管道时遇到了它。虽然它不适合我的特定用例,但它有很多额外的特性,我决定让它适合。在

基本上,我所寻找的是一种能够持久化定制管道的方法,从而使其结果可重复且更易于部署,在阅读了大部分在线教程之后,我尝试使用现有的luigi.cfg配置和命令行机制来实现序列化,它可能已经满足了任务的参数要求,但它没有提供序列化我的管道的DAG连接的方法,所以我决定让一个WrapperTask接收一个json config file,它将创建所有的任务实例并连接luigi任务的所有输入输出通道(完成所有管道)。在

兹随函附上一个小型测试程序供您审查:

import random
import luigi
import time
import os


class TaskNode(luigi.Task):
    i = luigi.IntParameter()  # node ID

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.required = []

    def set_required(self, required=None):
        self.required = required  # set the dependencies
        return self

    def requires(self):
        return self.required

    def output(self):
        return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
        self.process()

    def process(self):
        raise NotImplementedError(self.__class__.__name__ + " must implement this method")


class FastNode(TaskNode):

    def process(self):
        time.sleep(1)


class SlowNode(TaskNode):

    def process(self):
        time.sleep(2)


# This WrapperTask builds all the nodes 
class All(luigi.WrapperTask):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        num_nodes = 513

        classes = TaskNode.__subclasses__()
        self.nodes = []
        for i in reversed(range(num_nodes)):
            cls = random.choice(classes)

            dependencies = random.sample(self.nodes, (num_nodes - i) // 35)

            obj = cls(i=i)
            if dependencies:
                obj.set_required(required=dependencies)
            else:
                obj.set_required(required=None)

            # delete existing output causing a build all
            if obj.output().exists():
                obj.output().remove()  

            self.nodes.append(obj)

    def requires(self):
        return self.nodes


if __name__ == '__main__':
    luigi.run()

因此,基本上,正如问题标题中所述,这主要关注于动态依赖关系,并使用p=1/35 connectivity probability生成{},它还将All(如make All)类定义为一个WrapperTask,它需要构建所有节点,以便将其视为完成(我有一个版本,它只将它连接到连接的DAG组件的头部,但是不想把事情搞得太复杂)。在

有没有更标准的(Luigic)方法来实现这一点?特别注意TaskNodeinit和set_-required方法的复杂程度并不高,我之所以这样做是因为init方法中的接收参数与luigi注册参数的方式存在某种冲突。我也尝试了其他几种方法,但这基本上是最体面的一种(有效)

如果没有一个标准的方法,我还是很想听听你对我在完成框架实施之前计划走的路有什么见解。在


Tags: 方法importselfobjoutput管道initdef
1条回答
网友
1楼 · 发布于 2024-09-29 17:19:17

我昨天带了一个演示。我几乎完全基于example in the docs.。在文档中,通过yeild分配动态依赖项似乎是他们喜欢的方式。在

luigi.Config和动态依赖关系可能会给您提供几乎无限的灵活性。它们还描述了一个调用多个依赖链here,的伪Task,它可以给你更多的控制权。在

相关问题 更多 >

    热门问题