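<p>I ran into the same problem with dynamic "fan-out" followed by "fan-in" in Kubeflow Pipelines. It may be a little heavy-handed, but I worked around it with a mounted PVC.</p>
<p>Kubeflow lets you mount a known PVC, or create a new one on the fly, using <code>VolumeOp</code> (see the docs <a href="https://www.kubeflow.org/docs/pipelines/sdk/manipulate-resources/#volumeop" rel="nofollow noreferrer">here</a>). This snippet shows how to use a known PVC.</p>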
<pre><code>import kfp.dsl as dsl

pvc_name = '<available-pvc-name>'
pvc_volume_name = '<pvc-uuid>'  # pass the pvc uuid here


@dsl.pipeline(name='fan-out-fan-in')  # pipeline name is illustrative
def pipeline():
    # Op 1 creates a list to iterate over
    op_1 = dsl.ContainerOp(
        name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "[1,2,3]" > /tmp/output.txt'],
        file_outputs={'output': '/tmp/output.txt'})

    # ParallelFor (compiled to Argo's withParam) iterates over the results from op_1;
    # each iteration appends its result to a file on the shared PVC
    with dsl.ParallelFor(op_1.output) as item:
        op_2 = dsl.ContainerOp(
            name='iterate',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[
                f"echo item-{item} > /tmp/output.txt; "  # <- write to output
                # plain (non-f) strings keep Argo's {{workflow.uid}} braces intact
                "mkdir -p /mnt/{{workflow.uid}}; "  # <- make a dir under /mnt
                # append each step's result to a file in that dir (file name is illustrative)
                f"echo item-{item} >> " "/mnt/{{workflow.uid}}/results.txt"],
            file_outputs={'output': '/tmp/output.txt'},
            # mount the PVC
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)})

    # Fan-in: read back what the loop iterations wrote to the PVC
    op_3 = dsl.ContainerOp(
        name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=["cat /mnt/{{workflow.uid}}/results.txt > /tmp/output_2.txt"],
        # mount the PVC again to use
        pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)},
        file_outputs={'output': '/tmp/output_2.txt'}).after(op_2)
</code></pre>
<p>Make sure <code>op_3</code> runs only after the loop over <code>op_2</code> has finished by adding <code>.after(op_2)</code> at the end.</p>
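<p>For the fan-in itself, the final step only has to read whatever the loop iterations left under the per-workflow directory on the PVC. Here is a minimal plain-Python sketch of such a collect step, assuming each iteration appended one line to a shared file. The function name <code>collect_results</code>, the file name <code>results.txt</code> and the fake directory name are hypothetical and not part of KFP; the temporary directory only simulates the mounted volume.</p>

```python
import tempfile
from pathlib import Path
from typing import List


def collect_results(workflow_dir: Path, filename: str = "results.txt") -> List[str]:
    """Return the non-empty lines appended by the loop iterations, sorted."""
    results_file = workflow_dir / filename
    if not results_file.exists():
        return []
    lines = [line.strip() for line in results_file.read_text().splitlines()]
    # Parallel steps finish in arbitrary order, so sort for a stable result.
    return sorted(line for line in lines if line)


# Simulate three parallel iterations appending to the shared file.
with tempfile.TemporaryDirectory() as tmp:
    workflow_dir = Path(tmp) / "fake-workflow-uid"  # stand-in for /mnt/<workflow uid>
    workflow_dir.mkdir(parents=True)
    for item in (3, 1, 2):  # deliberately out of order
        with open(workflow_dir / "results.txt", "a") as fh:
            fh.write(f"item-{item}\n")
    collected = collect_results(workflow_dir)

print(collected)
```

<p>Sorting is a design choice here: since the iterations run in parallel, the order of lines in the file says nothing about the order of the input list.</p>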
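<p>Note: this may be a heavy-handed approach, and there may be better solutions if KFP supports this as part of the KF compiler, but I couldn't get that to work. If it is easy to create a PVC in your environment, this may work for your case.</p>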
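<p>One detail worth checking in a snippet like this: it relies on Argo's <code>{{workflow.uid}}</code> placeholder reaching the container command with both braces intact. Inside a Python f-string, <code>{{</code> and <code>}}</code> are escape sequences that collapse to single braces, so it helps to look at the string that is actually produced. A small plain-Python sketch (here <code>item</code> and <code>results.txt</code> are stand-ins for illustration, not KFP objects):</p>

```python
# Argo expects the placeholder with two braces on each side.
ARGO_UID = "{{workflow.uid}}"

item = "1"  # stand-in for the loop item; in a real pipeline this is a KFP placeholder

# In an f-string, "{{" and "}}" are escapes for a single literal brace,
# so the Argo placeholder is silently reduced to {workflow.uid}.
collapsed = f"echo item-{item} >> /mnt/{{workflow.uid}}/results.txt"

# Option 1: double the escapes so two braces survive on each side.
escaped = f"echo item-{item} >> /mnt/{{{{workflow.uid}}}}/results.txt"

# Option 2: keep the placeholder in a plain string and interpolate it.
interpolated = f"echo item-{item} >> /mnt/{ARGO_UID}/results.txt"

print(collapsed)
print(escaped)
print(interpolated)
```

<p>Only the last two strings still contain the placeholder in the form Argo substitutes. Keeping the placeholder in a plain (non-f) string, as in option 2, is usually the easier one to read.</p>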