Python环境下DAGs的并行实时调度

2024-10-01 10:11:54 发布

您现在位置:Python中文网/ 问答频道 /正文

如何为多个类别并行运行/实例化Dag(气流)?

例如: 我有一个气流(DAG),我定期运行 如何安排dag在不同的批处理名上并行运行(parallet):

  1. 为batch1运行dag(以args形式传递批处理名称)
  2. 为batch2运行dag(以参数传递批处理名称)应与1并行运行 . . . 在

等等

我使用environment varible传递批处理名,然后在服务器上使用多个tmux会话并行运行dag,但它被搞得一团糟。在

有没有更好的方法可以让我节省时间并并行运行多个批处理名的dag?在

谢谢你的时间。在


Tags: 实例服务器名称environmentargs类别形式dag
1条回答
网友
1楼 · 发布于 2024-10-01 10:11:54

由于flow运行的python类表示bashshell命令的图形,因此可以在flow中创建两个独立的dag。这里是对tutorial的一个小修改

dag = DAG(dag_id='batch')
task = [ BashOperator(
             task_id='templated',
             bash_command=templated_command,
             params={'batch_name': batch_name},
             dag=dag)
        for batch_name in ["batch one", "batch two"]]

dag.add_task(task[0])
dag.add_task(task[1])

由于没有依赖关系,只要气流以这种方式设置好,它们就应该并行运行。如果需要设置shell环境变量,请添加VAR={params.batch_名称}}在模板的某个地方。在

假设你的程序使用系统argv,也可以使用普通作业控制启动:

^{pr2}$

相关问题 更多 >