我正在使用spark流不断地从kafka读取数据并执行一些统计。我每秒钟都在流。在
所以我有第二批(dstream)。此数据流中的每个RDD都包含一个JSON。在
我的数据流是这样的:
kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], {"metadata.broker.list": 'localhost:9092'})
raw = kafkaStream.map(lambda kafkaS: kafkaS[1])
clean = raw.map(lambda xs:json.loads(xs))
我的干净数据流中的一个RDD如下所示:
^{pr2}$我在每个数据流中有30-150个这样的RDD。在
现在,我要做的是,求出每个数据流中“长度”的总和,或者说“packetcounts”。也就是说
rdd1.length + rdd2.length + ... + LastRDDInTheOneSecondBatch.length
我尝试了:
add=clean.map(lambda xs: (xs['length'],1)).reduceByKey(lambda a, b: a+b)
我得到的:
频率而不是总和。在
(17, 6)
(6, 24)
我该怎么做才能得到总的和而不是键的频率?在
这是因为您将'length'的值用作键,请尝试以下操作:
必须为所有对(键、值)设置相同的键。值可以是字段长度或要聚合的其他字段。。。在
相关问题 更多 >
编程相关推荐