聚结降低整个级的平行度(火花)

2024-10-01 09:33:54 发布

您现在位置:Python中文网/ 问答频道 /正文

有时Spark会以低效的方式“优化”数据帧计划。考虑Spark 2.1中的以下示例(也可以在Spark 1.6中复制):

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))

df_result
.coalesce(1)
.saveAsTable(tablename)

在这个例子中,我想在一个昂贵的数据帧转换之后写一个文件(这只是一个例子来说明这个问题)。Spark向上移动coalesce(1),这样UDF只应用于包含1个分区的数据帧,从而破坏了并行性(有趣的是,repartition(1)并没有这样做)。在

概括地说,当我想在转换的某个部分增加并行性,但随后又降低并行性时,就会出现这种行为。在

我找到了一个解决方法,它包括缓存数据帧,然后触发对数据帧的完整评估:

^{pr2}$

我的问题是:在这种情况下,有没有其他方法可以告诉Spark不要降低并行性?在


Tags: 数据方法示例dfvalue方式valresult
1条回答
网友
1楼 · 发布于 2024-10-01 09:33:54

其实并不是因为SparkSQL的优化,SparkSQL并没有改变Coalesce操作符的位置,如执行的计划所示:

Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
   +- *SerializeFromObject [input[0, double, false] AS value#2]
      +- Scan ExternalRDDScan[obj#1]

我引用coalesce API的描述中的一段话:

注:本段由jira SPARK-19399添加。所以它不应该在2.0的API中找到。在

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

coalesceapi不执行shuffle,但导致以前的RDD和当前的RDD之间的依赖性很小。由于RDD是延迟求值,因此计算实际上是通过合并分区完成的。在

为了防止这种情况发生,您应该使用重新分区API。在

相关问题 更多 >