气流:如何将xcom从父dag传递到子dag

2024-10-02 02:41:20 发布

您现在位置:Python中文网/ 问答频道 /正文

考虑到将值推送到xcom的父dag,如何从子dag中检索dag?在

我所做的:

#parent_dag.py

PARENT_DAG_NAME = "MyParentDag"
CHILD_DAG_NAME = "MyChildDag"

main_dag = DAG(
  dag_id=PARENT_DAG_NAME,
  schedule_interval="@hourly",
  start_date=DAG_START_DATE
)


def push_value(**kwargs):
    ''' push into Xcom '''
    return [1, 2]

t1 = PythonOperator(task_id='push_value',
                       python_callable=push_value,
                       retries=3,
                       dag=main_dag)

subdag_1 = SubDagOperator(
  subdag=Sub_Dag1(
      PARENT_DAG_NAME,
      CHILD_DAG_NAME,
      main_dag.start_date,
      main_dag.schedule_interval,
      "'{{ ti.xcom_pull(task_ids='push_value', dag_id='" + PARENT_DAG_NAME + "' }}'"
  ),
  task_id=CHILD_DAG_NAME,
  dag=main_dag,
)
t1 >> subdag_1

以及子子表:

^{pr2}$

它不是要记录并返回[1,2]的子子数据,而是返回字符串'{{ ti.xcom_pull(task_ids='push_value', dag_id='MyParentDag' }}'


Tags: nameidchildtaskvaluemainpushparent
1条回答
网友
1楼 · 发布于 2024-10-02 02:41:20

我看到您已经设置了provide\u context=True,所以这很好。这就是我如何使用**context参数在父/子dag之间传递变量。在

def push_value(**context):
    context['ti'].xcom_push(key='my_key', value='my_value')

def use_pushed_val(**context):
    value_from_parent = context['ti'].xcom_pull(task_ids=t1.task_id, key='my_key')

相关问题 更多 >

    热门问题