使用map-reduce-pysp将元组列表转换为具有计数的rdd

2024-05-19 09:15:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个类似于以下的rdd:

[('C3', [{'Item': 'Shirt', 'Color ': 'Black', 'Size': '32','Price':'2500'}, {'Item': 'Sweater', 'Color ': 'Red', 'Size': '35', 'Price': '1000'},  {'Item': 'Jeans', 'Color ': 'Yellow', 'Size': '30', 'Price': '1500'}]), ('C1', [{'Item': 'Shirt', 'Color ': 'Green', 'Size': '25', 'Price': '2000'}, {'Item': 'Saree', 'Color ': 'Green', 'Size': '25', 'Price': '1500'}, {'Item': 'Saree', 'Color ': 'Green', 'Size': '25', 'Price': '1500'}, {'Item': 'Jeans', 'Color ': 'Yellow', 'Size': '30', 'Price': '1500'}])]

我们可以通过以下方式创建rdd:

^{pr2}$

我需要创建一个类似于以下内容的dataframe/rdd(我将向所有属性添加count)

{'C1': {'Color ': {'Green': 3, 'Yellow': 1},
'Item': {'Jeans': 1, 'Saree': 2, 'Shirt': 1},
'Price': {'1500': 3, '2000': 1},
'Size': {'25': 3, '30': 1}},
'C3': {'Color ': {'Black': 1, 'Red': 1, 'Yellow': 1},
'Item': {'Jeans': 1, 'Shirt': 1, 'Sweater': 1},
'Price': {'1000': 1, '1500': 1, '2500': 1},
'Size': {'30': 1, '32': 1, '35': 1}}}

相应的数据帧/rdd将是:

+-------+---------------------------------------------------------------------
|custo      |attr                                                                                      
|C1     |Map(Color -> Map(Green -> 3, yellow -> 1), Item -> Map(Jeans -> 1, Saree  -> 2, Shirt ->1),  Price ->  | 
+-------+-------------------------------------------------------------------------------------------------------+

Tags: mapsizegreenitempricecolorblackrdd
1条回答
网友
1楼 · 发布于 2024-05-19 09:15:45

使用自定义项收集计数。在

from pyspark.sql import functions as f
from pyspark.sql import types as t

def count(c_dict):
    res = {}
    for item in c_dict:
        print(type(item))
        for key in item:
            print(key, item[key])
            if key in res:
                if item[key] in res[key]:
                    res[key][item[key]]+= 1
                else:
                    res[key][item[key]] = 1
            else:
                res[key]={}
                res[key][item[key]] = 1
    return(res)
schema = t.MapType(t.StringType(), t.MapType(t.StringType(), t.IntegerType()))
count_udf = f.udf(count, schema)

df2 = df.withColumn( 'col2' , count_udf(df.col2))
df.collect()

结果

^{pr2}$

相关问题 更多 >

    热门问题