PySpark: GroupByKey and getting the sum of a tuple of tuples

Posted 2024-09-29 23:21:36


I have this data:

[('Manhattan', ('East Village', 2)),
('Manhattan', ('Theater District', 2)),
('Queens', ('Sunnyside', 2)),
('Manhattan', ('Murray Hill', 2)),
('Manhattan', ('Battery Park City', 2)),
('Queens', ('John F. Kennedy International Airport', 2)),
('Queens', ('LaGuardia Airport', 2)),
('Manhattan', ('NoHo', 2)),
('Manhattan', ('Chinatown', 2)),
('Brooklyn', ('Brooklyn Heights', 2))]

My goal is, for each borough, to get the top three neighborhoods by total count.

The data is in the format

X = (Borough, (Neighborhood, total))

My thought process:

I want to do a groupByKey on this data, to first get all three boroughs and then the highest neighborhoods for each, hence this code:

X.groupByKey().mapValues(sum).collect()

However, as far as I can tell this gives an error, because the second element is itself a tuple; I want to access the second element of that inner tuple, and I don't know how.
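
To make that concrete, this is roughly the direction I have in mind, though it is only an untested sketch:

# X.groupByKey().mapValues(sum) raises a TypeError, because each grouped value is a
# ('Neighborhood', count) tuple, so sum() ends up trying 0 + ('East Village', 2).
# What I think I need is to pull the count (the second element of the inner tuple)
# out before aggregating, something like:
per_borough_total = X.mapValues(lambda v: v[1]).reduceByKey(lambda a, b: a + b)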

Also, that would only aggregate the data, so I wrote this function, which should give me the three highest neighborhoods:

def findingLargest(item):
    from heapq import nlargest
    i, j = item
    tops = nlargest(3, j, key=lambda x: x[1])
    return (i, tops)

So the final code I can come up with is:

X.groupByKey()\
 .map(findingLargest)

Expected output:

Borough, Top_1 Neighborhood, Top_1_count, Top_2 Neighborhood, Top_2_count

Any suggestions on how to make this work?


1 answer

#1 · Posted 2024-09-29 23:21:36

I have a solution, but it requires switching from the RDD to a DataFrame (and back). The most straightforward implementation would be to use DataFrames directly.

data = sc.parallelize([('Manhattan', ('East Village', 2)),
                       ('Manhattan', ('Theater District', 2)),
                       ('Queens', ('Sunnyside', 2)),
                       ('Manhattan', ('Murray Hill', 2)),
                       ('Manhattan', ('Battery Park City', 2)),
                       ('Queens', ('John F. Kennedy International Airport', 2)),
                       ('Queens', ('LaGuardia Airport', 2)),
                       ('Manhattan', ('NoHo', 2)),
                       ('Manhattan', ('Chinatown', 2)),
                       ('Brooklyn', ('Brooklyn Heights', 2))])

Convert the RDD to a (key1_key2, value) format:

data = data.map(lambda l: (l[0] + "_" + l[1][0], l[1][1]))
data.take(2)
# [('Manhattan_East Village', 2), ('Manhattan_Theater District', 2)]

Then aggregate:

data = data.reduceByKey(lambda x,y:x+y)
data.take(2)
# [('Manhattan_Theater District', 2), ('Queens_John F. Kennedy International Airport', 2)]

Split to get back a (key1, key2, value) format:

data2 = data.map(lambda l: (l[0].split("_"), l[1]))
data2 = data2.map(lambda l: (l[0][0], l[0][1], l[1]))
data2.take(2)
# [('Manhattan', 'Theater District', 2), ('Queens', 'John F. Kennedy International Airport', 2)]

Selecting the top n is easier with the DataFrame API (in fact, the first part would be easier too). I use a window function:

df = data2.toDF(['district','neighbor','count'])
import pyspark.sql.functions as psf
import pyspark.sql.window as psw

w = psw.Window.partitionBy('district').orderBy(psf.desc('count'))
df = (df.select(psf.col('*'), psf.row_number().over(w).alias('row_number'))
      .where(psf.col('row_number') <= 3)
     )
df.show(10)
+---------+--------------------+-----+----------+
| district|            neighbor|count|row_number|
+---------+--------------------+-----+----------+
|   Queens|John F. Kennedy I...|    2|         1|
|   Queens|   LaGuardia Airport|    2|         2|
|   Queens|           Sunnyside|    2|         3|
| Brooklyn|    Brooklyn Heights|    2|         1|
|Manhattan|    Theater District|    2|         1|
|Manhattan|           Chinatown|    2|         2|
|Manhattan|         Murray Hill|    2|         3|
+---------+--------------------+-----+----------+
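
As an aside, the first part can also be done directly with the DataFrame API, which is what I meant at the top. A rough sketch, assuming raw holds the original nested-tuple RDD created by sc.parallelize(...) above (before any of the map/reduceByKey steps):

# Flatten the nested tuples into columns, then aggregate and rank per district
df_direct = raw.map(lambda l: (l[0], l[1][0], l[1][1])).toDF(['district', 'neighbor', 'count'])
agg = df_direct.groupBy('district', 'neighbor').agg(psf.sum('count').alias('count'))

w = psw.Window.partitionBy('district').orderBy(psf.desc('count'))
top3 = (agg.select(psf.col('*'), psf.row_number().over(w).alias('row_number'))
           .where(psf.col('row_number') <= 3))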

To get the desired output, one way is to switch back to an RDD:

df.rdd.map(lambda l: (l[0], (l[1], l[2]))).reduceByKey(lambda x,y: x + y).take(2)
# [('Manhattan', ('Theater District', 2, 'Chinatown', 2, 'Murray Hill', 2)),
#  ('Brooklyn', ('Brooklyn Heights', 2))]
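
Alternatively, you can stay entirely in the RDD API, which keeps things close to the findingLargest idea from the question. Again a sketch, assuming raw is the original nested-tuple RDD:

from heapq import nlargest

top3_rdd = (raw.map(lambda l: ((l[0], l[1][0]), l[1][1]))      # ((borough, neighborhood), count)
               .reduceByKey(lambda a, b: a + b)                # sum counts per neighborhood
               .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))  # (borough, (neighborhood, total))
               .groupByKey()
               .mapValues(lambda vals: nlargest(3, vals, key=lambda x: x[1])))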
