在pyspark中积累数据帧最有效的方法是什么?

2024-09-28 19:21:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个dataframe(或者可以是任何RDD),它在一个众所周知的模式中包含数百万行:

Key | FeatureA | FeatureB
--------------------------
U1  |        0 |         1
U2  |        1 |         1

我需要从磁盘上加载十几个其他数据集,这些数据集包含相同数量密钥的不同特性。有些数据集的宽度可达十几列。想象一下:

^{pr2}$

这感觉像是一个折叠或积累,我只想迭代所有的数据集,然后得到这样的结果:

Key | FeatureA | FeatureB | FeatureC | FeatureD | FeatureE | FeatureF 
---------------------------------------------------------------------
U1  |        0 |        1 |        0 |        0 |        1 |        0
U2  |        1 |        1 |        0 |        0 |        0 |        1

我尝试过加载每个数据帧然后加入,但是一旦我通过了一些数据集,那就花了很多时间。我是否缺少完成这项任务的共同模式或有效方式?在


Tags: 数据keydataframe数量宽度模式密钥特性
1条回答
网友
1楼 · 发布于 2024-09-28 19:21:44

假设每个DataFrame中每个键最多有一行,并且所有键都是基元类型,则可以尝试使用聚合进行联合。让我们从一些导入和示例数据开始:

from itertools import chain
from functools import reduce
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, lit, max
from pyspark.sql import DataFrame

df1 = sc.parallelize([
    ("U1", 0, 1), ("U2", 1, 1)
]).toDF(["Key", "FeatureA", "FeatureB"])

df2 = sc.parallelize([
  ("U1", 0, 0, 1)
]).toDF(["Key", "FeatureC", "FeatureD", "FeatureE"])

df3 = sc.parallelize([("U2", 1)]).toDF(["Key", "FeatureF"])

dfs = [df1, df2, df3]

接下来我们可以提取公共模式:

^{pr2}$

并转换所有DataFrames

transformed_dfs = [df.select(*[
  lit(None).cast(c.dataType).alias(c.name) if c.name not in df.columns 
  else col(c.name)
  for c in output_schema.fields
]) for df in dfs]

最后是一个联合和虚拟聚合:

combined = reduce(DataFrame.unionAll, transformed_dfs)
exprs = [max(c).alias(c) for c in combined.columns[1:]]
result = combined.repartition(col("Key")).groupBy(col("Key")).agg(*exprs)

如果每个键有多行,但个别列仍然是原子的,则可以尝试将max替换为collect_list/collect_set,后跟{}。在

相关问题 更多 >