如何使用条件任务运行DAG

2024-06-01 11:15:25 发布

您现在位置:Python中文网/ 问答频道 /正文

总共有6个任务。这些任务需要根据输入json中的一个字段(标志值)值执行。 如果flag_value的值为真,则所有任务都需要以如下方式执行:, 首先是task1,然后是Parallel to(task2和task3一起),Parallel to task4,Parallel to task5。 所有这些完成后,任务6。 因为我不熟悉气流和DAG,所以我不知道如何在这种情况下跑步

如果flag_value的值为false,则顺序仅为顺序
任务1>&燃气轮机;任务4>&燃气轮机;任务5>&燃气轮机;任务6

下面是我的DAG代码

from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}

dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 8)) 


#################### CREATE TASK #####################################   

task_1 = DatabricksSubmitRunOperator(
    task_id='task_1',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_1/task_1.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_1.driver.TestClass1',
        'parameters' : [
            '{{ dag_run.conf.json }}'       
        ]
    }
)



    
task_2 = DatabricksSubmitRunOperator(
    task_id='task_2',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',   
    libraries= [
        {
        'jar': 'dbfs:/task_2/task_2.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_2.driver.TestClass2',
        'parameters' : [
            '{{ dag_run.conf.json }}'                               
        ]
    }
)
    
task_3 = DatabricksSubmitRunOperator(
    task_id='task_3',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',   
    libraries= [
        {
        'jar': 'dbfs:/task_3/task_3.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_3.driver.TestClass3',
        'parameters' : [
            '{{ dag_run.conf.json }}'   
        ]
    }
) 

task_4 = DatabricksSubmitRunOperator(
    task_id='task_4',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_4/task_4.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_4.driver.TestClass4',
        'parameters' : [
            '{{ dag_run.conf.json }}'   
        ]
    }
) 

task_5 = DatabricksSubmitRunOperator(
    task_id='task_5',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_5/task_5.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_5.driver.TestClass5',
        'parameters' : [
            'json ={{ dag_run.conf.json }}' 
        ]
    }
) 

task_6 = DatabricksSubmitRunOperator(
    task_id='task_6',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_6/task_6.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_6.driver.TestClass6',
        'parameters' : ['{{ dag_run.conf.json }}'   
        ]
    }
) 


flag_value='{{ dag_run.conf.json.flag_value }}'

#################### ORDER OF OPERATORS ###########################  

if flag_value == 'true':
    
    task_1.dag = dag
    task_2.dag = dag
    task_3.dag = dag
    task_4.dag = dag
    task_5.dag = dag
    task_6.dag = dag
    
    task_1  >> [task_2 , task_3] >> [task_4] >> [task_5]  >> task_6    // Not sure correct 
else:
    task_1.dag = dag
    task_4.dag = dag
    task_5.dag = dag
    task_6.dag = dag
    
    task_1 >> task_4 >> task_5 >> task_6

    

        
        
    

Tags: runidjsontaskconfdetailsconnectionconn
1条回答
网友
1楼 · 发布于 2024-06-01 11:15:25

首先,依赖关系是不正确的,应该这样做:

task_1 >> [task_2 , task_3] >> task_4 >> task_5  >> task_6

无法使用list_1 >> list_2对任务进行排序,但有一些帮助器方法可以提供此功能,请参见:cross_downstream

对于分支,您可以使用BranchPythonOperator来更改任务的trigger rules。不确定下面的代码,它可能有一些小错误,但这里的想法是可行的

task_4.trigger_rule = "none_failed"

dummy = DummyOperator(task_id="dummy", dag=dag)

branch = BranchPythonOperator(
    task_id="branch",
    # jinja template returns string "True" or "False"
    python_callable=lambda f: ["task_2" , "task_3"] if f == "True" else "dummy",
    op_kwargs={"f": flag_value},
    dag=dag)

task_1 >> branch
branch >> [task_2 , task_3, dummy] >> task_4
task_4 >> task_5 >> task_6

可能有更好的办法

相关问题 更多 >