不同查询之间的输出数据不一致。原因是什么?

2024-10-01 00:29:26 发布

您现在位置:Python中文网/ 问答频道 /正文

大家好,我的社区

使用Spark DataRicks时,我在使用相同的数据源表(相同的数据内容)多次运行相同的查询时遇到问题。使用具有1个驱动程序/5个工作进程的集群会导致每个查询产生不同的错误结果(可能会丢失数据),但使用具有1个驱动程序/1个工作进程的集群,每次的输出结果都是正确的

出现这种错误的原因是什么?我如何纠正它?

以下是有关查询环境的信息:

多群集参数:

  • 模式:标准
  • 运行时版本:7.3 LTS(包括Apache Spark 3.0.1、Scala 2.12)
  • 工作人员类型,编号:标准\u F32s\u v2(n=5)
  • 驱动器类型:标准_F32s_v2

单簇参数:

  • 模式:标准
  • 运行时版本:7.3 LTS(包括Apache Spark 3.0.1、Scala 2.12)
  • 工作人员类型,编号:标准\u F32s\u v2(n=1)
  • 驱动器类型:标准_F32s_v2

多集群和单集群配置:

    spark.sql.autoBroadcastJoinThreshold 10mb
    spark.sql.join.preferSortMergeJoin false
    spark.databricks.io.cache.maxMetaDataCache 1g
    spark.databricks.io.cache.compression.enabled true
    spark.sql.inMemoryColumnarStorage.batchSize 1000000
    spark.sql.adaptive.coalescePartitions.enabled true
    spark.databricks.delta.stateReconstructionValidation.enabled false
    spark.sql.crossJoin.enabled false
    spark.databricks.preemption.enabled true
    spark.databricks.preemption.timeout 2s
    spark.databricks.io.cache.enabled true
    spark.sql.shuffle.partitions 500
    spark.sql.broadcastTimeout 7200 
    spark.sql.inMemoryColumnarStorage.compressed true
    spark.databricks.io.cache.maxDiskUsage 350g
    spark.databricks.preemption.threshold 0.7
    spark.databricks.preemption.interval 500ms
    spark.task.reaper.enabled true
    spark.scheduler.revive.interval 50ms
    spark.databricks.delta.commitValidation.enabled false
    spark.databricks.service.port 8787
    spark.sql.adaptive.advisoryPartitionSizeInBytes 6128mb
    spark.sql.files.maxPartitionBytes 134217728
    spark.default.parallelism 96

数据源表信息:

  • 新界北行:1'296'524'739
  • Nb Dim1:258
  • Nb Dim2:23'616
  • Nb Dim3:2
  • 注意事项:430'942

查询(Python):

# Get all Dim1 values where Dim2 equals to 'ABC' and Dim3 equals to 126 to an Array 

dim1_list = spark.read.format('delta').table('DBname.DataSource') \
  .select("dim2", "dim1").distinct() \
  .where( (F.col("dim2") == "ABC") & (F.col("dim3") == 126) ) \
  .select("dim1").collect() 

dim1_array = [int(row.dim1) for row in dim1_list]


# Nb of items concern by Dim1 in Array and Dim3 equal to 126

nb_items = spark.read.format('delta').table('DBname.DataSource') \
  .select("items").distinct() \
  .where( (F.col("Dim1").isin(dim1_array)) & (F.col("Dim3") == 126) ).count()

print( "Nb Dim1: %s" % str(len(dim1_array)) ) 
print( "Nb Items: %s" % str(nb_items) ) 

使用多群集多次运行后的结果(全部错误):

  • Nb-Dim1:67;注意事项:230958
  • Nb-Dim1:66;注意事项:229261
  • Nb-Dim1:67;注意事项:231010
  • Nb-Dim1:67;注意事项:231050
  • Nb-Dim1:67;注意事项:231045
  • Nb-Dim1:66;注意事项:230945

使用单个群集多次运行后的结果(正确答案):

  • Nb-Dim1:66;注意事项:231131
  • Nb-Dim1:66;注意事项:231131
  • Nb-Dim1:66;注意事项:231131

注意:我将Python代码转换为Scala,并在两个集群上运行了多次,我发现了相同的差异


Tags: falsetrue类型sql标准enabled集群spark