我特别希望通过更新数据并将数据插入到一个具有大约4万亿条记录的DeltaLake基表来优化性能
环境: 火花3.0.0 德尔塔拉克0.7.0
在上下文中,这是关于通过DeltaLake生成增量表的内容,我将分步骤对此进行总结,以便更详细:
第1步和第2步已经完成,但添加数据时,性能是出了名的慢,例如添加9GB csv大约需要6个小时,这主要是因为delta需要为每次更新重写数据,它还需要“读取”数据库中的所有数据
该表也被分区(partitionby)并存储在集群的GDFS(HDFS)中,以确保spark节点可以执行这些操作
基表的字段(基数由#指定):
由于一般搜索是按时间进行的,因此决定在年、月、日中按LOCAL_DATE列进行分区,由于其基数较高(性能较差),因此排除了按ID和LOCAL_DATE列进行分区的可能性,最后添加了一个类型,如下所示:
spark.sql(f"""
CREATE OR REPLACE TABLE {TABLE_NAME} (
ID INT,
FECHA_LOCAL TIMESTAMP,
FECHA_UTC TIMESTAMP,
TIPO STRING,
VALUE DOUBLE,
YEAR INT,
MONTH INT,
DAY INT )
USING DELTA
PARTITIONED BY (YEAR , MONTH , DAY, TIPO)
LOCATION '{location}'
""")
从现在起,每5天定期添加这些大约9 Gb的csv文件,以实现增量。目前,合并操作如下:
spark.sql(f"""
MERGE INTO {BASE_TABLE_NAME}
USING {INCREMENTAL_TABLE_NAME} ON
--partitioned cols
{BASE_TABLE_NAME}.YEAR = {INCREMENTAL_TABLE_NAME}.YEAR AND
{BASE_TABLE_NAME}.MONTH = {INCREMENTAL_TABLE_NAME}.MONTH AND
{BASE_TABLE_NAME}.DAY = {INCREMENTAL_TABLE_NAME}.DAY AND
{BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO AND
{BASE_TABLE_NAME}.FECHA_LOCAL= {INCREMENTAL_TABLE_NAME}.FECHA_LOCALAND
{BASE_TABLE_NAME}.ID= {INCREMENTAL_TABLE_NAME}.ID
WHEN MATCHED THEN
UPDATE SET {BASE_TABLE_NAME}.VALUE= {INCREMENTAL_TABLE_NAME}.VALUE,
{BASE_TABLE_NAME}.TIPO= {INCREMENTAL_TABLE_NAME}.TIPO
WHEN NOT MATCHED THEN
INSERT *
""")
需要考虑的一些事实:
Spark应用程序
mode = 'spark: // spark-master: 7077'
# mode = 'local [*]'
spark = (SparkSession.builder.master (mode)
.appName ("SparkApp")
.config ('spark.cores.max', '45')
.config ('spark.executor.cores', '5')
.config ('spark.executor.memory', '11g')
.config ('spark.driver.memory', '120g')
.config ("spark.sql.shuffle.partitions", f "200") # 200 only for
200GB delta table reads
.config ("spark.storage.memoryFraction", f "0.8")
# DeltaLake configs
.config ("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config ("spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension")
.config ("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Delta optimization
.config ("spark.databricks.delta.optimizeWrite.enabled", "true")
.config ("spark.databricks.delta.retentionDurationCheck.enabled",
"false")
.getOrCreate ()
)
好吧,我选择分享这个答案,这样你可以利用一些技巧
Delta建议使用所有分区列,这样,由于“修剪”的效果,最终的数据搜索更少
因此,有必要确定合并可以更新数据的所有情况 对增量数据进行查询以生成此类型的字典:
使用此df,可以生成合并必须更新/插入的条件:
[![df按年、月、日、类型分组]1]1
然后它生成一个名为“final_cond”的字符串,如下所示:
[![字符串条件]2]
最后,我们将这些条件添加到合并中:
这个简单的“过滤器”减少了大型操作的合并时间
相关问题 更多 >
编程相关推荐