Can not deserialize RDD with different number of items in pair

Published 2024-10-02 00:20:25


I have two RDDs of key-value pairs. I want to join them by key (getting, per key, the Cartesian product of all values), and I assumed this could be done with pyspark's zip() function. However, when I use this

elemPairs = elems1.zip(elems2).reduceByKey(add)

it gives me the error:

Can not deserialize RDD with different number of items in pair

Here are the 2 RDDs I am trying to zip:

elems1 => [((0, 0), ('A', 0, 90)), ((0, 1), ('A', 0, 90)), ((0, 2), ('A', 0, 90)), ((0, 3), ('A', 0, 90)), ((0, 4), ('A', 0, 90)), ((0, 0), ('A', 1, 401)), ((0, 1), ('A', 1, 401)), ((0, 2), ('A', 1, 401)), ((0, 3), ('A', 1, 401)), ((0, 4), ('A', 1, 401)), ((1, 0), ('A', 0, 305)), ((1, 1), ('A', 0, 305)), ((1, 2), ('A', 0, 305)), ((1, 3), ('A', 0, 305)), ((1, 4), ('A', 0, 305)), ((1, 0), ('A', 1, 351)), ((1, 1), ('A', 1, 351)), ((1, 2), ('A', 1, 351)), ((1, 3), ('A', 1, 351)), ((1, 4), ('A', 1, 351)), ((2, 0), ('A', 0, 178)), ((2, 1), ('A', 0, 178)), ((2, 2), ('A', 0, 178)), ((2, 3), ('A', 0, 178)), ((2, 4), ('A', 0, 178)), ((2, 0), ('A', 1, 692)), ((2, 1), ('A', 1, 692)), ((2, 2), ('A', 1, 692)), ((2, 3), ('A', 1, 692)), ((2, 4), ('A', 1, 692)), ((3, 0), ('A', 0, 936)), ((3, 1), ('A', 0, 936)), ((3, 2), ('A', 0, 936)), ((3, 3), ('A', 0, 936)), ((3, 4), ('A', 0, 936)), ((3, 0), ('A', 1, 149)), ((3, 1), ('A', 1, 149)), ((3, 2), ('A', 1, 149)), ((3, 3), ('A', 1, 149)), ((3, 4), ('A', 1, 149))]

elems2 => [((0, 0), ('B', 0, 573)), ((1, 0), ('B', 0, 573)), ((2, 0), ('B', 0, 573)), ((3, 0), ('B', 0, 573)), ((4, 0), ('B', 0, 573)), ((0, 0), ('B', 1, 324)), ((1, 0), ('B', 1, 324)), ((2, 0), ('B', 1, 324)), ((3, 0), ('B', 1, 324)), ((4, 0), ('B', 1, 324))]

Where, in ((0, 0), ('B', 0, 573)), (0, 0) is the key and ('B', 0, 573) is the value.

After a quick Google search, I found that this was supposedly an issue only in Spark 1.2, but I am using Spark 1.5.


Tags: function, add, error, zip, spark, pyspark, key-value, rdd
2 Answers

The reason for the error message is described in the RDD API:

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

As @alwaysprep said, you can use join, since zip does something completely different:

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
a.zip(b).collect
res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), 
(6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), 
(14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), 
(22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), 
(30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), 
(38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), 
(46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), 
(54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), 
(62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), 
(70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), 
(78,...

As you can see, zip associates the nth element of array a with the nth element of array b, so the two arrays must be the same size.
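
The same requirement can be seen from PySpark, the language of the question. The following is a minimal sketch (the local SparkContext and the sample data are illustrative, not from the original post):

from pyspark import SparkContext

sc = SparkContext("local", "zip-demo")  # illustrative local context

a = sc.parallelize(range(1, 6), 2)      # 5 elements, 2 partitions
b = sc.parallelize(range(101, 106), 2)  # 5 elements, 2 partitions
print(a.zip(b).collect())
# [(1, 101), (2, 102), (3, 103), (4, 104), (5, 105)]

# A different element count with the same partitioning triggers the
# error from the question at action time:
c = sc.parallelize(range(1, 4), 2)      # only 3 elements
# a.zip(c).collect()
# ValueError: Can not deserialize RDD with different number of items in pair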

In your case, the RDD elems1 contains more elements than elems2, so you might look at rightOuterJoin (or leftOuterJoin) instead. This is because .join skips any element whose key is not present in both RDDs. For example, I see that the key (4, 0) exists only in elems2; if you join them, those elements will be dropped, since (4, 0) is not found in elems1. Also, if you really do want the full Cartesian product, there is the method .cartesian.
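
To make that difference concrete, here is a minimal sketch (tiny illustrative sample RDDs, assuming an existing SparkContext sc as in the pyspark shell; collect order may vary):

left = sc.parallelize([((0, 0), ('A', 0, 90))])
right = sc.parallelize([((0, 0), ('B', 0, 573)),
                        ((4, 0), ('B', 1, 324))])

# join keeps only keys present in BOTH RDDs, so (4, 0) is dropped:
print(left.join(right).collect())
# [((0, 0), (('A', 0, 90), ('B', 0, 573)))]

# rightOuterJoin keeps every key from the right RDD and pads the
# missing left side with None:
print(left.rightOuterJoin(right).collect())
# [((0, 0), (('A', 0, 90), ('B', 0, 573))), ((4, 0), (None, ('B', 1, 324)))]

# cartesian pairs every element with every element, ignoring keys entirely:
print(left.cartesian(right).count())  # 1 * 2 = 2 pairs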

Why not just use elems1.join(elems2)?
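
For reference, a sketch of what that call yields on a two-value slice of the asker's data (the sample RDDs and the SparkContext sc are illustrative; result order may vary). Note that join alone already produces the per-key Cartesian product of values that the question asks for:

e1 = sc.parallelize([((0, 0), ('A', 0, 90)), ((0, 0), ('A', 1, 401))])
e2 = sc.parallelize([((0, 0), ('B', 0, 573)), ((0, 0), ('B', 1, 324))])

# Every value for key (0, 0) in e1 is paired with every value for (0, 0) in e2:
for kv in e1.join(e2).collect():
    print(kv)
# ((0, 0), (('A', 0, 90), ('B', 0, 573)))
# ((0, 0), (('A', 0, 90), ('B', 1, 324)))
# ((0, 0), (('A', 1, 401), ('B', 0, 573)))
# ((0, 0), (('A', 1, 401), ('B', 1, 324)))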
