在apachespark中使用联合比使用join更有效,还是无关紧要?

2024-04-27 13:48:07 发布

您现在位置:Python中文网/ 问答频道 /正文

最近我在apachespark集群上运行一个作业,我打算在两个rdd上执行一个内部连接。然而,我当时认为对于这个计算,我可以通过使用union、reduceByKey和filter来避免join。但这基本上就是join在幕后所做的吗?在

假设rdd中的对象具有以下结构:

{ 'key':'someKey', 'value': <some positive integer> }

为了避免加入我会写下:

leftRDD = rdd1.map(lambda y: (y['key'], (1, y['value'], -1))
rightRDD = rdd2.map(lambda y: (y['key'], (0, -1, y['value']))
joinedRDD = (leftRDD + rightRDD) \
    .reduceByKey(lambda x,y: (max(x[0],y[0]), max(x[1],y[1]), max(x[2],y[2])) \
    .filter(lambda y: y[1][0] == 1)

joineddd现在可以有效地获得与我进行内部连接相同的结果,但是为了避免连接而增加的复杂性值得吗?在


Tags: lambdakeymapvalue作业集群filtermax
1条回答
网友
1楼 · 发布于 2024-04-27 13:48:07

Pypark连接在可伸缩性方面通常很差,所以您对手动RDD操作的直觉可能是一个不错的直觉。在

尤其是pyspark中的连接会丢失分区,因此不支持联合分区的连接。在

对于具体细节:您应该注意reduceByKey的语义:它输出与输入相同的数据结构。根据代码,您可能期望得到不同的结果。在

有关reduceByKey的更多信息,请看(PySpark) Nested lists after reduceByKey。在

更新

本机scala版本在保留现有分区方面更具侵略性(不会导致完全混乱):

if (self.partitioner == Some(partitioner)) {
  self.mapPartitions(iter => {
    val context = TaskContext.get()
    new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  }, preservesPartitioning = true)
} else {
  new ShuffledRDD[K, V, C](self, partitioner)
    .setSerializer(serializer)
    .setAggregator(aggregator)
    .setMapSideCombine(mapSideCombine)
}

相反,python版本总是会引起混乱:

^{pr2}$

正是因为这个原因,我注意到了使用reduceByKey的pyspark的性能问题。在

总的“答案”并不是一个明确的“是”还是“否”:我是说“可能是”,这取决于您如何编写定制的pyspark RDD代码,而不是仅仅使用join()——这总是会导致混乱。在

相关问题 更多 >