Dagster使用其输出从另一个管道启动管道

2024-05-20 17:21:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我应该如何在管道a完成后启动管道B,并将管道a的输出用于管道B

一段代码作为起点:

from dagster import InputDefinition, Nothing, OutputDefinition, pipeline, solid

@solid
def pipeline1_task1(context) -> Nothing:
    context.log.info('in pipeline 1 task 1')


@solid(input_defs=[InputDefinition("start", Nothing)],
       output_defs=[OutputDefinition(str, 'some_str')])
def pipeline1_task2(context) -> str:
    context.log.info('in pipeline 1 task 2')
    return 'my cool output'


@pipeline
def pipeline1():
    pipeline1_task2(pipeline1_task1())


@solid(input_defs=[InputDefinition("print_str", str)])
def pipeline2_task1(context, print_str) -> Nothing:
    context.log.info('in pipeline 2 task 1' + print_str)


@solid(input_defs=[InputDefinition("start", Nothing)])
def pipeline2_task2(context) -> Nothing:
    context.log.info('in pipeline 2 task 2')


@pipeline
def pipeline2():
    pipeline2_task2(pipeline2_task1())


if __name__ == '__main__':
    # run pipeline 1
    # store outputs
    # call pipeline 2 using the above outputs

这里我们有三个管道:pipeline1有两个实体,可能执行我们希望执行的任何操作,并返回第二个实体的输出pipeline2应该使用pipeline1_task2的输出,最终执行另一项工作并打印第一条管道的输出

我应该如何“连接”这两条管道


Tags: ininfolog管道pipelinedefcontexttask1
2条回答

使一条管道在另一条管道之后执行的一种方法是通过传感器。在Dagster中执行此操作的推荐方法是使用“资产传感器”。第一条管道中的固体产生AssetMaterialization,第二条管道中的传感器等待该资产被物化

下面是一个例子:https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors

在玩了一会儿之后,我找到了以下解决方案(在我看来不是很优雅,但至少它是有效的):

from dagster import (InputDefinition, OutputDefinition,
                     execute_pipeline, pipeline, solid, Nothing, repository)


@solid
def pipeline1_task1(context) -> Nothing:
    context.log.info('in pipeline 1 task 1')


@solid(input_defs=[InputDefinition("start", Nothing)],
output_defs=[OutputDefinition(str, 'some_str')])
def pipeline1_task2(context) -> str:
    context.log.info('in pipeline 1 task 2')
    return '\n\n\nmy cool output\n\n\n'


@pipeline
def pipeline1():
    pipeline1_task2(pipeline1_task1())


@solid(input_defs=[InputDefinition("print_str", str)])
def pipeline2_task1(context, print_str) -> Nothing:
    context.log.info('in pipeline 2 task 1' + print_str)


@solid(input_defs=[InputDefinition("start", Nothing)])
def pipeline2_task2(context) -> Nothing:
    context.log.info('in pipeline 2 task 2')


@pipeline
def pipeline2():
    pipeline2_task2(pipeline2_task1())


@solid
def run_pipelines(context):
    pout = execute_pipeline(pipeline1)
    some_str = pout.result_for_solid('pipeline1_task2')
    conf = {'solids': {'pipeline2_task1': {'inputs': {'print_str': some_str.output_value('some_str')}}}}
    execute_pipeline(pipeline2, run_config=conf)

@pipeline
def pipeline3():
    run_pipelines()


@repository
def repo():
    return [pipeline1, pipeline2, pipeline3]

if __name__ == '__main__':
    execute_pipeline(pipeline3)

所以。。。在这里,我定义了pipeline3,而不是在底部条件中执行所有操作。管道3只有一个实体,它执行pipeline1并获取实体pipeline1_task2的输出。然后,它创建一个包含该输出的配置some_str,并将该配置传递给第二个管道的execute_pipeline

在这里,我们还定义了一个@repository函数,Dagster需要这个函数来确定所有三条管道都是一个整体的一部分

整个事情在dagit中很好地可视化。尽管每个管道与其他管道分别显示,但这三个管道显示在一个存储库中(如代码中定义的)。 enter image description here

相关问题 更多 >