kubeflow管道动态输出列表作为输入参数

2024-10-04 11:36:40 发布

您现在位置:Python中文网/ 问答频道 /正文

一个动态列表。我想收集循环中的所有输出,并将它们传递给另一个ContainerOp。
下面这样的东西显然行不通,因为outputs列表是静态的。在

with dsl.ParallelFor(op1.output) as item:
    op2 = dsl.ContainerOp(
      name='op2',
      ...
      file_outputs={
         'outputs': '/outputs.json',
    })
    outputs.append(op2.output)


op3 = dsl.ContainerOp(
   name='op3',
   ...
   arguments=['--input': outputs]  # won't work
)

Tags: name列表outputaswith静态动态item
2条回答

问题是op3没有正确地引用op2的输出作为输入参数。试试这个:

op3 = dsl.ContainerOp(
    ...
    arguments=[' input': op2.outputs['outputs']]
)

我遇到了Kubeflow管道的动态“扇出”和“扇进”的问题。可能有点重手,但我用了一个装PVC的声明来克服这一点。在

Kubeflow允许您使用VolumeOp(linkhere)挂载已知的PVC或动态创建一个新的PVC。这个片段展示了如何使用已知的PVC。在

    pvc_name = '<available-pvc-name>' 
    pvc_volume_name = '<pvc-uuid>' # pass the pvc uuid here

    # Op 1 creates a list to iterate over
    op_1 = dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=['echo "[1,2,3]"> /tmp/output.txt'],
            file_outputs={'output': '/tmp/output.txt'})

    # Using withParam here to iterate over the results from op1
    # and writing the results of each step to its own PVC
    with dsl.ParallelFor(op_1.output) as item:
        op_2 = dsl.ContainerOp(
            name='iterate',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[f"echo item-{item} > /tmp/output.txt; "  # <- write to output  
                       f"mkdir -p /mnt/{{workflow.uid}}; "  # <- make a dir under /mnt
                       f"echo item-{item}\n >> /mnt/{{workflow.uid}}"],  # <- append results from each step to the PVC
            file_outputs={'output': '/tmp/output.txt'},
            # mount the PVC
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)})

    op_3 = dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[f"echo /mnt/{{workflow.uid}} > /tmp/output.txt"],
            # mount the PVC again to use
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)},
            file_outputs={'output': '/tmp/output_2.txt'}).after(op_2)

确保op_3在{}的循环之后运行,最后使用after(op_2)。在

注意:这可能是一个繁重的方法,如果KFP允许它作为KF编译器的一部分,那么可能会有更好的解决方案,但是我无法让它工作。如果很容易在env中创建一个PVC,这可能适用于您的情况。在

相关问题 更多 >