<p>Use struct to combine the columns before calling groupBy.</p>
<p>Suppose you have a dataframe:</p>
<pre><code>df = spark.createDataFrame(sc.parallelize([(0,1,2),(0,4,5),(1,7,8),(1,8,7)])).toDF("a","b","c")
df = df.select("a", f.struct(["b","c"]).alias("newcol"))
df.show()
+---+------+
| a|newcol|
+---+------+
| 0| [1,2]|
| 0| [4,5]|
| 1| [7,8]|
| 1| [8,7]|
+---+------+
df = df.groupBy("a").agg(f.collect_list("newcol").alias("collected_col"))
df.show()
+---+--------------+
| a| collected_col|
+---+--------------+
| 0|[[1,2], [4,5]]|
| 1|[[7,8], [8,7]]|
+---+--------------+
</code></pre>
<p>The aggregation operation can only be performed on a single column, which is why b and c were first combined into one struct column.</p>
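<p>For comparison, here is a minimal sketch of aggregating each column on its own (df0 is a hypothetical copy of the original three-column dataframe, since df no longer has b and c at this point): each collect_list call still works on a single column, and the two lists are built independently, so the row-wise pairing of b and c is not guaranteed.</p>
<pre><code># df0 is assumed to be the original dataframe with columns a, b, c
df0 = spark.createDataFrame(sc.parallelize([(0,1,2),(0,4,5),(1,7,8),(1,8,7)])).toDF("a","b","c")
separate = df0.groupBy("a").agg(
    f.collect_list("b").alias("b_list"),
    f.collect_list("c").alias("c_list"))
separate.show()
</code></pre>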
<p>After the aggregation you can either collect the result and iterate over it to separate the combined columns (building an index dict), or write a udf that separates the combined columns; a sketch of the collect-and-iterate variant follows the udf example below.</p>
<pre><code>from pyspark.sql.types import *
def foo(x):
x1 = [y[0] for y in x]
x2 = [y[1] for y in x]
return(x1,x2)
st = StructType([StructField("b", ArrayType(LongType())), StructField("c", ArrayType(LongType()))])
udf_foo = udf(foo, st)
df = df.withColumn("ncol",
udf_foo("collected_col")).select("a",
col("ncol").getItem("b").alias("b"),
col("ncol").getItem("c").alias("c"))
df.show()
+---+------+------+
| a| b| c|
+---+------+------+
| 0|[1, 4]|[2, 5]|
| 1|[7, 8]|[8, 7]|
+---+------+------+
</code></pre>
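<p>If you prefer the collect-and-iterate route mentioned above instead of a udf, a minimal sketch (assuming df still holds the grouped result with the collected_col column) could look like this; it pulls everything to the driver, so it is only suitable for small results.</p>
<pre><code># collect the grouped rows and split each struct locally, keyed by "a"
rows = df.collect()
separated = {
    row["a"]: ([s["b"] for s in row["collected_col"]],
               [s["c"] for s in row["collected_col"]])
    for row in rows
}
print(separated)   # e.g. {0: ([1, 4], [2, 5]), 1: ([7, 8], [8, 7])}
</code></pre>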