如何压缩两列,分解它们,最后旋转?

2024-10-04 01:33:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我有两个数组列(namesscore)。我要把它们都炸掉。将名称作为得分的name列(类似于pivot)

+------------+-------------------------+--------------------+                        
|      id    |     names               |      score         |
+------------+-------------------------+--------------------+
|ab01        |[F1 , F2, F3, F4, F5]    |[00123, 000.001, 00127, 00.0123, 111]
|ab02        |[F1 , F2, F3, F4, F5, F6]|[00124, 000.003, 00156, 00.067,  156, 254]
|ab03        |[F1 , F2, F3, F4, F5]    |[00234, 000.078, 00188, 00.0144, 188]
|ab04        |[F1 , F2, F3, F4, F5]    |[00345, 000.01112, 001567, 00.0186, 555]

预期产出:

 id       F1      F2        F3        F4    F5  F6
ab01    00123   000.001    00127    00.0123 111 null
ab02    00124   000.003    00156    00.067  156 254
ab03    00234   000.078    00188    00.0144 188 null
ab04    00345   000.01112  001567   00.0186 555 null

我试着把名字和分数拉上拉链,然后把它们炸开

combine = F.udf(lambda x, y: list(zip(x, y)),
                ArrayType(
                          StructType(
                                     [StructField("names", StringType()),
                                      StructField("score", StringType())
                                     ]
                                    )
                         )
               )

df2 = df.withColumn("new", combine("score", "names"))
         .withColumn("new", F.explode("new"))
         .select("id", 
                 F.col("new.names").alias("names"), 
                 F.col("new.score").alias("score")
                )

我得到一个错误:

TypeError: zip argument #1 must support iteration

我还尝试使用rdd flatMap()进行爆炸,但仍然得到相同的错误

有没有其他方法可以实现这一点


Tags: idnewnamesf5nullf2f1score
3条回答

使用^{}函数+explode,然后最后使用group by和pivot:

from pyspark.sql.functions import explode, expr, first

df1 = df.withColumn("name_score", explode(expr("arrays_zip(name, score)"))) \
       .selectExpr("id", "name_score.*")\
       .groupBy("id").pivot("name").agg(first("score"))

df1.show(truncate=False)

#+  +  -+   -+   +   + -+  +
#|id  |F1   |F2     |F3    |F4    |F5 |F6  |
#+  +  -+   -+   +   + -+  +
#|ab01|00123|0.001  |0.127 |0.0123|111|null|
#|ab04|00345|0.01112|0.1567|0.0186|555|null|
#|ab03|00234|0.078  |0.188 |0.0144|188|null|
#|ab02|00124|0.003  |0.156 |0.067 |156|254 |
#+  +  -+   -+   +   + -+  +

对于Spark<;2.4

您的UDFcombine正常。出现错误TypeError: zip argument #1 must support iteration,因为要将列名作为字符串传递给UDF,请使用col传递列:

df1 = df.withColumn("name_score", explode(combine(col("name"), col("score")))) \
      .selectExpr("id", "name_score.*") \
      .groupBy("id").pivot("name").agg(first("score")) 

数据帧中可能有空值。在UDF中添加检查以确保不会发生这种情况

combine = F.udf(lambda x, y: list(zip(x, y)) if x is not None and y is not None else None,
                ArrayType(
                          StructType(
                                     [StructField("names", StringType()),
                                      StructField("score", StringType())
                                     ]
                                    )
                         )
               )

尝试:

df2 = df.set_index('id').apply(pd.Series.explode).reset_index()
df3 = df2.pivot(columns='names', values='score', index='id')

df3:

names   F1       F2         F3      F4      F5  F6
id                      
ab01    00123   000.001     00127   00.0123 111 NaN
ab02    00123   000.003     00156   00.067  156 254
ab03    00234   000.078     00188   00.0144 188 NaN
ab04    00345   000.01112   001567  00.0186 555 NaN

编辑:

x = (df.apply(lambda x: dict(zip(x['names'], x['score'])), axis=1))
y = pd.DataFrame(x.values.tolist(), index=x.index).fillna("null").join(df.id)

x = (df.apply(lambda x: dict(zip(x['names'], x['score'])), axis=1))
z = pd.DataFrame(x.values.tolist(), index=x.index).fillna("null")
y = pd.concat([df.id , z], axis=1)

y:

    F1      F2         F3       F4      F5  F6      id
0   00123   000.001    00127    00.0123 111 null    ab01
1   00123   000.003    00156    00.067  156 254     ab02
2   00234   000.078    00188    00.0144 188 null    ab03
3   00345   000.01112  001567   00.0186 555 null    ab04

相关问题 更多 >