我有两个自定义操作员
class FetchDataOperator(BaseOperator):
@apply_defaults
def __init__(self, connection, sql_commands, key, *args, **kwargs):
super(FetchDataOperator, self).__init__(*args, **kwargs)
self.connection = connection
self.sql_commands = sql_commands
self.key = key
def execute(self, context):
cursor = self.connection.cursor()
cursor.execute(self.sql_commands)
records = cursor.fetchall()
context['ti'].xcom_push(key=self.key, value=records)
class InsertDataOperator(BaseOperator):
@apply_defaults
def __init__(self, connection, sql_commands, data, *args, **kwargs):
super(InsertDataOperator, self).__init__(*args, **kwargs)
self.connection = connection
self.sql_commands = sql_commands
self.data = data
def execute(self, context):
cursor = self.connection.cursor()
cursor.executemany(self.sql_commands, self.data)
self.connection.commit()
我对如何将FetchDataOperator上推送到xcom的数据放入InsertDataOperator中的'data'参数感到困惑,我尝试了以下方法
insert_data = InsertDataOperator(
task_id="insert_data",
connection=conn,
sql_commands=insert_query,
op_kwargs={
'data' : "{{ti.xcom_pull(key='data', task_ids='fetch_data')}}"
},
provide_context=True
)
但是它给了我这个错误
airflow.exceptions.AirflowException: Argument ['data'] is required
有什么方法可以正确地做吗?谢谢大家!
InsertDataOperator
的构造函数具有签名:所以显然
data
应该作为必需的参数传递将构造函数调用更改为:
相关问题 更多 >
编程相关推荐