reduceByKey in pyspark with multiple key fields in the key tuple


from pyspark import SparkContext, SparkConf
import datetime  # needed by convertion(); missing in the original snippet
import sys
from operator import add

conf = SparkConf().setAppName("test")
sc = SparkContext(conf=conf)

def convertion(num):
    # Convert a Unix timestamp to a 'YYYY-MM-DD' date string (local time)
    return datetime.datetime.fromtimestamp(num).strftime('%Y-%m-%d')

def compute(strs, num):
    # Treat 'apple' amounts as negative
    if strs == 'apple':
        return -num
    return num

rdd = sc.parallelize([
    {'user': 'user',  'tpe': 'apple', 'timstamp': 1500000000, 'amount': 1},
    {'user': 'user',  'tpe': 'pear',  'timstamp': 1500000001, 'amount': 2},
    {'user': 'user2', 'tpe': 'apple', 'timstamp': 1505000002, 'amount': 3}
])

# Key each record by (user, date); the value is the signed amount
rdd = rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])), compute(x['tpe'], x['amount'])))
rdd.reduceByKey(lambda x, y: x + y).take(3)
print(rdd.collect())

The output is wrong: [(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2), (('user2', '2017-09-09'), -3)]

What I want is: [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]

I think I am not using reduceByKey correctly. Can someone tell me how to group the records by the key tuple?

Thanks everyone!


1 Answer

reduceByKey, like all Spark transformations, returns a new RDD. That new RDD is never assigned to a variable, so its result is simply thrown away (take(3) does trigger the computation, but it returns a plain Python list, which is not kept either).

When rdd.collect() is called on the last line, the variable rdd still refers to the RDD created by rdd = rdd.map(...), so what gets printed is the content as it was after the map call.

Assign the result of reduceByKey to a variable, and drop the take(3):

rdd = rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])), compute(x['tpe'], x['amount'])))
rdd = rdd.reduceByKey(lambda x, y: x + y)
print(rdd.collect())
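
With that change, collect() prints the expected [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)] (the exact date strings depend on your local timezone, since datetime.datetime.fromtimestamp converts to local time). As a minimal sketch of the underlying point, assuming a live SparkContext named sc: transformations never modify the RDD they are called on, they only produce new ones.

pairs = sc.parallelize([(('user', '2017-07-13'), -1),
                        (('user', '2017-07-13'), 2),
                        (('user2', '2017-09-09'), -3)])
summed = pairs.reduceByKey(lambda x, y: x + y)  # a new RDD; `pairs` is unchanged
print(pairs.collect())   # still the three unreduced pairs
print(summed.collect())  # [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)] (order may vary)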
