加入一个巨大的spark数据帧

2024-10-01 11:28:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我的问题是:

  • 我有一个名为details的大数据帧,包含900K行,另一个包含80M行,名为attributes

  • 两个都有一个列A,我想在该列上执行左外部联接,左数据帧是deatils

  • 在数据帧details的列A中只有75K个唯一条目。数据帧attributesA中的80M个唯一条目。

实现join操作的最佳方法是什么?在

我试过什么?

  • 简单的连接即details.join(attributes, "A", how="left_outer")只是超时(或内存不足)。

  • 由于在details的列A中只有75K个唯一条目,所以我们不关心attributes中数据帧中的其余条目。所以,首先我用以下方法过滤:

    uniqueA = details.select('A').distinct().collect()
    uniqueA = map(lambda x: x.A, uniqueA)
    attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
    

    我认为这是可行的,因为attributes表从80M行减少到了75K行。但是,完成join仍然需要花费很长时间(而且永远不会完成)。

  • 接下来,我认为分区太多,要连接的数据不在同一个分区上。不过,我不知道如何将所有数据带到同一个分区,我认为重新分区可能会有所帮助。就这样。在

    details_repartitioned = details.repartition("A")
    attributes_repartitioned = attributes.repartition("A")
    
  • 上面的操作将attributes中的分区数从70K减少到200。details中的分区数约为1100个。在

    details_attributes = details_repartitioned.join(broadcast(
    attributes_repartitioned), "A", how='left_outer')  # tried without broadcast too
    

在所有这些之后,join仍然不起作用。我还在学习PySpark,所以我可能误解了重新划分背后的基本原理。如果有人能解释清楚,那就太好了。在

另外,我已经看过this问题,但这不能回答这个问题。在


Tags: 数据方法条目detailsleftattributeshowbroadcast
1条回答
网友
1楼 · 发布于 2024-10-01 11:28:56

Details表有900k个条目,在A列中有75k个不同的条目。我认为您尝试的A列上的过滤器是正确的。但是,收集和随后的地图操作

attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA)) 

这太贵了。另一种方法是

^{pr2}$

另外,如果还没有设置shuffle分区,那么可能需要正确设置该分区。在

数据集中可能出现的一个问题是倾斜。它可能发生在75k个唯一值中,只有少数值与属性表中的大量行相连接。在这种情况下,加入可能需要更长的时间,而且可能无法完成。在

要解决这个问题,您需要找到列A的倾斜值并分别处理它们。在

相关问题 更多 >