另一个工作流引擎,一个基于子流程的dag执行系统
yawns的Python项目详细描述
yawn提供了一个框架,用于执行一组具有依赖关系的shell命令 以分散的方式和重复的时间表。其他工具也有类似的功能 是对这个的启示;特别是Celery和Airflow。
在https://yawn.live实时浏览,部署在GKE。
原理差异
哈欠的灵感来源于芹菜和气流,但与芹菜和气流不同,因为它:
- 在一个单独的子流程中运行每个任务,就像气流,但不像芹菜,芹菜避免了 一个共享的python解释器,使内存使用更容易推理。
- 使用postgresql作为消息代理和数据库,减少了像 redis或rabbitmq。这避免了使用redis作为芹菜经纪人时的visibility timeout问题。 yawn使用新的SELECT ... FOR UPDATE SKIP LOCKED语句从 队列表。
- 存储每个任务执行的命令、环境变量、stdout和stderror, 所以更容易看到日志和发生的历史。重新运行任务不会覆盖 上一次跑步。
- 不支持命令行和环境变量以外的输入或输出,使用 客户机应用程序应改为处理状态的意图。
部件
- 网络服务器
- 网站提供了一个用户界面,可以查看其中运行的工作流和任务。 它允许您运行现有工作流或重新运行失败的任务。web服务器还提供 远程创建和运行工作流的rest api。
- 工人
- 工人安排和执行任务。工作进程使用subprocess.Popen运行任务和 捕获stdout和stderr。
概念
- 工作流
- 一组相互依赖的任务,形成通常所说的定向任务 无环图(DAG)。工作流可以计划定期运行,并对其进行版本控制 因此它们可以随时间变化。
- 运行
- 手动触发或计划的工作流实例。
- 任务
- 一个shell命令,指定它所依赖的上游任务、重试次数,以及 超时。任务是在工作流和运行中配置的环境变量。
- 执行
一个任务的命令的执行,捕获退出代码和标准输出和错误.< /dd> - 排队
- 要执行的任务的先进先出列表。
- 工人
- 从一组队列中读取并执行相关任务的过程,记录 导致执行。
安装
开始打哈欠:
# install the package - someone has yawn :-( pip install yawns # install postgres and create the yawn database # the default settings localhost and no password createdb yawn # setup the tables by running db migrations yawn migrate # create a user to login with yawn createsuperuser # create some sample workflows yawn examples # start the webserver on port 8000 yawn webserver # open a new terminal and start the worker yawn worker
以下是单个工作流页面的屏幕截图:
rest api
在浏览器中转到http://127.0.0.1:8000/api/来浏览api。
创建工作流时,格式为(为便于阅读,显示为yaml):
name: Example parameters: ENVIRONMENT: production CALCULATION_DATE: 2017-01-01 schedule: 0 0 * schedule_active: True tasks: - name: task_1 queue: default max_retries: 1 timeout: 30 command: python my_awesome_program.py $ENVIRONMENT - name: task_2 queue: default command: echo $CALCULATION_DATE | grep 2017 upstream: - task_1
- /api/workflows/
获取版本列表或单个工作流版本。发布以创建或更新工作流 使用上面显示的架构。修补程序以更改schedule、schedule_active,或 ^{tt6}仅限$个字段。
- post-使用上面显示的模式
- 补丁{"schedule_active": false}
- /api/runs/
获取运行列表,可以选择使用?workflow=<id>筛选到特定工作流。 发布以创建新运行。修补以更改参数。
- 邮政{"workflow_id": 1, "parameters": null}
- 补丁{"parameters": {"ENVIRONMENT": "test"}}
- /api/tasks/<id>/
从工作流运行中获取单个任务,以及其执行及其状态和日志记录 信息。修补程序将任务排队或终止正在运行的执行。
- 补丁{"enqueue": true}
- 补丁{"terminate": <execution_id>}
python api
导入并使用django模型创建工作流:
from yawn.workflow.models import WorkflowName from yawn.task.models import Template name, _ = WorkflowName.objects.get_or_create(name='Simple Workflow Example') workflow = name.new_version(parameters={'MY_OBJECT_ID': '1', 'SOME_SETTING': 'false'}) task1 = Template.objects.create(workflow=workflow, name='start', command='echo Starting...') task2 = Template.objects.create(workflow=workflow, name='task2', command='echo Working on $MY_OBJECT_ID') task2.upstream.add(task1) task3 = Template.objects.create(workflow=workflow, name='task3', command='echo Another busy thing && sleep 20') task3.upstream.add(task1) task4 = Template.objects.create(workflow=workflow, name='done', command='echo Finished!') task4.upstream.add(task2, task3) workflow.submit_run(parameters={'child': 'true'})
或者,使用序列化程序将任务作为f中的字典使用的格式 根据API。此方法检查工作流的版本是否存在相同的结构, 如果存在的话,将返回现有版本:
from yawn.workflow.serializers import WorkflowSerializer serializer = WorkflowSerializer(data=test_views.data()) serializer.is_valid(raise_exception=True) workflow = serializer.save() workflow.submit_run()