我有一个由两个固体组成的Dagster管道(下面的可复制示例)。第一个(return_some_list
)输出一些对象的列表。第二个实体(print_num
)接受第一个列表(不是完整列表)中的元素,并对该元素进行一些处理
我应该如何为第一个实体返回的列表中的每个元素调用第二个实体?请解释任何最佳实践以及
不确定这是否是最好的方法(让我知道),但我想为第一个实体输出的每个元素生成一个不同的print_num
实体实例。这将帮助我将来并行化实体,更好地处理长时间/计算密集型实体
from dagster import execute_pipeline, pipeline, solid
@solid
def return_some_list(context):
return [1,2,3,4,5]
@solid
def print_num(context, some_num: int):
print(some_num)
return some_num
@pipeline
def some_pipeline():
output_list = return_some_list()
for some_num in output_list:
print_num(some_num)
if __name__ == "__main__":
result = execute_pipeline(some_pipeline)
事实证明,有一个实验性的特性(有望成为正式特性)允许基于可移植输出的元素创建任务。工作代码如下:
这里,
return_some_list
返回一个iterable。我们要为这个iterable的每个元素运行一个实体。我们在实数generate_subtasks
中执行此操作,这将生成一个DynamicOutput
,其中包含元素和将为其生成的子任务的名称。DynamicOutput
的类型信息在solid
规范中的DynamicOutputDefinition
中给出要连接这些实体,我们首先通过
return_some_list
获取列表。然后调用generate_subtasks
,它是一个生成器,并map
将其每个输出调用print_num
函数运行整个管道应该为
generate_subtasks
生成的每个子任务打印大量信息,如下所示(仅显示部分输出):哦,还有一件很酷的事情:Dagster执行类型检查,如果你给它一个错误类型的参数,它会很快失败。因此,如果我们提供
print_str
,比如说,给map
函数,它甚至会拒绝运行相关问题 更多 >
编程相关推荐