大家好,我的社区
使用Spark DataRicks时,我在使用相同的数据源表(相同的数据内容)多次运行相同的查询时遇到问题。使用具有1个驱动程序/5个工作进程的集群会导致每个查询产生不同的错误结果(可能会丢失数据),但使用具有1个驱动程序/1个工作进程的集群,每次的输出结果都是正确的
出现这种错误的原因是什么?我如何纠正它?
以下是有关查询环境的信息:
多群集参数:
单簇参数:
多集群和单集群配置:
spark.sql.autoBroadcastJoinThreshold 10mb
spark.sql.join.preferSortMergeJoin false
spark.databricks.io.cache.maxMetaDataCache 1g
spark.databricks.io.cache.compression.enabled true
spark.sql.inMemoryColumnarStorage.batchSize 1000000
spark.sql.adaptive.coalescePartitions.enabled true
spark.databricks.delta.stateReconstructionValidation.enabled false
spark.sql.crossJoin.enabled false
spark.databricks.preemption.enabled true
spark.databricks.preemption.timeout 2s
spark.databricks.io.cache.enabled true
spark.sql.shuffle.partitions 500
spark.sql.broadcastTimeout 7200
spark.sql.inMemoryColumnarStorage.compressed true
spark.databricks.io.cache.maxDiskUsage 350g
spark.databricks.preemption.threshold 0.7
spark.databricks.preemption.interval 500ms
spark.task.reaper.enabled true
spark.scheduler.revive.interval 50ms
spark.databricks.delta.commitValidation.enabled false
spark.databricks.service.port 8787
spark.sql.adaptive.advisoryPartitionSizeInBytes 6128mb
spark.sql.files.maxPartitionBytes 134217728
spark.default.parallelism 96
数据源表信息:
查询(Python):
# Get all Dim1 values where Dim2 equals to 'ABC' and Dim3 equals to 126 to an Array
dim1_list = spark.read.format('delta').table('DBname.DataSource') \
.select("dim2", "dim1").distinct() \
.where( (F.col("dim2") == "ABC") & (F.col("dim3") == 126) ) \
.select("dim1").collect()
dim1_array = [int(row.dim1) for row in dim1_list]
# Nb of items concern by Dim1 in Array and Dim3 equal to 126
nb_items = spark.read.format('delta').table('DBname.DataSource') \
.select("items").distinct() \
.where( (F.col("Dim1").isin(dim1_array)) & (F.col("Dim3") == 126) ).count()
print( "Nb Dim1: %s" % str(len(dim1_array)) )
print( "Nb Items: %s" % str(nb_items) )
使用多群集多次运行后的结果(全部错误):
使用单个群集多次运行后的结果(正确答案):
注意:我将Python代码转换为Scala,并在两个集群上运行了多次,我发现了相同的差异
目前没有回答
相关问题 更多 >
编程相关推荐