有时Spark会以低效的方式“优化”数据帧计划。考虑Spark 2.1中的以下示例(也可以在Spark 1.6中复制):
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
df_result
.coalesce(1)
.saveAsTable(tablename)
在这个例子中,我想在一个昂贵的数据帧转换之后写一个文件(这只是一个例子来说明这个问题)。Spark向上移动coalesce(1)
,这样UDF只应用于包含1个分区的数据帧,从而破坏了并行性(有趣的是,repartition(1)
并没有这样做)。在
概括地说,当我想在转换的某个部分增加并行性,但随后又降低并行性时,就会出现这种行为。在
我找到了一个解决方法,它包括缓存数据帧,然后触发对数据帧的完整评估:
^{pr2}$我的问题是:在这种情况下,有没有其他方法可以告诉Spark不要降低并行性?在
其实并不是因为SparkSQL的优化,SparkSQL并没有改变Coalesce操作符的位置,如执行的计划所示:
我引用coalesce API的描述中的一段话:
注:本段由jira SPARK-19399添加。所以它不应该在2.0的API中找到。在
coalesceapi不执行shuffle,但导致以前的RDD和当前的RDD之间的依赖性很小。由于RDD是延迟求值,因此计算实际上是通过合并分区完成的。在
为了防止这种情况发生,您应该使用重新分区API。在
相关问题 更多 >
编程相关推荐