<p>The example below shows how to set the conf sent with DagRuns triggered by <a href="https://github.com/apache/airflow/blob/1.10.11/airflow/operators/dagrun_operator.py" rel="nofollow noreferrer">TriggerDagRunOperator (in 1.10.11)</a>.</p>
<p>trigger.py</p>
<pre class="lang-py prettyprint-override"><code>from datetime import datetime

from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

dag = DAG(
    dag_id='trigger',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)


def modify_dro(context, dagrun_order):
    print(context)
    print(dagrun_order)
    # The payload set here is what the triggered DAG receives as conf
    dagrun_order.payload = {
        "message": "This is my conf message"
    }
    return dagrun_order


run_this = TriggerDagRunOperator(
    task_id='run_this',
    trigger_dag_id='my_dag',
    python_callable=modify_dro,
    dag=dag
)
</code></pre>
<p>dag.py</p>
<pre class="lang-py prettyprint-override"><code>from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)


def run_this_func(**context):
    # conf holds the payload set by the triggering DAG
    print(context["dag_run"].conf)


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)
</code></pre>
<p>Here is the output of the <code>run_this</code> task in the <code>my_dag</code> DAG:</p>
<pre><code>[2021-03-05 16:39:15,649] {logging_mixin.py:112} INFO - {'message': 'This is my conf message'}
</code></pre>
<hr/>
<p>TriggerDagRunOperator accepts a <code>python_callable</code> that lets you modify the DagRunOrder. DagRunOrder is an abstract construct that holds the run id and the payload; the payload is what gets <a href="https://github.com/apache/airflow/blob/1.10.11/airflow/operators/dagrun_operator.py#L88-L95" rel="nofollow noreferrer">sent as conf when the target DAG is triggered</a>.</p>
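<p>To make that flow easier to follow, here is a rough sketch that imitates it without Airflow installed. The <code>DagRunOrder</code> class below is a simplified stand-in, and <code>simulate_trigger</code> is a hypothetical helper that does not exist in Airflow; it only approximates my reading of the linked source, where the payload is serialized to JSON and a falsy return value from the callable means no run is triggered.</p>

```python
import json


class DagRunOrder:
    """Simplified stand-in for Airflow 1.10's DagRunOrder (not the real class)."""

    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def simulate_trigger(python_callable, context, run_id="trig__example"):
    """Hypothetical helper that mimics the operator's flow without Airflow.

    Returns the JSON string that would be passed on as conf, or None when
    the callable returns a falsy value (assumed to mean "do not trigger").
    """
    dro = DagRunOrder(run_id=run_id)
    if python_callable is not None:
        # The callable may set the payload, or return None to skip the run
        dro = python_callable(context, dro)
    if not dro:
        return None
    return json.dumps(dro.payload)


def modify_dro(context, dagrun_order):
    # Same callable shape as in trigger.py above
    dagrun_order.payload = {"message": "This is my conf message"}
    return dagrun_order


conf_json = simulate_trigger(modify_dro, context={})
print(conf_json)  # {"message": "This is my conf message"}
```

<p>In the real operator the triggered DAG then sees this data as <code>dag_run.conf</code>, which is what <code>run_this_func</code> prints in dag.py.</p>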