<p>当没有文件时,可以在<code>checkFilesInS3</code>中引发<code>AirflowSkipException</code>。这将导致在引发异常之前跳过下游任务,您也可以发送电子邮件</p>
<pre><code>from airflow import DAG
from datetime import datetime
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email
def checkFilesInS3(**kwargs):
your_code
if no_files: #Raise exception to make downstream tasks skip
send_email(to="someone@somewhere.com, subject="task failed!"")
raise AirflowSkipException("No files found!")
def snowflakeLoad(**kwargs):
pass
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 6, 23),
}
with DAG(dag_id='question',
default_args=default_args,
schedule_interval=None,
catchup=False
) as dag:
start = DummyOperator(task_id='start_task')
t1 = PythonOperator(
task_id='Check_Files_in_S3',
python_callable=checkFilesInS3,
)
t2 = PythonOperator(
task_id='snowflakeLoad',
python_callable=snowflakeLoad,
)
start >> t1 >> t2
</code></pre>
<p><a href="https://i.stack.imgur.com/NV1sH.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/NV1sH.png" alt="enter image description here"/></a></p>