气流触发dag故障

2024-09-30 09:22:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试从python代码中手动触发dag,如下所示:

from airflow_local.operators.mmttrigger_dagrun_operator import \
MMTTriggerDagRunOperator

# in loop
MMTTriggerDagRunOperator(
            task_id='trigger_dagrun',
            execution_date=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
            python_callable=trigger_dag,
            trigger_dag_id=log_fetch).execute(kwargs)

time.sleep(1)

此代码被置于有限循环中。 我知道,对于同一个dag触发器尝试,每个dag运行的执行日期必须是唯一的,这解释了time.sleep(1) 然而,在循环中运行之后,一些dag被成功触发,循环停止,并在尝试将重复的条目插入airflow数据库的dag_stats表时出错

Error: (pymysql.err.IntegrityError) (1062, "Duplicate entry 'log_fetch-running' for key 'PRIMARY'") [SQL: 'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%(dag_id)s, %(state)s, %(count)s, %(dirty)s)'] [parameters: {'dag_id': log_fetch', 'state': 'running', 'count': 0, 'dirty': 1}]

我不能理解这种行为,所以如果有人能帮我找出这个错误背后的原因,我会很感激的。在

谢谢。在


Tags: 代码logidtimestatscountsleepfetch

热门问题