我有一个map函数,它创建一个类型为[Tuple,Integer]的键值对,但是当我使用combineByKey来添加整数值时,它会创建一个包含这些整数的元组,而不是添加它们
我试过输入整数,但没用
def subset_from_kv(para):
sol = []
para_new = (i for i in (para and ff))
# print(list(para))
tt = (itertools.combinations(sorted(para_new), size))
for j in tt:
flag = True
for i in list(itertools.combinations(list(j), size-1)):
if (i not in (frequent_itemset_val)):
flag = False
break
if flag is True:
sol.append(j)
return sol
rdd1_values = rdd1_original.values()
rdd_inter = rdd1_values.mapPartitions(lambda t: subset_from_kv(t)).map(lambda x: (x, 1))
new_item_rdd_size_i = rdd_inter.combineByKey(
lambda value: (value),
lambda x, y: (x + y),
lambda x, y: (x, y)
)
rdd_inter.collect()
给出:
[(('-050d_XIor1NpCuWkbIVaQ', '-4TMQnQJW1yd6NqGRDvAeA'), 1), (('-050d_XIor1NpCuWkbIVaQ', '-6h3K1hj0d4DRcZNUtHDuw'), 1), (('-050d_XIor1NpCuWkbIVaQ', '-6tvduBzjLI1ISfs3F_qTg'), 1), (('-050d_XIor1NpCuWkbIVaQ', '-9eNGMp8XiygI8t8QFuFWw'), 1), (('-050d_XIor1NpCuWkbIVaQ', '-9nai28tnoylwViuJVrYEQ'), 1).....
编辑:我有3个分区
新物品\u rdd \u尺寸\u我希望提供
[(('-050d_XIor1NpCuWkbIVaQ', '-4TMQnQJW1yd6NqGRDvAeA'),(10, 1, 3), (('-050d_XIor1NpCuWkbIVaQ', '-6h3K1hj0d4DRcZNUtHDuw'), (12, 13, 5), (('-050d_XIor1NpCuWkbIVaQ', '-6tvduBzjLI1ISfs3F_qTg'), (21, 7, 33), (('-050d_XIor1NpCuWkbIVaQ', '-9eNGMp8XiygI8t8QFuFWw'), (111, 34, 14), (('-050d_XIor1NpCuWkbIVaQ', '-9nai28tnoylwViuJVrYEQ'), (41, 33, 11)...
但输出是串联1的值,而不是相加:
[(('-050d_XIor1NpCuWkbIVaQ', '-Bdw-5H5C4AYSMGnAvmnzw'), ((1, 1), 1)), (('-050d_XIor1NpCuWkbIVaQ', '-De4AV1Fx67mDMGrFOw44Q'), ((1, 1), 1)), (('-050d_XIor1NpCuWkbIVaQ', '-FLnsWAa4AGEW4NgE8Fqew'), ((1, 1), 1)), (('-050d_XIor1NpCuWkbIVaQ', '-Ht7HiGBox8lS1Y8IPjO8g'), ((1, 1), 1)), (('-050d_XIor1NpCuWkbIVaQ', '-ZBfr1BHvArFp1d6XH8jOQ'), ((1, 1), 1))]
根据official doc
使用:
在您的情况下:
lambda value: (value)
:创建一个元组lambda x, y: (x + y)
:添加2个元组==>;1个元组,包含2个元素lambda x, y: (x, y)
:创建一个包含2个元素的元组李>因此,您的最终输出是元组的元组的元组的元组的元组等等
您应该尝试一种更简单的方法,比如
reduceByKey
:rdd.reduceByKey(add)
相关问题 更多 >
编程相关推荐