如何使用AWS Glue和其他AWS服务从Oracle数据库提取数据

2024-05-20 15:01:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我不熟悉AWS胶水和其他AWS材料。我需要为项目构建一个ETL框架。 这是高级图表。我想了解的是,我是否可以创建一个模板,而不是创建400个胶水管道,这种模板是由postgres aurora/mysql的引用数据驱动的。我熟悉Python。 有人对此有什么想法吗?任何参考资料、代码示例

enter image description here


Tags: 数据项目代码框架aws模板管道图表
1条回答
网友
1楼 · 发布于 2024-05-20 15:01:37
  1. 我们在mysql数据库中有一个配置主表。每方便一列我们将source_table_name作为标识符,以获取适当的表列名/查询以创建STG表、将数据加载到STG表、插入/更新到目标表等
  2. 我们还将INSERT/UPDATE拆分为config master中的两个不同列,因为我们使用ON DUPLICATE键更新现有记录
  3. 通过处理具有登录文件名的lambda事件,获取源表名
  4. 从配置主机获取源表名称所需的所有数据。它将类似于以下内容:
sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s ".format(mydb)
cur.execute(sql_query, (source_fname))
result = cur.fetchall()
for row in result:
stg_table_name = row[1]
tgt_table_name = row[2]
create_stg_table_qry = row[3]
load_data_stg_table_qry = row[4]
insert_tgt_table_qry = row[5]
insert_tgt_table_qry_part_1 = row[6]
insert_tgt_table_qry_part_2 = row[7]
conn.commit()
cur.close()

将适当的参数传递给通用函数,如下所示:

create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry)
loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)

通用函数如下所示,这是用于aurora RDS的,请根据需要进行更改

def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry):
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name)
createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry)
loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ...".format()
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()

def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name):
cur, conn = connect()
insertQry = "INSERT INTO target table, from the staging table query here"
print(insertQry)
cur.execute(insertQry)
conn.commit()
conn.close()

希望这能给你一个想法

谢谢

相关问题 更多 >