Apache Airflow: Passing Data Between Custom Operators

Posted 2024-10-03 13:28:16


I have two custom operators:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class FetchDataOperator(BaseOperator):

    @apply_defaults
    def __init__(self, connection, sql_commands, key, *args, **kwargs):
        super(FetchDataOperator, self).__init__(*args, **kwargs)
        self.connection = connection
        self.sql_commands = sql_commands
        self.key = key

    def execute(self, context):
        # Run the query and push every fetched row to XCom under self.key
        cursor = self.connection.cursor()
        cursor.execute(self.sql_commands)
        records = cursor.fetchall()
        context['ti'].xcom_push(key=self.key, value=records)


class InsertDataOperator(BaseOperator):

    @apply_defaults
    def __init__(self, connection, sql_commands, data, *args, **kwargs):
        super(InsertDataOperator, self).__init__(*args, **kwargs)
        self.connection = connection
        self.sql_commands = sql_commands
        self.data = data

    def execute(self, context):
        # Insert one row per element of self.data, then commit
        cursor = self.connection.cursor()
        cursor.executemany(self.sql_commands, self.data)
        self.connection.commit()
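To make the shape of the data concrete, here is a stdlib-only sketch that uses an in-memory sqlite3 database as a stand-in for whatever database connection points to (an assumption; the question does not say). It shows that fetchall() returns a list of row tuples, and that executemany() consumes that list directly, one tuple per row. Note that sqlite3 uses ? placeholders; other drivers (psycopg2, for example) use %s, so the real insert_query may look different.

```python
import sqlite3

# In-memory SQLite stands in for the real database behind `connection` (assumption).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE src (id INTEGER, name TEXT)")
cur.execute("CREATE TABLE dst (id INTEGER, name TEXT)")
cur.executemany("INSERT INTO src VALUES (?, ?)", [(1, "a"), (2, "b")])

# What FetchDataOperator.execute does: fetchall() returns a list of row tuples,
# and that list is the value pushed to XCom.
cur.execute("SELECT id, name FROM src ORDER BY id")
records = cur.fetchall()
print(records)  # [(1, 'a'), (2, 'b')]

# What InsertDataOperator.execute does: executemany() binds one tuple per row,
# so it needs the list itself, not a string describing the list.
cur.executemany("INSERT INTO dst VALUES (?, ?)", records)
conn.commit()

copied = cur.execute("SELECT id, name FROM dst ORDER BY id").fetchall()
```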

I'm confused about how to get the data that FetchDataOperator pushes to XCom into the 'data' argument of InsertDataOperator. I tried the following:

insert_data = InsertDataOperator(
    task_id="insert_data",
    connection=conn,
    sql_commands=insert_query,
    op_kwargs={
        'data' : "{{ti.xcom_pull(key='data', task_ids='fetch_data')}}"
    },
    provide_context=True
)

But it gives me this error:

airflow.exceptions.AirflowException: Argument ['data'] is required

Is there a correct way to do this? Thanks, everyone!


1 Answer
User
#1 · Posted 2024-10-03 13:28:16

The constructor of InsertDataOperator has this signature:

__init__(self, connection, sql_commands, data, *args, **kwargs)

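So data is a required argument and has to be passed to the constructor directly. Wrapping it in op_kwargs does not work: op_kwargs (like provide_context) is a PythonOperator parameter that InsertDataOperator never reads, so the call leaves data unset and the required-argument check fails.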
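The failure can be reproduced in plain Python with inspect.signature, using a hypothetical function insert_data_init that has the same parameters as InsertDataOperator.__init__ (self omitted). Airflow's apply_defaults runs its own check and words the error differently, but the cause is the same: op_kwargs is just another keyword that falls into **kwargs, so nothing binds to data.

```python
import inspect


# Hypothetical stand-in with the same parameters as InsertDataOperator.__init__
# (self omitted); only the signature matters for this check.
def insert_data_init(connection, sql_commands, data, *args, **kwargs):
    pass


sig = inspect.signature(insert_data_init)

# Keyword arguments from the failing call in the question.
failing_call = {
    "task_id": "insert_data",
    "connection": None,
    "sql_commands": "INSERT ...",
    "op_kwargs": {"data": "{{ ti.xcom_pull(key='data', task_ids='fetch_data') }}"},
    "provide_context": True,
}

try:
    sig.bind(**failing_call)
    error = None
except TypeError as exc:
    error = str(exc)  # the message names 'data' as the missing argument
print(error)

# Passing data directly binds it; task_id is left over and lands in **kwargs.
fixed_call = {
    "task_id": "insert_data",
    "connection": None,
    "sql_commands": "INSERT ...",
    "data": "{{ ti.xcom_pull(key='data', task_ids='fetch_data') }}",
}
bound = sig.bind(**fixed_call)
print(bound.arguments["kwargs"])  # {'task_id': 'insert_data'}
```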

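Also declare template_fields = ("data",) on InsertDataOperator so that Airflow renders the Jinja string (without it, data receives the literal "{{ ... }}" text unchanged), and change the constructor call to: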

insert_data = InsertDataOperator(
    task_id="insert_data",
    connection=conn,
    sql_commands=insert_query,
    data="{{ ti.xcom_pull(key='data', task_ids='fetch_data') }}"
)
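One more caveat, even with data listed in template_fields: by default Jinja renders the pulled XCom value to a string, so executemany() would receive text like "[(1, 'a'), (2, 'b')]" rather than a list of tuples. On Airflow 2.1 and later, passing render_template_as_native_obj=True to the DAG keeps the native Python object. On versions without that option, one workaround is to parse the rendered string back, as in the stdlib-only sketch below. It assumes the rows contain only literal-representable values (ints, floats, strings, None); values such as datetime or Decimal would not survive this round trip. The sample rows are hypothetical.

```python
import ast

# Rows as FetchDataOperator might push them (hypothetical sample data).
records = [(1, "a"), (2, "b"), (3, None)]

# Default Jinja rendering turns the XCom value into its str() form.
rendered = str(records)

# Iterating that string would yield single characters, not rows.
first_item = rendered[0]

# ast.literal_eval rebuilds the list of tuples, but only for plain literals.
restored = ast.literal_eval(rendered)
print(type(rendered).__name__, type(restored).__name__)  # str list
```

Another option that avoids templating altogether is to call context['ti'].xcom_pull(task_ids='fetch_data', key='data') inside InsertDataOperator.execute, which returns the stored object as-is.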
