如何根据气流中的不同天数为同一DAG设置不同的时间表

2024-05-19 18:48:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我有两个像这样定义的DAG

my_dag= DAG('my_dag_thu_and_friday',
           catchup=False,
           default_args=default_args,
           schedule_interval='0 12,13,15,19 * * THU,FRI'
         ) 
my_dag= DAG('my_dag_sat_and_sun',
           catchup=False,
           default_args=default_args,
           schedule_interval='0 13,17 * * SAT,SUN'
         ) 

它们运行相同的运算符和相同的代码,但只是根据是星期四/星期五还是星期六/星期日而按不同的时间表运行。是否有一种方法可以指定cron间隔,使我只有一个DAG有条件地处理调度

谢谢


Tags: andfalsedefault定义myargsscheduledag
1条回答
网友
1楼 · 发布于 2024-05-19 18:48:55

没有清洁&清洁;然而,目前的简单解决方案AIP-39 Richer scheduler_interval将解决这个问题,因此在未来的版本中,这将变得很容易

目前,您的选择是:

  1. 尝试使用cron'0 12,13,15,17,19 * * THU,FRI,SAT,SUN'放置分支运算符来创建1个DAG,以确定是应该执行DummyOperator还是您的运算符。例如,在THU气流中运行17将执行DummyOperator(因此不执行任何操作)
  2. 像你已经做的那样,和2只狗呆在一起
  3. 使用2个DAG,但使用function that returns a DAG object,从而避免需要维护重复的代码。您不必维护重复的代码。 可以编写返回DAG对象的函数:

def create_dag(dag_id, schedule, default_args):
    dag = DAG(
        dag_id,
        schedule_interval=schedule,
        default_args=default_args)
    with dag:
        task = BashOperator(task_id='my_task')
    return dag

list_of_dags = [
    ('my_dag_thu_and_friday', '0 12,13,15,19 * * THU,FRI'),
    ('my_dag_sat_and_sun', '0 13,17 * * SAT,SUN')
]

default_args = {'owner': 'airflow', ...}

for dag_item in list_of_dags:
    dag_id = dag_item[0]
    dag_schedule = dag_item[1]
    globals()[dag_id] = create_dag(
        dag_id,
        dag_schedule,
        default_args
    )

相关问题 更多 >