我尝试从python代码中手动触发dag,如下所示:
from airflow_local.operators.mmttrigger_dagrun_operator import \
MMTTriggerDagRunOperator
# in loop
MMTTriggerDagRunOperator(
task_id='trigger_dagrun',
execution_date=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
python_callable=trigger_dag,
trigger_dag_id=log_fetch).execute(kwargs)
time.sleep(1)
此代码被置于有限循环中。
我知道,对于同一个dag触发器尝试,每个dag运行的执行日期必须是唯一的,这解释了time.sleep(1)
然而,在循环中运行之后,一些dag被成功触发,循环停止,并在尝试将重复的条目插入airflow数据库的dag_stats表时出错
Error: (pymysql.err.IntegrityError) (1062, "Duplicate entry 'log_fetch-running' for key 'PRIMARY'") [SQL: 'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%(dag_id)s, %(state)s, %(count)s, %(dirty)s)'] [parameters: {'dag_id': log_fetch', 'state': 'running', 'count': 0, 'dirty': 1}]
我不能理解这种行为,所以如果有人能帮我找出这个错误背后的原因,我会很感激的。在
谢谢。在
目前没有回答
相关问题 更多 >
编程相关推荐