如何从单个RDD向中添加包含2个RDD的列,然后根据PySp中的日期数据进行行聚合

2024-09-21 07:40:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我在PySpark中有两个RDDs

RDD1:

[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]

RDD2:

^{pr2}$

两个RDDs具有相同的编号或行。现在我要做的是从RDD1中获取每一行中的所有列(从unicode转换为正常string)和RDD2中每行的第二列(从unicode string转换为float),并用它形成一个新的RDD。因此,新的RDD将如下所示:

RDD3:

[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....]

完成后,我想用第1列中的date值对新的RDD3中每一行的最后一个值(浮点值)进行aggregation。对于date2013-01-31 00:00:00的所有行,应该添加它们的最后一个数值。在

在PySpark中我该怎么做呢?在


Tags: datestringabunicodefloat编号pysparkabc
2条回答

对于问题的第一部分,即将两个RDD组合成一个,其中每一行都是一个7的元组,您可以这样做:

rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a,b,c,d,e,f,g))

我不确定你最终需要什么,只是日期和第二个值的总和吗?如果是,则不需要所有值:

^{pr2}$

您需要zipWithIndex您的RDDs,此方法用您的数据和另一个表示该条目的索引的值创建一个元组,因此您可以通过index连接{}。在

你的方法应该类似于(我打赌还有更有效的方法):

rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"])
rdd2 = sc.parallelize(xrange(5))

zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v))
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v))

print zdd1.join(zdd2).collect()

输出将是: [(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))],在此之后,只需要map来重新组合数据。E、 g.以下:

^{pr2}$

输出将是: [(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]

关于数据类型转换,我以前遇到过这个问题,为了解决这个问题,我使用this snippet。在

import unicodedata

convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1)
                                       .encode('ascii','ignore'), v2)

combinedRDD = combinedRDD.map(convert)
print combinedRDD.collect()

将输出:[('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]

相关问题 更多 >

    热门问题