多个rdd的火花并合

2024-04-28 03:18:13 发布

您现在位置:Python中文网/ 问答频道 /正文

在我的猪代码中,我这样做:

all_combined = Union relation1, relation2, 
    relation3, relation4, relation5, relation 6.

我想对斯帕克也这么做。然而,不幸的是,我看到我不得不继续成对地做:

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on

是否有一个union运算符允许我一次操作多个RDD:

例如union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)

这是一个关于方便的问题。


Tags: 代码allfirstunionsecondcombinedrelation4rdd3
3条回答

如果这些是RDD,则可以使用SparkContext.union方法:

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
rdd3 = sc.parallelize([7, 8, 9])

rdd = sc.union([rdd1, rdd2, rdd3])
rdd.collect()

## [1, 2, 3, 4, 5, 6, 7, 8, 9]

不存在DataFrame等价物,但这只是一个简单的一行:

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))

unionAll(df1, df2, df3).show()

## +---+----+
## |  k|   v|
## +---+----+
## |  1|foo1|
## |  2|bar1|
## |  3|foo2|
## |  4|bar2|
## |  5|foo3|
## |  6|bar3|
## +---+----+

如果在RDD上使用SparkContext.union并且重新创建DataFrame可能是避免issues related to the cost of preparing an execution plan的更好选择:

def unionAll(*dfs):
    first, *_ = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )

不幸的是,这是Spark中UNION表的唯一方法。但是不是

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
...

你可以用一种更干净的方式来执行它,比如:

result = rdd1.union(rdd2).union(rdd3).union(rdd4)

您还可以使用addition来实现rdd之间的联合

rdd = sc.parallelize([1, 1, 2, 3])
(rdd + rdd).collect()
## [1, 1, 2, 3, 1, 1, 2, 3]

相关问题 更多 >