我的问题是:
我有一个名为details
的大数据帧,包含900K行,另一个包含80M行,名为attributes
。
两个都有一个列A
,我想在该列上执行左外部联接,左数据帧是deatils
。
在数据帧details
的列A
中只有75K个唯一条目。数据帧attributes
列A
中的80M个唯一条目。
实现join
操作的最佳方法是什么?在
我试过什么?
简单的连接即details.join(attributes, "A", how="left_outer")
只是超时(或内存不足)。
由于在details
的列A
中只有75K个唯一条目,所以我们不关心attributes
中数据帧中的其余条目。所以,首先我用以下方法过滤:
uniqueA = details.select('A').distinct().collect()
uniqueA = map(lambda x: x.A, uniqueA)
attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
我认为这是可行的,因为attributes
表从80M行减少到了75K行。但是,完成join
仍然需要花费很长时间(而且永远不会完成)。
接下来,我认为分区太多,要连接的数据不在同一个分区上。不过,我不知道如何将所有数据带到同一个分区,我认为重新分区可能会有所帮助。就这样。在
details_repartitioned = details.repartition("A")
attributes_repartitioned = attributes.repartition("A")
上面的操作将attributes
中的分区数从70K减少到200。details
中的分区数约为1100个。在
details_attributes = details_repartitioned.join(broadcast(
attributes_repartitioned), "A", how='left_outer') # tried without broadcast too
在所有这些之后,join
仍然不起作用。我还在学习PySpark,所以我可能误解了重新划分背后的基本原理。如果有人能解释清楚,那就太好了。在
另外,我已经看过this问题,但这不能回答这个问题。在
Details表有900k个条目,在A列中有75k个不同的条目。我认为您尝试的A列上的过滤器是正确的。但是,收集和随后的地图操作
这太贵了。另一种方法是
^{pr2}$另外,如果还没有设置shuffle分区,那么可能需要正确设置该分区。在
数据集中可能出现的一个问题是倾斜。它可能发生在75k个唯一值中,只有少数值与属性表中的大量行相连接。在这种情况下,加入可能需要更长的时间,而且可能无法完成。在
要解决这个问题,您需要找到列A的倾斜值并分别处理它们。在
相关问题 更多 >
编程相关推荐