如何使用增量DeltaLake表提高合并操作的性能?

2024-09-28 15:12:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我特别希望通过更新数据并将数据插入到一个具有大约4万亿条记录的DeltaLake基表来优化性能

环境: 火花3.0.0 德尔塔拉克0.7.0

在上下文中,这是关于通过DeltaLake生成增量表的内容,我将分步骤对此进行总结,以便更详细:

  1. 创建基表(增量)
  2. 获取定期数据
  3. 将数据添加到基表中

第1步和第2步已经完成,但添加数据时,性能是出了名的慢,例如添加9GB csv大约需要6个小时,这主要是因为delta需要为每次更新重写数据,它还需要“读取”数据库中的所有数据

该表也被分区(partitionby)并存储在集群的GDFS(HDFS)中,以确保spark节点可以执行这些操作

基表的字段(基数由#指定):

  • ID:标识符#10000
  • 类型:字符串,#~30
  • 本地日期:记录的本地日期
  • 日期UTC:UTC注册日期
  • 值:注册表值
  • 年份:列计算整数#4
  • 月份:计算列int#12
  • 日期:计算列int#31

由于一般搜索是按时间进行的,因此决定在年、月、日中按LOCAL_DATE列进行分区,由于其基数较高(性能较差),因此排除了按ID和LOCAL_DATE列进行分区的可能性,最后添加了一个类型,如下所示:

spark.sql(f"""
CREATE OR REPLACE TABLE  {TABLE_NAME} (
  ID INT, 
  FECHA_LOCAL TIMESTAMP,
  FECHA_UTC TIMESTAMP,
  TIPO STRING, 
  VALUE DOUBLE,
  YEAR INT,
  MONTH INT, 
  DAY INT )
USING DELTA
PARTITIONED BY (YEAR , MONTH , DAY, TIPO)
LOCATION '{location}'
""")

从现在起,每5天定期添加这些大约9 Gb的csv文件,以实现增量。目前,合并操作如下:

spark.sql(f"""
        MERGE INTO {BASE_TABLE_NAME}
        USING {INCREMENTAL_TABLE_NAME} ON 
            --partitioned cols
            {BASE_TABLE_NAME}.YEAR  = {INCREMENTAL_TABLE_NAME}.YEAR AND
            {BASE_TABLE_NAME}.MONTH = {INCREMENTAL_TABLE_NAME}.MONTH AND  
            {BASE_TABLE_NAME}.DAY   = {INCREMENTAL_TABLE_NAME}.DAY AND 
            {BASE_TABLE_NAME}.TIPO = {INCREMENTAL_TABLE_NAME}.TIPO AND 


            {BASE_TABLE_NAME}.FECHA_LOCAL= {INCREMENTAL_TABLE_NAME}.FECHA_LOCALAND 
            {BASE_TABLE_NAME}.ID= {INCREMENTAL_TABLE_NAME}.ID

        WHEN MATCHED  THEN  
        UPDATE SET    {BASE_TABLE_NAME}.VALUE= {INCREMENTAL_TABLE_NAME}.VALUE,
                      {BASE_TABLE_NAME}.TIPO= {INCREMENTAL_TABLE_NAME}.TIPO


        WHEN NOT MATCHED THEN
        INSERT *

        """)

需要考虑的一些事实:

  • 此合并操作的时间为6小时
  • 基表是从230GB csv数据创建的(55gb现在为增量!)
  • spark应用程序配置处于群集模式,具有以下参数
  • infra由3个节点、32个内核和250GB RAM组成,尽管它在安全方面比其他现有应用程序占用的资源少约50%

Spark应用程序

mode = 'spark: // spark-master: 7077'
# mode = 'local [*]'
spark = (SparkSession.builder.master (mode)
        .appName ("SparkApp")
.config ('spark.cores.max', '45')
.config ('spark.executor.cores', '5')
.config ('spark.executor.memory', '11g')
.config ('spark.driver.memory', '120g')
.config ("spark.sql.shuffle.partitions", f "200") # 200 only for
200GB delta table reads
.config ("spark.storage.memoryFraction", f "0.8")
# DeltaLake configs
.config ("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config ("spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension")
.config ("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
# Delta optimization
.config ("spark.databricks.delta.optimizeWrite.enabled", "true")
.config ("spark.databricks.delta.retentionDurationCheck.enabled",
"false")
.getOrCreate ()
)

Tags: 数据nameidconfigsqlbaselocaltable
1条回答
网友
1楼 · 发布于 2024-09-28 15:12:38

好吧,我选择分享这个答案,这样你可以利用一些技巧

Delta建议使用所有分区列,这样,由于“修剪”的效果,最终的数据搜索更少

因此,有必要确定合并可以更新数据的所有情况 对增量数据进行查询以生成此类型的字典:

filter_columns = spark.sql (f "" "
SELECT
    YEAR,
    MONTH,
    DAY,
    COLLECT_LIST (DISTINCT TYPE) AS TYPES
Incremental FROM
GROUP BY YEAR, MONTH, DAY
ORDER BY 1, 2, 3
"" ") .toPandas ()

使用此df,可以生成合并必须更新/插入的条件:

[![df按年、月、日、类型分组]1]1

然后它生成一个名为“final_cond”的字符串,如下所示:

dic = filter_columns.groupby (['YEAR', 'MONTH', 'DAY']) ['TYPE']. apply (lambda grp: list (grp.value_counts (). index)). to_dict ()
final_cond = ''
index = 0
for key, value in dic.items ():
    cond = ''
    year = key [0]
    month = key [1]
    day = key [2]
    variables = ','. join (["'" + str (x) + "'" for x in value [0]])
    or_cond = '' if index + 1 == len (dic) else '\ nOR \ n'
    
    cond = f "" "({BASE_TABLE_NAME} .YEAR == {year} AND {BASE_TABLE_NAME} .MONTH == {month} AND {BASE_TABLE_NAME} .DAY == {day} AND {BASE_TABLE_NAME}. TYPE IN ({variables} )) "" "
      
    final_cond = final_cond + cond + f '{or_cond}'
    index + = 1
    #break
    
print (final_cond)

[![字符串条件]2]

最后,我们将这些条件添加到合并中:

...
WHEN MATCHED AND ({final_cond}) THEN
...

这个简单的“过滤器”减少了大型操作的合并时间

相关问题 更多 >