如何将xcom变量推送到现有的dag id?

2024-10-03 13:23:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我现在有一个带有Python操作符和相关Python调用的DAG,如下所示:

def push_xcom(**kwargs):
    ti = kwargs["ti"]

    ti.xcom_push(key=key, value=value)

xcom_opr = PythonOperator(
        task_id='xcom_opr',
        python_callable=push_xcom,
        dag=dag
    )

此dag的目标是更新气流中定义的其他dag的xcom变量。这不可能吗?我找不到xcom_push的任何源代码,但可能是dag_id参数之类的


Tags: keyid目标taskvaluedeftipush
1条回答
网友
1楼 · 发布于 2024-10-03 13:23:55

查看TaskInstance的源代码,您似乎可以直接复制它在引擎盖下的功能,并指定所需的DAG id

        XCom.set(
            key=key,
            value=value,
            task_id=self.task_id,
            dag_id=self.dag_id,
            execution_date=execution_date or self.execution_date)

但是,xcom_pullAPI直接支持从另一个DAG的xcom中拉取,所以您可以让您想要修改的DAG从另一个DAG中拉取

    def xcom_pull(
            self,
            task_ids: Optional[Union[str, Iterable[str]]] = None,
            dag_id: Optional[str] = None,
            key: str = XCOM_RETURN_KEY,
            include_prior_dates: bool = False) -> Any

相关问题 更多 >