Argo工作流SDK

argo-workflows-sdk的Python项目详细描述


argo python sdkRelease

LicenseCI

Argo Workflows

如果您是Argo新手,我们建议您查看pure YAML中的示例。语言是描述性的,Argo examples提供了详尽的解释。在

对于更有经验的读者,此SDK允许您在Python中以编程方式定义Argo工作流,然后将其转换为Argo YAML规范。在

SDK使用Argo Python client存储库中定义的Argo模型。结合这两种方法,我们得到了对Argo工作流的整个低级控制。在

入门

你好世界

这个例子演示了最简单的功能。通过子类化@Workflow类和带有{}修饰符的单个模板来定义Workflow。在

工作流的入口点定义为entrypoint类属性。在

# @file: hello-world.yamlapiVersion:argoproj.io/v1alpha1kind:Workflowmetadata:name:hello-worldgenerateName:hello-world-spec:entrypoint:whalesaytemplates:-name:whalesaycontainer:name:whalesayimage:docker/whalesay:latestcommand:[cowsay]args:["helloworld"]
^{pr2}$

DAG:任务

此示例演示通过构成diamond结构的依赖项定义的任务。任务是使用@task修饰符定义的,它们必须返回一个有效的模板。在

对于Workflow的顶层任务,入口点自动创建为main。在

# @file: dag-diamond.yaml# The following workflow executes a diamond workflow##   A#  / \# B   C#  \ /#   DapiVersion:argoproj.io/v1alpha1kind:Workflowmetadata:name:dag-diamondgenerateName:dag-diamond-spec:entrypoint:maintemplates:-name:maindag:tasks:-name:Atemplate:echoarguments:parameters:[{name:message, value:A}]-name:Bdependencies:[A]template:echoarguments:parameters:[{name:message, value:B}]-name:Cdependencies:[A]template:echoarguments:parameters:[{name:message, value:C}]-name:Ddependencies:[B,C]template:echoarguments:parameters:[{name:message, value:D}]# @task: [A, B, C, D]-name:echoinputs:parameters:-name:messagecontainer:name:echoimage:alpine:3.7command:[echo,"{{inputs.parameters.message}}"]
fromargo.workflows.sdkimportWorkflowfromargo.workflows.sdk.tasksimport*fromargo.workflows.sdk.templatesimport*classDagDiamond(Workflow):@task@parameter(name="message",value="A")defA(self,message:V1alpha1Parameter)->V1alpha1Template:returnself.echo(message=message)@task@parameter(name="message",value="B")@dependencies(["A"])defB(self,message:V1alpha1Parameter)->V1alpha1Template:returnself.echo(message=message)@task@parameter(name="message",value="C")@dependencies(["A"])defC(self,message:V1alpha1Parameter)->V1alpha1Template:returnself.echo(message=message)@task@parameter(name="message",value="D")@dependencies(["B","C"])defD(self,message:V1alpha1Parameter)->V1alpha1Template:returnself.echo(message=message)@template@inputs.parameter(name="message")defecho(self,message:V1alpha1Parameter)->V1Container:container=V1Container(image="alpine:3.7",name="echo",command=["echo","{{inputs.parameters.message}}"],)returncontainer

伪影

Artifacts可以类似于parameters以三种形式传递:argumentsinputs和{},其中{}是默认值(简单地@artifact或{})。在

即:inputs.artifact(...)

工件和参数都逐个传递,这意味着对于多个工件(参数),应该调用:

@inputs.artifact(name="artifact",...)@inputs.parameter(name="parameter_a",...)@inputs.parameter(...)deffoo(self,artifact:V1alpha1Artifact,prameter_b:V1alpha1Parameter,...):pass

一个完整的例子:

# @file: artifacts.yamlapiVersion:argoproj.io/v1alpha1kind:Workflowmetadata:name:artifact-passinggenerateName:artifact-passing-spec:entrypoint:maintemplates:-name:maindag:tasks:-name:generate-artifacttemplate:whalesay-name:consume-artifacttemplate:print-messagearguments:artifacts:# bind message to the hello-art artifact# generated by the generate-artifact step-name:messagefrom:"{{tasks.generate-artifact.outputs.artifacts.hello-art}}"-name:whalesaycontainer:name:"whalesay"image:docker/whalesay:latestcommand:[sh,-c]args:["cowsayhelloworld|tee/tmp/hello_world.txt"]outputs:artifacts:# generate hello-art artifact from /tmp/hello_world.txt# artifacts can be directories as well as files-name:hello-artpath:/tmp/hello_world.txt-name:print-messageinputs:artifacts:# unpack the message input artifact# and put it at /tmp/message-name:messagepath:/tmp/messagecontainer:name:"print-message"image:alpine:latestcommand:[sh,-c]args:["cat","/tmp/message"]
fromargo.workflows.sdkimportWorkflowfromargo.workflows.sdk.tasksimport*fromargo.workflows.sdk.templatesimport*classArtifactPassing(Workflow):@taskdefgenerate_artifact(self)->V1alpha1Template:returnself.whalesay()@task@artifact(name="message",_from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}")defconsume_artifact(self,message:V1alpha1Artifact)->V1alpha1Template:returnself.print_message(message=message)@template@outputs.artifact(name="hello-art",path="/tmp/hello_world.txt")defwhalesay(self)->V1Container:container=V1Container(name="whalesay",image="docker/whalesay:latest",command=["sh","-c"],args=["cowsay hello world | tee /tmp/hello_world.txt"])returncontainer@template@inputs.artifact(name="message",path="/tmp/message")defprint_message(self,message:V1alpha1Artifact)->V1Container:container=V1Container(name="print-message",image="alpine:latest",command=["sh","-c"],args=["cat","/tmp/message"],)returncontainer

更进一步:closure和{}

这就是它变得非常有趣的地方。到目前为止,我们只讨论了Python实现提供的好处。在

如果我们想使用本机Python代码并将其作为工作流中的一个步骤来执行呢。我们有什么选择?在

Option A)是重用现有的思维方式,将代码转储为字符串,将其作为源传递给V1ScriptTemplate模型,并用template修饰符包装。 下面的代码块对此进行了说明:

importtextwrapclassScriptsPython(Workflow):...@templatedefgen_random_int(self)->V1alpha1ScriptTemplate:source=textwrap.dedent("""\          import random          i = random.randint(1, 100)          print(i)        """)template=V1alpha1ScriptTemplate(image="python:alpine3.6",name="gen-random-int",command=["python"],source=source)returntemplate

结果是:

api_version:argoproj.io/v1alpha1kind:Workflowmetadata:generate_name:scripts-python-name:scripts-pythonspec:entrypoint:main...templates:-name:gen-random-intscript:command:-pythonimage:python:alpine3.6name:gen-random-intsource:'importrandom\ni=random.randint(1,100)\nprint(i)\n'

不错,但也没有充分发挥潜力。既然我们已经在编写Python了,为什么要用字符串包装代码呢?这里我们介绍closures

closures

closures的逻辑非常简单。只需将要执行的函数包装在@closure装饰器的容器中。然后,closure处理其余部分并返回一个template(就像@template装饰器一样)。在

我们只需要为它提供一个映像,该映像已安装了必需的Python依赖项,并且存在于集群中。在

There is a plan to eliminate even this step in the future, but currently it is inavoidable.

按照前面的例子:

classScriptsPython(Workflow):...@closure(image="python:alpine3.6")defgen_random_int()->V1alpha1ScriptTemplate:importrandomi=random.randint(1,100)print(i)

闭包实现了V1alpha1ScriptTemplate,这意味着您可以传递resourcesenv等内容。。。在

另外,确保您import无论您使用的是什么库,上下文都不会被保留---closure作为一个staticmethod,并且是来自模块范围的沙盒。在

scopes

现在,如果我们有一个非常大的函数(或者整个脚本)呢。将它包装在一个Python函数中并不是非常Python,而且会变得很乏味。这是我们可以利用scopes的地方

例如,假设我们希望在运行gen_random_int函数之前初始化日志记录。在

...@closure(scope="main",image="python:alpine3.6")defgen_random_int(main)->V1alpha1ScriptTemplate:importrandommain.init_logging()i=random.randint(1,100)print(i)@scope(name="main")definit_logging(level="DEBUG"):importlogginglogging_level=getattr(logging,level,"INFO")logging.getLogger("__main__").setLevel(logging_level)

请注意我们所做的3个更改:

@closure(scope="main",# <--- provide the closure a scopeimage="python:alpine3.6")defgen_random_int(main):# <--- use the scope name
@scope(name="main")# <--- add function to a scopedefinit_logging(level="DEBUG"):

然后,给定范围中的每个函数都由作用域名称命名,并注入到闭包中。在

也就是说,产生的YAML如下所示:

...spec:...templates:-name:gen-random-intscript:command:-pythonimage:python:alpine3.6name:gen-random-intsource:|-import loggingimport randomclass main:"""Scoped objects injected from scope 'main'."""@staticmethoddef init_logging(level="DEBUG"):logging_level = getattr(logging, level, "INFO")logging.getLogger("__main__").setLevel(logging_level)main.init_logging()i = random.randint(1, 100)print(i)

编译还将所有导入内容放在前面并删除重复项,以方便使用并使外观更自然,这样您就不会感觉到li当你看到产生的山药时,你会戳你的眼睛。在

有关更多示例,请参见examples文件夹。在


作者:

@AICoE,红帽子

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java不兼容类型:MainActivity无法转换为LifecycleOwner   java安卓是一种更有效的读取大文本文件的方法   java导出LWJGL本地人与项目?(IntelliJ IDEA)   JDK更新后,JavaJShell不再在下一行打印输出   父类对象上的继承Java比较子属性   Java:有没有一个容器可以有效地结合HashMap和ArrayList?   安卓 Java对象指针   java在annotationdriven Spring MVC应用程序中实现大气   java 安卓源代码构建应用找不到安卓supportv4。罐子   文件系统上的抽象层和Java中的jar/zip   java在水平滚动视图中添加多个图像?   java如何从firebase实时数据库中获取字符串数组   WIndows 10工作站上的java未满足链接错误   java命令在终端中工作,但在使用过程中出现“无结束引号”错误。执行官