气流子dag xcom_从父dag的任务(特定任务)中提取数据

2024-09-30 08:36:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我创建了一个dag,其中包含一个子dag,用于通过一个列表进行循环,该列表是任务的返回值

子DAG函数

    def mySubDag(parent: Text, child: Text, args, **context):
        task = context['tasl_instance']
        data = task.xcom_pull(task_ids='task1', dag_id=parent)

        for d in data:
            # do something...

父dag

    with DAG(...) as dag:
        task1 = PythonOperator(task_id="task1", ..., providde_context=True, dag=dag)

        task2 = SubDagOperator(subdag=mySubDag(...),..., provide_context=True, dag=dag)

        task1 >> task2

我不知道把参数“context”放在哪里,也不知道如何让subdag函数使用它

如果有人能帮忙解决,我真的很感激


Tags: 函数textidtrue列表taskdatacontext
1条回答
网友
1楼 · 发布于 2024-09-30 08:36:21

在taskinstance.py中定义代码

def xcom_pull(
        self,
        task_ids=None,
        dag_id=None,
        key=XCOM_RETURN_KEY,
        include_prior_dates=False):


    if dag_id is None:
        dag_id = self.dag_id
    ...

将当前dag的dag_id传递给xom_pull

因此,如果要从父dag获取数据,请使用父dag_id重写dag_id参数

对于您的示例,请使用op_kwargs传递更多上下文:

    def set_cookies_func(config, **kwargs):
        cookies = service_get_cookies_login(config)
        kwargs['ti'].xcom_push(key="SESSION", value=cookies)
    
    def get_data_func(parent_dag_name, **kwargs):
        cookies = kwargs['ti'].xcom_pull(
            task_ids='set_cookies_task',
            key="SESSION",
            dag_id="my_dag_id.set_cookies_task"
        )
        
    def sub_cache_load_to_gcs(parent_dag_name, child_dag_name):
        sub_dag = DAG(
            dag_id= "{}.{}".format(parent_dag_name, child_dag_name),
            ...
        )
        
        PythonOperator(
            task_id="get_data_func",
            python_callable=get_data_func,
            op_kwargs={"parent_dag_name": parent_dag_name},
            providde_context=True,
            dag=sub_dag
        )
    
    with DAG(
            dag_id= "my_dag_id",
            ...
    ) as dag:
        task1 = PythonOperator(
            task_id="set_cookies_task",
            python_callable=set_cookies_func,
            op_kwargs={"config": config},
            providde_context=True,
            dag=dag
        )
    
        task2 = SubDagOperator(
            task_id='branch_cache_task',
            subdag=sub_cache_load_to_gcs(dag.dag_id, 'branch_cache_task'),
            provide_context=True,
            dag=dag
        )
    
        task1 >> task2

相关问题 更多 >

    热门问题