气流:外部任务传感器不触发tas

2024-09-30 18:29:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经看到了关于SO的this和{a2}问题,并做了相应的更改。然而,我依赖的狗仍然被困在戳状态。下面是我的达格大师:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure

以下是我的从属DAG:

^{pr2}$

以下是主DAG执行后从属DAG的日志:

[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 

以下是主DAG执行日志:

[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0

我的假设是,如果主机运行良好,气流会触发从属DAG?我试过用execution_delta来玩玩,但似乎没用。在

另外,schedule_interval和{}对于两个dag都是相同的,所以不要认为这会引起任何麻烦。在

我有遗漏什么吗?在


Tags: andfrompyinfobashfortasktoday
2条回答

您可能应该使用正的时间增量:https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html,因为当减去执行增量时,它将查找一个在其自身运行2分钟之后的任务。在

但是delta并不是一个真正的范围,TI必须有匹配的Dag ID、任务ID、成功的结果以及datetime列表中的执行日期。当您将execution_delta作为一个delta时,它是一个datetime的列表,取当前执行日期并减去timedelta。在

这可能是因为您删除了timedelta以便两个执行日期匹配,并且传感器将等待另一个任务成功,或者您的开始日期和计划间隔基本上设置为今天,并且{}获得的执行日期不在可预测的锁定步进中。您可以尝试设置datetime(2019,1,10)0 1 * * *,让它们都在每天凌晨1点运行(同样没有execution_delta)。在

确保两个DAG同时启动,并且不要手动启动任何一个DAG。在

相关问题 更多 >