我在PySpark中有两个RDDs
:
RDD1:
[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]
RDD2:
^{pr2}$两个RDDs
具有相同的编号或行。现在我要做的是从RDD1中获取每一行中的所有列(从unicode
转换为正常string
)和RDD2中每行的第二列(从unicode string
转换为float
),并用它形成一个新的RDD。因此,新的RDD将如下所示:
RDD3:
[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....]
完成后,我想用第1列中的date
值对新的RDD3
中每一行的最后一个值(浮点值)进行aggregation
。对于date
是2013-01-31 00:00:00
的所有行,应该添加它们的最后一个数值。在
在PySpark中我该怎么做呢?在
对于问题的第一部分,即将两个RDD组合成一个,其中每一行都是一个7的元组,您可以这样做:
我不确定你最终需要什么,只是日期和第二个值的总和吗?如果是,则不需要所有值:
^{pr2}$您需要zipWithIndex您的}。在
RDDs
,此方法用您的数据和另一个表示该条目的索引的值创建一个元组,因此您可以通过index
连接{你的方法应该类似于(我打赌还有更有效的方法):
输出将是:
^{pr2}$[(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))]
,在此之后,只需要map
来重新组合数据。E、 g.以下:输出将是:
[(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]
关于数据类型转换,我以前遇到过这个问题,为了解决这个问题,我使用this snippet。在
将输出:
[('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]
相关问题 更多 >
编程相关推荐