Spark:多个rdd的操作

2024-05-01 11:53:01 发布

您现在位置:Python中文网/ 问答频道 /正文

最近我在用pyspark做一个项目,遇到了一个我不知道如何解决的问题。 基本上操作包括3个文件,每个文件如下所示

文件1:将一个id集(idset1)映射到另一个id集(idset2)

线条看起来像

[000001, 15120001]
[000002, 15120002]
...

文件2:将idset2中的id映射到idset2中每个id中包含的项

线条看起来像

^{pr2}$

File3:一系列数字对应于每个id中的每一项

线条看起来像

[600001, 1.11, 1.12, 1.32, 1.42, ..., 1.51]
[600002, 5.12, 5.21, 5.23, 5.21, ..., 5.21]
[601988, 52.1, 52.1, 52.2, 52.4, ..., 52.1]
...

我需要做的是得到

[000001, (1.11+5.12)/2,(1.12+5.21)/2,...,(1.51+5.21)/2]
[000002, 52.1, 52.1, 52.2, 52.4, ..., 52.1]
...

也就是说,将idset1中的id映射到idset2的id中对应于idset1中每个id的项的加权平均值。在

如果有人明白我的意思,请帮我。顺便说一句,id不是自动递增的,而是预先分配的。感谢所有提前帮助我的人。在


Tags: 文件项目id数字pyspark线条平均值pr2
1条回答
网友
1楼 · 发布于 2024-05-01 11:53:01

让我们从创建示例数据开始。我假设所有的id实际上都是字符串,但它不会真正影响进一步的计算。在

rdd1 = sc.parallelize([["000001", "15120001"], ["000002", "15120002"]])

rdd2 = sc.parallelize([
    ["15120001", "600001"], ["15120001", "600002"],
    ["15120002", "601988"]
])

rdd3 = sc.parallelize([
    ["600001", 1.11, 1.12, 1.32, 1.42, 1.51],
    ["600002", 5.12, 5.21, 5.23, 5.21, 5.21],
    ["601988", 52.1, 52.1, 52.2, 52.4, 52.1]
])

接下来让我们将所有RDDs转换为DataFrames

^{pr2}$

联接数据:

from pyspark.sql.functions import col

combined = (df1
    .join(df2, col("id2") == col("id2_"))
    .join(df3, col("item_id") == col("item_id_")))

和聚合:

from pyspark.sql.functions import avg

exprs = [avg(x).alias(x) for x in feature_names]
aggregated = combined.groupBy(col("id1")).agg(*exprs)
aggregated.show()

## +   +  -+  -+         +  -+  +
## |   id1|  x_0|  x_1|               x_2|  x_3| x_4|
## +   +  -+  -+         +  -+  +
## |000001|3.115|3.165|3.2750000000000004|3.315|3.36|
## |000002| 52.1| 52.1|              52.2| 52.4|52.1|
## +   +  -+  -+         +  -+  +

如果需要,可以将聚合数据转换回RDD

aggregated.map(tuple).collect()
## [('000001', 3.115, 3.165, 3.2750000000000004, 3.315, 3.36),
##     ('000002', 52.1, 52.1, 52.2, 52.4, 52.1)]

相关问题 更多 >