我有一个DAG,其中我有两个任务
t1= PythonOperator(
task_id='Check_Files_in_S3',
provide_context=False,
python_callable=checkFilesInS3,
xcom_push=True,
dag=dag)
t2 = PythonOperator(
task_id='snowflakeLoad',
provide_context=True,
python_callable=snowflakeLoad,
xcom_push=True,
dag=dag)
t1>>t2
第一个任务在S3中查找文件,如果文件可用,则任务应该成功,并且应该使用下游任务将数据加载到snowflake
我的要求是,如果文件不可用,那么task1需要发送邮件,然后它应该停止下游任务(task2)。有没有一种方法可以实现这一点,比如返回false会使下游任务失败?还取决于如果先前的DAG运行失败,过去的DAG不会运行吗?或者,如果其中一个任务在当前运行中失败,它不会运行下游任务
你有多个问题,让我把它分解一下
等待事件发生的通常方法是使用sensor(从BaseSensorOperator继承)类。它们专门用于低资源消耗和易于配置以感知外部对象/资源。 但是,在您的情况下,这是一个检查和忘记(如果不存在,请不要重试)。然后你可以继续使用Python。您只需引发一个异常,它将被视为任务失败,从而阻止下游任务(在您的例子中是t2)运行
如果我们有你的两个任务,假设我们有两次运行。一个在时间点}的{},它将检查{}的{}任务是否完成,然后检查{}时间{}是否成功
x
,一个在时间点x + 1
。如果您有depends_on_past=True
,那么x + 1
的任务t1
的运行将在时间x
查看运行t1
,并且仅当该运行成功时才会启动。 这同样适用于{当没有文件时,可以在
checkFilesInS3
中引发AirflowSkipException
。这将导致在引发异常之前跳过下游任务,您也可以发送电子邮件相关问题 更多 >
编程相关推荐