如何计算RDDs中每个数据流中所有值的总和?

2024-10-03 11:20:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用spark流不断地从kafka读取数据并执行一些统计。我每秒钟都在流。在

所以我有第二批(dstream)。此数据流中的每个RDD都包含一个JSON。在

我的数据流是这样的:

kafkaStream = KafkaUtils.createDirectStream(stream, ['livedata'], {"metadata.broker.list": 'localhost:9092'})
raw = kafkaStream.map(lambda kafkaS: kafkaS[1])
clean = raw.map(lambda xs:json.loads(xs))

我的干净数据流中的一个RDD如下所示:

^{pr2}$

我在每个数据流中有30-150个这样的RDD。在

现在,我要做的是,求出每个数据流中“长度”的总和,或者说“packetcounts”。也就是说

rdd1.length + rdd2.length + ... + LastRDDInTheOneSecondBatch.length

我尝试了:

add=clean.map(lambda xs: (xs['length'],1)).reduceByKey(lambda a, b: a+b)

我得到的:

频率而不是总和。在

(17, 6)
(6, 24)

我该怎么做才能得到总的和而不是键的频率?在


Tags: kafkalambdacleanmapraw读取数据lengthspark
1条回答
网友
1楼 · 发布于 2024-10-03 11:20:58

这是因为您将'length'的值用作键,请尝试以下操作:

add=clean.map(lambda xs: ('Lenght',xs['length'])).reduceByKey(lambda a, b: a+b)

必须为所有对(键、值)设置相同的键。值可以是字段长度或要聚合的其他字段。。。在

相关问题 更多 >