气流Python操作符传递参数

2024-10-01 09:39:59 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在气流DAG中编写一个Python操作符,并将某些参数传递给Python可调用函数。在

我的代码如下所示。在

def my_sleeping_function(threshold):
   print(threshold)

fmfdependency = PythonOperator(
   task_id='poke_check',
   python_callable=my_sleeping_function,
   provide_context=True,
   op_kwargs={'threshold': 100},
   dag=dag)

end = BatchEndOperator(
   queue=QUEUE,
   dag=dag)

start.set_downstream(fmfdependency)
fmfdependency.set_downstream(end)

但我一直得到下面的错误。在

TypeError: my_sleeping_function() got an unexpected keyword argument 'dag_run'

不知道为什么。在


Tags: 代码thresholdmydeffunctionenddagprint
2条回答

在阈值参数之后,将**kwargs添加到操作员参数列表中

这就是如何在气流中传递Python操作符的参数。在

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from time import sleep
from datetime import datetime

def my_func(*op_args):
        print(op_args)
        return op_args[0]

with DAG('python_dag', description='Python DAG', schedule_interval='*/5 * * * *', start_date=datetime(2018, 11, 1), catchup=False) as dag:
        dummy_task      = DummyOperator(task_id='dummy_task', retries=3)
        python_task     = PythonOperator(task_id='python_task', python_callable=my_func, op_args=['one', 'two', 'three'])

        dummy_task >> python_task

相关问题 更多 >