根据气流中的上游任务终止下游任务

2024-09-24 02:28:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个DAG,其中我有两个任务

t1=  PythonOperator(
    task_id='Check_Files_in_S3',
    provide_context=False,
    python_callable=checkFilesInS3,
    xcom_push=True,
    dag=dag)

t2 =  PythonOperator(
    task_id='snowflakeLoad',
    provide_context=True,
    python_callable=snowflakeLoad,
    xcom_push=True,
    dag=dag)

t1>>t2

第一个任务在S3中查找文件,如果文件可用,则任务应该成功,并且应该使用下游任务将数据加载到snowflake

我的要求是,如果文件不可用,那么task1需要发送邮件,然后它应该停止下游任务(task2)。有没有一种方法可以实现这一点,比如返回false会使下游任务失败?还取决于如果先前的DAG运行失败,过去的DAG不会运行吗?或者,如果其中一个任务在当前运行中失败,它不会运行下游任务


Tags: idtruetasks3contextpushdagt1
2条回答

你有多个问题,让我把它分解一下

Is there a way to achieve this something like returning false will fail the downstream task?

等待事件发生的通常方法是使用sensor(从BaseSensorOperator继承)类。它们专门用于低资源消耗和易于配置以感知外部对象/资源。 但是,在您的情况下,这是一个检查和忘记(如果不存在,请不要重试)。然后你可以继续使用Python。您只需引发一个异常,它将被视为任务失败,从而阻止下游任务(在您的例子中是t2)运行

also depends_on_past will not run the DAG if the earlier DAG run fails? or it don't run the downstream tasks if one of the task failed in the present run?

如果我们有你的两个任务,假设我们有两次运行。一个在时间点x,一个在时间点x + 1。如果您有depends_on_past=True,那么x + 1的任务t1的运行将在时间x查看运行t1,并且仅当该运行成功时才会启动。 这同样适用于{}的{},它将检查{}的{}任务是否完成,然后检查{}时间{}是否成功

当没有文件时,可以在checkFilesInS3中引发AirflowSkipException。这将导致在引发异常之前跳过下游任务,您也可以发送电子邮件

from airflow import DAG
from datetime import datetime
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email


def checkFilesInS3(**kwargs):
    your_code
    if no_files: #Raise exception to make downstream tasks skip
        send_email(to="someone@somewhere.com, subject="task failed!"")
        raise AirflowSkipException("No files found!")


def snowflakeLoad(**kwargs):
    pass


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 23),

}
with DAG(dag_id='question',
         default_args=default_args,
         schedule_interval=None,
         catchup=False
         ) as dag:

    start = DummyOperator(task_id='start_task')

    t1 = PythonOperator(
        task_id='Check_Files_in_S3',
        python_callable=checkFilesInS3,
    )

    t2 = PythonOperator(
        task_id='snowflakeLoad',
        python_callable=snowflakeLoad,
    )

    start >> t1 >> t2

enter image description here

相关问题 更多 >