pyspark redueByKey修改单个结果

2024-10-02 20:37:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我在pyspark中有一个类似这样的数据集:

samp = sc.parallelize([(1,'TAGA'), (1, 'TGGA'), (1, 'ATGA'), (1, 'GTGT'), (2, 'GTAT'), (2, 'ATGT'), (3, 'TAAT'), (4, 'TAGC')])

我有一个用于组合字符串的函数:

   def combine_strings(x,y):
        if (isinstance(x,list) and isinstance(y, list)):
            z = x + y
            return z
        if (isinstance(x, list) and isinstance(y, str)):
            x.append(y)
            return x
        if (isinstance(x, str) and isinstance(y, list)):
            y.append(x)
            return y
        return [x,y]

我得到的结果是:

samp.reduceByKey(lambda x,y : combine_strings(x,y)).collect()
[(1, ['TAGA', 'TGGA', 'ATGA', 'GTGT']), (2, ['GTAT', 'ATGT']), (3, 'TAAT'), (4, 'TAGC')]

我想要的是:

[(1,['TAGA','TGGA','ATGA','GTGTGT'],(2,['GTAT','ATGT'],(3,['TAAT'],(4,['TAGC'])]

一切都是一个数组。我不知道pyspark是在一个只有1个条目的结果上调用combine\u字符串,还是我可以告诉reduceByKey对单例结果做些什么?如何修改reduceByKey()或combine\u strings函数以生成所需的内容


Tags: andreturniflistisinstancestringscombinereducebykey
1条回答
网友
1楼 · 发布于 2024-10-02 20:37:48

您可以首先将值映射到列表中,然后仅合并这些列表:

samp.mapValues(lambda x : [x]).reduceByKey(lambda x,y : x + y).collect()

这里的问题是这些单例不受reduceByKey的影响。下面是另一个例子:

samp = sc.parallelize([(1,1),(2,2),(2,2),(3,3)])
>>> samp.reduceByKey(lambda x, y : x + y + 1).collect()
[(3, 3), (1, 1), (2, 5)]

相关问题 更多 >