2024-05-20 15:01:37 发布
网友
我不熟悉AWS胶水和其他AWS材料。我需要为项目构建一个ETL框架。 这是高级图表。我想了解的是,我是否可以创建一个模板,而不是创建400个胶水管道,这种模板是由postgres aurora/mysql的引用数据驱动的。我熟悉Python。 有人对此有什么想法吗?任何参考资料、代码示例
sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s ".format(mydb) cur.execute(sql_query, (source_fname)) result = cur.fetchall() for row in result: stg_table_name = row[1] tgt_table_name = row[2] create_stg_table_qry = row[3] load_data_stg_table_qry = row[4] insert_tgt_table_qry = row[5] insert_tgt_table_qry_part_1 = row[6] insert_tgt_table_qry_part_2 = row[7] conn.commit() cur.close()
将适当的参数传递给通用函数,如下所示:
create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry) loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)
通用函数如下所示,这是用于aurora RDS的,请根据需要进行更改
def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry): cur, conn = connect() createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name) createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry) loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ...".format() cur.execute(createStgTable1) cur.execute(createStgTable2) cur.execute(loadQry) conn.commit() conn.close() def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name): cur, conn = connect() insertQry = "INSERT INTO target table, from the staging table query here" print(insertQry) cur.execute(insertQry) conn.commit() conn.close()
希望这能给你一个想法
谢谢
将适当的参数传递给通用函数,如下所示:
通用函数如下所示,这是用于aurora RDS的,请根据需要进行更改
希望这能给你一个想法
谢谢
相关问题 更多 >
编程相关推荐