执行嵌入在DAG文件中的sql代码

2024-10-04 05:22:36 发布

您现在位置:Python中文网/ 问答频道 /正文

Fetch results from BigQueryOperator in airflow

我遵循了上面链接中的建议,解决方案是有效的,它是好的,如果我的sql是单行的话,它是有效的。但是如果SQL代码很大,并将其放在一个文件中,然后在函数中引用该文件,则会失败。你知道吗

def MyChequer(**kwargs):
big_query_count = bigquery_operator.BigQueryOperator(
    task_id='my_bq_query',
    sql='/dags/sqls/invalidTable.sql'
)

然后我得到一个错误:BigQuery作业失败。最后一个错误是:{'reason':'invalidQuery','location':'query','message':'语法错误:意外的标识符“dags”位于[1:1]'}

通常我用下面的方式和下面的作品

BigQueryOperator(
        task_id='invalidXXX',
        use_legacy_sql=False,
        sql='/dags/sqls/invalid_v1.sql',
        destination_dataset_table=targetTable,
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        dag=dag
    )
   dag = DAG('invalidXXX', 
    default_args=default_args, 
    description='', 
    schedule_interval="0 5 * * *",
    catchup=False,
    template_searchpath=['/home/airflow/stgAirflow/']
   )

Tags: 文件idfalsedefaulttasksql错误query
2条回答

似乎错误来自尝试将此字符串'/dags/sqls/invalid_v1.sql'作为sql执行…这是无效的。你知道吗

如果要将sql保存在一个单独的文件中,可以读入其中的文件内容吗?似乎sql arg需要一个实际的sql语句。你知道吗

好的,我把这个修好了。这意味着,在执行dag时,使用并执行文件中的sql代码。不确定这是不是一个优化的解决方案。因此,欢迎您提出更多建议。你知道吗

//define 
class SQLTemplatedPythonOperator(PythonOperator):
    template_ext = ('.sql',)

//modify function
def loadCSV(**kwargs):
    print("inside loadCSV")
    query = kwargs['templates_dict']['query']
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql=query,

//dag - task
SQLTemplatedPythonOperator(
    task_id='invalidBBDToCSV',
    templates_dict={'query': 'invalidBBD.sql'},
    provide_context=True,
    python_callable=loadCSV,
    dag=dag,
//dag
dag = DAG('invalidBBDLoad', 
    default_args=default_args, 
    description='DAG data', 
    schedule_interval="0 11 * * *",
    catchup=False,
    template_searchpath=['/home/stgairflow/dags/sqls'], 
    user_defined_macros={'myProjectId': myProjectId,}
)

相关问题 更多 >