How to pass an XCom message from a PythonOperator task to a SparkSubmitOperator task in Airflow

Posted 2024-09-30 01:35:33

Suppose I have a PythonOperator task that pushes a message to XCom. How can I pull that message inside a SparkSubmitOperator task?

def get_some_value(**kwargs):
    some_value = 10
    return some_value

task1 = PythonOperator(task_id='run_task_1',
                       python_callable=get_some_value,
                       provide_context=True,
                       dag=dag)

task2 = SparkSubmitOperator(
    task_id='run_sparkSubmit_job',
    conn_id='spark_default',
    java_class='com.example',
    application='example.jar',
    name='airflow-spark-job',
    verbose=True,
    application_args=["some_value"],   #<---I want to use some_value from task1 here
    conf={'master':'yarn'},
    dag=dag,
)

task1 >> task2

1 answer
Anonymous user
#1 · Posted 2024-09-30 01:35:33

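Inside a Jinja template, call xcom_pull on the TaskInstance object ti (available in the template context) to load the value returned by task1. Identify the value by task1's task ID, 'run_task_1':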

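from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Assumes `dag` is a DAG object defined earlier in the file
def get_some_value(**kwargs):
    some_value = 10
    return some_value  # the return value is pushed to XCom under the key 'return_value'

task1 = PythonOperator(task_id='run_task_1',
                       python_callable=get_some_value,
                       provide_context=True,
                       dag=dag)

task2 = SparkSubmitOperator(
    task_id='run_sparkSubmit_job',
    conn_id='spark_default',
    java_class='com.example',
    application='example.jar',
    name='airflow-spark-job',
    verbose=True,
    # Rendered at run time; the jar receives the pulled value as a string, e.g. "10"
    application_args=["{{ ti.xcom_pull(task_ids='run_task_1') }}"],
    conf={'master':'yarn'},
    dag=dag,
)

task1 >> task2  # task1 must finish before task2 can pull its XCom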
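To make the XCom semantics concrete, here is a minimal pure-Python sketch that does not use Airflow at all. FakeXComStore and run_python_task are hypothetical helpers written only for illustration: they mimic, in simplified form, how a PythonOperator's return value ends up stored under the key 'return_value' and how xcom_pull(task_ids=...) finds it with its default key. Real XComs are also scoped by DAG and run, which this sketch ignores.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class FakeXComStore:
    """Hypothetical in-memory stand-in for Airflow's XCom storage (illustration only)."""

    def __init__(self) -> None:
        # Maps (task_id, key) -> value; real XComs are also scoped by DAG and run
        self._values: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._values[(task_id, key)] = value

    def pull(self, task_ids: str, key: str = "return_value") -> Optional[Any]:
        # Like xcom_pull, the key defaults to "return_value"
        return self._values.get((task_ids, key))


def run_python_task(store: FakeXComStore, task_id: str,
                    python_callable: Callable[..., Any]) -> Any:
    """Roughly mimic PythonOperator: a non-None return value is pushed as 'return_value'."""
    result = python_callable()
    if result is not None:
        store.push(task_id, "return_value", result)
    return result


def get_some_value(**kwargs):
    some_value = 10
    return some_value


store = FakeXComStore()
run_python_task(store, "run_task_1", get_some_value)
pulled = store.pull(task_ids="run_task_1")
print(pulled)  # 10
```

Because the callable simply returns the value, no explicit xcom_push is needed; pulling by task ID alone is enough.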

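The Jinja template in application_args is rendered because application_args is a templated field of SparkSubmitOperator (it is listed in the operator's template_fields), so Airflow substitutes the pulled value before submitting the job. See: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/contrib/operators/spark_submit_operator.py#L87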
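The rendering step can be sketched with the jinja2 library, which Airflow uses for templating. StubTaskInstance is a hypothetical stand-in for the real ti object, and render_args only approximates how Airflow renders each element of a templated list field; neither is Airflow's own code.

```python
from jinja2 import Template


class StubTaskInstance:
    """Hypothetical stand-in for the `ti` object Airflow exposes to templates."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps (task_id, key) -> value

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcoms.get((task_ids, key))


def render_args(args, ti):
    # Approximation: render each string element of the list as its own template
    return [Template(arg).render(ti=ti) for arg in args]


application_args = ["{{ ti.xcom_pull(task_ids='run_task_1') }}"]
ti = StubTaskInstance({("run_task_1", "return_value"): 10})
rendered = render_args(application_args, ti)
print(rendered)  # ['10']
```

Note that the rendered argument is the string '10', not the integer 10. Spark passes application arguments to the main class as strings, so the jar has to parse the value itself.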
