我创建了一个dag,其中包含一个子dag,用于通过一个列表进行循环,该列表是任务的返回值
def mySubDag(parent: Text, child: Text, args, **context):
task = context['tasl_instance']
data = task.xcom_pull(task_ids='task1', dag_id=parent)
for d in data:
# do something...
with DAG(...) as dag:
task1 = PythonOperator(task_id="task1", ..., providde_context=True, dag=dag)
task2 = SubDagOperator(subdag=mySubDag(...),..., provide_context=True, dag=dag)
task1 >> task2
我不知道把参数“context”放在哪里,也不知道如何让subdag函数使用它
如果有人能帮忙解决,我真的很感激
在taskinstance.py中定义代码
将当前dag的dag_id传递给xom_pull
因此,如果要从父dag获取数据,请使用父dag_id重写dag_id参数
对于您的示例,请使用op_kwargs传递更多上下文:
相关问题 更多 >
编程相关推荐