import datetime
import sys
from operator import add

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("test")
sc = SparkContext(conf=conf)

def convertion(num):
    # Convert a Unix timestamp to a 'YYYY-MM-DD' date string
    return datetime.datetime.fromtimestamp(num).strftime('%Y-%m-%d')

def compute(strs, num):
    # 'apple' amounts count as negative, everything else as positive
    if strs == 'apple':
        return -num
    return num

rdd = sc.parallelize([
    {'user': 'user', 'tpe': 'apple', 'timstamp': 1500000000, 'amount': 1},
    {'user': 'user', 'tpe': 'pear', 'timstamp': 1500000001, 'amount': 2},
    {'user': 'user2', 'tpe': 'apple', 'timstamp': 1505000002, 'amount': 3}
])
rdd = rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])), compute(x['tpe'], x['amount'])))
rdd.reduceByKey(lambda x, y: x + y).take(3)
print(rdd.collect())
Wrong output: [(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2), (('user2', '2017-09-09'), -3)]
The output I expect is:
[(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]
I think I'm not using reduceByKey correctly. Can someone tell me how to group these records by the key tuple?
Thanks, everyone!
reduceByKey returns (like all Spark transformations) a new RDD. This new RDD is never assigned to a variable, so its aggregated result is simply discarded. When you call rdd.collect() on the last line, the variable rdd still refers to the RDD created by rdd = rdd.map(...), so you are printing the contents as they stand after the map call. Assign the result of reduceByKey to the variable and drop the take(3):
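A minimal sketch of the fix, assuming the rest of the script above is unchanged:

# Assign the new RDD returned by reduceByKey back to the variable,
# so that collect() sees the aggregated result
rdd = rdd.reduceByKey(lambda x, y: x + y)
print(rdd.collect())
# -> [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]
#    (the exact dates depend on the local timezone, since
#     datetime.fromtimestamp converts to local time)

Because every transformation returns a new RDD and leaves the original untouched, calling rdd.reduceByKey(...).take(3) without using its return value, as the original code did, throws the aggregation away while rdd keeps pointing at the pre-reduce data.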