另一个工作流引擎,一个基于子流程的dag执行系统

yawns的Python项目详细描述


yawn提供了一个框架,用于执行一组具有依赖关系的shell命令 以分散的方式和重复的时间表。其他工具也有类似的功能 是对这个的启示;特别是CeleryAirflow

https://yawn.live实时浏览,部署在GKE

https://circleci.com/gh/aclowes/yawn/tree/master.svg?style=svghttps://codecov.io/gh/aclowes/yawn/branch/master/graph/badge.svg

原理差异

哈欠的灵感来源于芹菜和气流,但与芹菜和气流不同,因为它:

  • 在一个单独的子流程中运行每个任务,就像气流,但不像芹菜,芹菜避免了 一个共享的python解释器,使内存使用更容易推理。
  • 使用postgresql作为消息代理和数据库,减少了像 redis或rabbitmq。这避免了使用redis作为芹菜经纪人时的visibility timeout问题。 yawn使用新的SELECT ... FOR UPDATE SKIP LOCKED语句从 队列表。
  • 存储每个任务执行的命令、环境变量、stdout和stderror, 所以更容易看到日志和发生的历史。重新运行任务不会覆盖 上一次跑步。
  • 不支持命令行和环境变量以外的输入或输出,使用 客户机应用程序应改为处理状态的意图。

部件

网络服务器
网站提供了一个用户界面,可以查看其中运行的工作流和任务。 它允许您运行现有工作流或重新运行失败的任务。web服务器还提供 远程创建和运行工作流的rest api。
工人
工人安排和执行任务。工作进程使用subprocess.Popen运行任务和 捕获stdout和stderr。

概念

工作流
一组相互依赖的任务,形成通常所说的定向任务 无环图(DAG)。工作流可以计划定期运行,并对其进行版本控制 因此它们可以随时间变化。
运行
手动触发或计划的工作流实例。
任务
一个shell命令,指定它所依赖的上游任务、重试次数,以及 超时。任务是在工作流和运行中配置的环境变量。
执行 一个任务的命令的执行,捕获退出代码和标准输出和错误.< /dd>
排队
要执行的任务的先进先出列表。
工人
从一组队列中读取并执行相关任务的过程,记录 导致执行。

安装

开始打哈欠:

# install the package - someone has yawn :-(
pip install yawns

# install postgres and create the yawn database
# the default settings localhost and no password
createdb yawn

# setup the tables by running db migrations
yawn migrate

# create a user to login with
yawn createsuperuser

# create some sample workflows
yawn examples

# start the webserver on port 8000
yawn webserver

# open a new terminal and start the worker
yawn worker

以下是单个工作流页面的屏幕截图:

https://cloud.githubusercontent.com/assets/910316/21969288/fe40baf0-db51-11e6-97f2-7e6875c1e575.png

rest api

在浏览器中转到http://127.0.0.1:8000/api/来浏览api。

创建工作流时,格式为(为便于阅读,显示为yaml):

name: Example
parameters:
  ENVIRONMENT: production
  CALCULATION_DATE: 2017-01-01
schedule: 0 0 *
schedule_active: True

tasks:
- name: task_1
  queue: default
  max_retries: 1
  timeout: 30
  command: python my_awesome_program.py $ENVIRONMENT
- name: task_2
  queue: default
  command: echo $CALCULATION_DATE | grep 2017
  upstream:
  - task_1
/api/workflows/

获取版本列表或单个工作流版本。发布以创建或更新工作流 使用上面显示的架构。修补程序以更改scheduleschedule_active,或 ^{tt6}仅限$个字段。

  • post-使用上面显示的模式
  • 补丁{"schedule_active": false}
/api/runs/

获取运行列表,可以选择使用?workflow=<id>筛选到特定工作流。 发布以创建新运行。修补以更改参数。

  • 邮政{"workflow_id": 1, "parameters": null}
  • 补丁{"parameters": {"ENVIRONMENT": "test"}}
/api/tasks/<id>/

从工作流运行中获取单个任务,以及其执行及其状态和日志记录 信息。修补程序将任务排队或终止正在运行的执行。

  • 补丁{"enqueue": true}
  • 补丁{"terminate": <execution_id>}

python api

导入并使用django模型创建工作流:

from yawn.workflow.models import WorkflowName
from yawn.task.models import Template

name, _ = WorkflowName.objects.get_or_create(name='Simple Workflow Example')
workflow = name.new_version(parameters={'MY_OBJECT_ID': '1', 'SOME_SETTING': 'false'})
task1 = Template.objects.create(workflow=workflow, name='start', command='echo Starting...')
task2 = Template.objects.create(workflow=workflow, name='task2', command='echo Working on $MY_OBJECT_ID')
task2.upstream.add(task1)
task3 = Template.objects.create(workflow=workflow, name='task3',
                                command='echo Another busy thing && sleep 20')
task3.upstream.add(task1)
task4 = Template.objects.create(workflow=workflow, name='done', command='echo Finished!')
task4.upstream.add(task2, task3)

workflow.submit_run(parameters={'child': 'true'})

或者,使用序列化程序将任务作为f中的字典使用的格式 根据API。此方法检查工作流的版本是否存在相同的结构, 如果存在的话,将返回现有版本:

from yawn.workflow.serializers import WorkflowSerializer

serializer = WorkflowSerializer(data=test_views.data())
serializer.is_valid(raise_exception=True)
workflow = serializer.save()
workflow.submit_run()

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
CentOS上的java Spring Boot简易应用程序需要很长时间才能启动   java如何检查字符串值是否等于null?   收集器中的java映射值。分组方式()   java需要支持Azure AD B2C webapp集成   java如何加入线程以停止它?   java如何使用意图传递类的对象?   java如何在战争环境中发现CDI生产者?   多模块项目中java奇怪的编译器行为   java如何在web应用程序中管理密码?   java从http服务器、filehandler中删除冗余代码   java使用反射来获取泛型类的字段   java Spring MVC/Hibernate/MySQL 400错误请求错误   给定正整数a的java幂为3   在Java中将元素拆分为不同数量的列表?   java展开折叠窗格