SPARK 2.2.2连接多个RDD时出现内存不足异常。结果RDD有124列。最佳连接方法是什么?

2024-10-05 14:27:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个文件,每个电话号码有多个值。例如:

phone_no circle operator priority1 attribute1 attribute2 attribute3 priority2 attribute1 attribute2 attribute3 
123445   delhi  airtel   1.0        info1      info2      info3      1.1        info4      info5      info6
987654   bhopal idea     1.1        info1      info2      info3      1.4        info4      info5      info6
123445   delhi  airtel   1.3        info1      info2      info3      1.0        info4      info5      info6

我期望的结果是: 对于每个电话号码,选择最小P1及其相应的属性值。你知道吗

正如我上面的示例所述,对于电话号码123445,第1行中的P1小于第3行中的P1(1.0<;1.3),因此我希望从第1行中选择属性1、2和3,而第3行中的P2具有较小的值(1.0<;1.1),因此我希望从第3行中选择属性值。你知道吗

以下是我想要的表格格式:

phone_no circle operator priority1 attribute1 attribute2 attribute3 priority2 attribute1 attribute2 attribute3 
123445   delhi  airtel   1.0        info1      info2      info3      1.0        info4      info5      info6
987654   bhopal idea     1.1        info1      info2      info3      1.4        info4      info5      info6

我有25个不同的优先级值,每个优先级值有4个不同的属性,所以我的列总数大约是125。你知道吗

到目前为止,我尝试了:

  1. 创建一个数据帧,其中电话号码作为键,每个优先级值的最小值。你知道吗
  2. 创建另一个数据帧,其值为min(Priority1),并为每个电话号码创建相应的属性。你知道吗
  3. 创建另一个数据帧,其值为min(Priority2)以及每个电话号码的相应属性。你知道吗
  4. 将电话号码上的这两个数据帧连接起来,以获得完整的信息并将此数据帧保存到磁盘。你知道吗

我的方法的问题是,考虑到我拥有的列的数量,它不是一个好方法。请给我建议一些解决这个问题的好方法。你知道吗

编辑1:这里是我所做工作的pastebin链接:https://pastebin.com/ps4f1KSh


Tags: 数据属性电话号码p1delhiinfo2attribute1attribute2
1条回答
网友
1楼 · 发布于 2024-10-05 14:27:46

我可能会使用窗口函数:

from pyspark.sql.window import Window
import pyspark.sql.functions as spf

df = spark.createDataFrame([
    (123, 1, 'a', 2, 'c'),
    (123, 2, 'b', 1, 'd'),
    (456, 3, 'e', 4, 'f')
], ['phone', 'priority1', 'attribute1', 'priority2', 'attribute2'])

w = Window.partitionBy('phone')
df2 = (
    df
    .select(
        'phone',
        spf.first('attribute1').over(w.orderBy('priority1')).alias('attribute1'),
        spf.first('attribute2').over(w.orderBy('priority2')).alias('attribute2'),
    )
)

(
    df2
    .groupby('phone')
    .agg(*[spf.first(c).alias(c) for c in df2.columns if c != 'phone'])
    .toPandas()
)

提供:

   phone attribute1 attribute2
0    123          a          d
1    456          e          f

这是一个练习,让读者将其模板化(例如,使用列表理解)以概括所有属性和优先级。你知道吗

相关问题 更多 >