Argo工作流SDK
argo-workflows-sdk的Python项目详细描述
argo python sdk
Argo Workflows
如果您是Argo新手,我们建议您查看pure YAML中的示例。语言是描述性的,Argo examples提供了详尽的解释。在
对于更有经验的读者,此SDK允许您在Python中以编程方式定义Argo工作流,然后将其转换为Argo YAML规范。在
SDK使用Argo Python client存储库中定义的Argo模型。结合这两种方法,我们得到了对Argo工作流的整个低级控制。在
入门
你好世界
这个例子演示了最简单的功能。通过子类化@Workflow
类和带有{Workflow
。在
工作流的入口点定义为entrypoint
类属性。在
# @file: hello-world.yamlapiVersion:argoproj.io/v1alpha1kind:Workflowmetadata:name:hello-worldgenerateName:hello-world-spec:entrypoint:whalesaytemplates:-name:whalesaycontainer:name:whalesayimage:docker/whalesay:latestcommand:[cowsay]args:["helloworld"]^{pr2}$
DAG:任务
此示例演示通过构成diamond结构的依赖项定义的任务。任务是使用@task
修饰符定义的,它们必须返回一个有效的模板。在
对于Workflow
的顶层任务,入口点自动创建为main
。在
# @file: dag-diamond.yaml# The following workflow executes a diamond workflow## A# / \# B C# \ /# DapiVersion:argoproj.io/v1alpha1kind:Workflowmetadata:name:dag-diamondgenerateName:dag-diamond-spec:entrypoint:maintemplates:-name:maindag:tasks:-name:Atemplate:echoarguments:parameters:[{name:message, value:A}]-name:Bdependencies:[A]template:echoarguments:parameters:[{name:message, value:B}]-name:Cdependencies:[A]template:echoarguments:parameters:[{name:message, value:C}]-name:Ddependencies:[B,C]template:echoarguments:parameters:[{name:message, value:D}]# @task: [A, B, C, D]-name:echoinputs:parameters:-name:messagecontainer:name:echoimage:alpine:3.7command:[echo,"{{inputs.parameters.message}}"]
fromargo.workflows.sdkimportWorkflowfromargo.workflows.sdk.tasksimport*fromargo.workflows.sdk.templatesimport*classDagDiamond(Workflow):@task@parameter(name="message",value="A")defA(self,message:V1alpha1Parameter)->V1alpha1Template:returnself.echo(message=message)@task@parameter(name="message",value="B")@dependencies(["A"])defB(self,message:V1alpha1Parameter)->V1alpha1Template:returnself.echo(message=message)@task@parameter(name="message",value="C")@dependencies(["A"])defC(self,message:V1alpha1Parameter)->V1alpha1Template:returnself.echo(message=message)@task@parameter(name="message",value="D")@dependencies(["B","C"])defD(self,message:V1alpha1Parameter)->V1alpha1Template:returnself.echo(message=message)@template@inputs.parameter(name="message")defecho(self,message:V1alpha1Parameter)->V1Container:container=V1Container(image="alpine:3.7",name="echo",command=["echo","{{inputs.parameters.message}}"],)returncontainer
伪影
Artifacts
可以类似于parameters
以三种形式传递:arguments
,inputs
和{@artifact
或{
即:inputs.artifact(...)
工件和参数都逐个传递,这意味着对于多个工件(参数),应该调用:
@inputs.artifact(name="artifact",...)@inputs.parameter(name="parameter_a",...)@inputs.parameter(...)deffoo(self,artifact:V1alpha1Artifact,prameter_b:V1alpha1Parameter,...):pass
一个完整的例子:
# @file: artifacts.yamlapiVersion:argoproj.io/v1alpha1kind:Workflowmetadata:name:artifact-passinggenerateName:artifact-passing-spec:entrypoint:maintemplates:-name:maindag:tasks:-name:generate-artifacttemplate:whalesay-name:consume-artifacttemplate:print-messagearguments:artifacts:# bind message to the hello-art artifact# generated by the generate-artifact step-name:messagefrom:"{{tasks.generate-artifact.outputs.artifacts.hello-art}}"-name:whalesaycontainer:name:"whalesay"image:docker/whalesay:latestcommand:[sh,-c]args:["cowsayhelloworld|tee/tmp/hello_world.txt"]outputs:artifacts:# generate hello-art artifact from /tmp/hello_world.txt# artifacts can be directories as well as files-name:hello-artpath:/tmp/hello_world.txt-name:print-messageinputs:artifacts:# unpack the message input artifact# and put it at /tmp/message-name:messagepath:/tmp/messagecontainer:name:"print-message"image:alpine:latestcommand:[sh,-c]args:["cat","/tmp/message"]
fromargo.workflows.sdkimportWorkflowfromargo.workflows.sdk.tasksimport*fromargo.workflows.sdk.templatesimport*classArtifactPassing(Workflow):@taskdefgenerate_artifact(self)->V1alpha1Template:returnself.whalesay()@task@artifact(name="message",_from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}")defconsume_artifact(self,message:V1alpha1Artifact)->V1alpha1Template:returnself.print_message(message=message)@template@outputs.artifact(name="hello-art",path="/tmp/hello_world.txt")defwhalesay(self)->V1Container:container=V1Container(name="whalesay",image="docker/whalesay:latest",command=["sh","-c"],args=["cowsay hello world | tee /tmp/hello_world.txt"])returncontainer@template@inputs.artifact(name="message",path="/tmp/message")defprint_message(self,message:V1alpha1Artifact)->V1Container:container=V1Container(name="print-message",image="alpine:latest",command=["sh","-c"],args=["cat","/tmp/message"],)returncontainer
更进一步:closure
和{}
这就是它变得非常有趣的地方。到目前为止,我们只讨论了Python实现提供的好处。在
如果我们想使用本机Python代码并将其作为工作流中的一个步骤来执行呢。我们有什么选择?在
Option A)是重用现有的思维方式,将代码转储为字符串,将其作为源传递给V1ScriptTemplate
模型,并用template
修饰符包装。
下面的代码块对此进行了说明:
importtextwrapclassScriptsPython(Workflow):...@templatedefgen_random_int(self)->V1alpha1ScriptTemplate:source=textwrap.dedent("""\ import random i = random.randint(1, 100) print(i) """)template=V1alpha1ScriptTemplate(image="python:alpine3.6",name="gen-random-int",command=["python"],source=source)returntemplate
结果是:
api_version:argoproj.io/v1alpha1kind:Workflowmetadata:generate_name:scripts-python-name:scripts-pythonspec:entrypoint:main...templates:-name:gen-random-intscript:command:-pythonimage:python:alpine3.6name:gen-random-intsource:'importrandom\ni=random.randint(1,100)\nprint(i)\n'
不错,但也没有充分发挥潜力。既然我们已经在编写Python了,为什么要用字符串包装代码呢?这里我们介绍closure
s
closure
s
closure
s的逻辑非常简单。只需将要执行的函数包装在@closure
装饰器的容器中。然后,closure
处理其余部分并返回一个template
(就像@template
装饰器一样)。在
我们只需要为它提供一个映像,该映像已安装了必需的Python依赖项,并且存在于集群中。在
There is a plan to eliminate even this step in the future, but currently it is inavoidable.
按照前面的例子:
classScriptsPython(Workflow):...@closure(image="python:alpine3.6")defgen_random_int()->V1alpha1ScriptTemplate:importrandomi=random.randint(1,100)print(i)
闭包实现了V1alpha1ScriptTemplate
,这意味着您可以传递resources
,env
等内容。。。在
另外,确保您import
无论您使用的是什么库,上下文都不会被保留---closure
作为一个staticmethod,并且是来自模块范围的沙盒。在
scope
s
现在,如果我们有一个非常大的函数(或者整个脚本)呢。将它包装在一个Python函数中并不是非常Python,而且会变得很乏味。这是我们可以利用scope
s的地方
例如,假设我们希望在运行gen_random_int
函数之前初始化日志记录。在
...@closure(scope="main",image="python:alpine3.6")defgen_random_int(main)->V1alpha1ScriptTemplate:importrandommain.init_logging()i=random.randint(1,100)print(i)@scope(name="main")definit_logging(level="DEBUG"):importlogginglogging_level=getattr(logging,level,"INFO")logging.getLogger("__main__").setLevel(logging_level)
请注意我们所做的3个更改:
@closure(scope="main",# <--- provide the closure a scopeimage="python:alpine3.6")defgen_random_int(main):# <--- use the scope name
@scope(name="main")# <--- add function to a scopedefinit_logging(level="DEBUG"):
然后,给定范围中的每个函数都由作用域名称命名,并注入到闭包中。在
也就是说,产生的YAML如下所示:
...spec:...templates:-name:gen-random-intscript:command:-pythonimage:python:alpine3.6name:gen-random-intsource:|-import loggingimport randomclass main:"""Scoped objects injected from scope 'main'."""@staticmethoddef init_logging(level="DEBUG"):logging_level = getattr(logging, level, "INFO")logging.getLogger("__main__").setLevel(logging_level)main.init_logging()i = random.randint(1, 100)print(i)
编译还将所有导入内容放在前面并删除重复项,以方便使用并使外观更自然,这样您就不会感觉到li当你看到产生的山药时,你会戳你的眼睛。在
有关更多示例,请参见examples文件夹。在
作者:
- [维护者]马雷克·瑟马克macermak@redhat.com
- 瓦茨拉夫·帕夫林vpavlin@redhat.com
@AICoE,红帽子
- 项目
标签: