<p>您的聚合代码看起来很好,应该可以完成这项工作,除非您的原始数据有一些问题(即数据以字符串形式出现,而不是JSON格式)。这是我用上面的数据编写的测试代码,我设置了一个合适的模式:</p>
<pre class="lang-py prettyprint-override"><code>raw = [
{'year':2007, 'id': '001', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':20}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':4}]},
{'year':2009, 'id': '045', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':20}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':5}]},
{'year':2009, 'id': '019', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':14}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':55}]},
{'year':2007, 'id': '002', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':15}, {'id':'AA2', 'age':35},{'id':'AA3', 'age':58}]},
{'year':2007, 'id': '003', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':16}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':23}]},
{'year':2007, 'id': '006', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':16}, {'id':'AA2', 'age':14},{'id':'AA3', 'age':60}]},
{'year':2007, 'id': '008', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':17}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':12}]},
{'year':2008, 'id': '010', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':18}, {'id':'AA2', 'age':16},{'id':'AA3', 'age':23}]},
{'year':2007, 'id': '044', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':25}, {'id':'AA2', 'age':17},{'id':'AA3', 'age':52}]},
{'year':2008, 'id': '055', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':25}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':43}]},
{'year':2007, 'id': '032', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':22}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':77}]},
{'year':2007, 'id': '034', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':10},{'id':'AA3', 'age':51}]},
{'year':2009, 'id': '077', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':10},{'id':'AA3', 'age':12}]},
{'year':2007, 'id': '025', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':10}]},
]
df = (spark
.createDataFrame(raw)
.select('year', 'id', 'area', 'visitor')
)
df.printSchema()
# root
# | year: long (nullable = true)
# | id: string (nullable = true)
# | area: string (nullable = true)
# | visitor: array (nullable = true)
# | | element: map (containsNull = true)
# | | | key: string
# | | | value: string (valueContainsNull = true)
agg = df.groupBy("area").agg(F.collect_list("visitor").alias("visitor_flatten"))
agg.show(10, False)
# + + -+
# |area|visitor_flatten |
# + + -+
# |GFD |[[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 4, id -> AA3}], [{age -> 14, id -> AA1}, {age -> 24, id -> AA2}, {age -> 55, id -> AA3}], [{age -> 15, id -> AA1}, {age -> 35, id -> AA2}, {age -> 58, id -> AA3}], [{age -> 16, id -> AA1}, {age -> 24, id -> AA2}, {age -> 23, id -> AA3}], [{age -> 25, id -> AA1}, {age -> 17, id -> AA2}, {age -> 52, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 12, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 24, id -> AA2}, {age -> 10, id -> AA3}]]|
# |TGH |[[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 5, id -> AA3}], [{age -> 16, id -> AA1}, {age -> 14, id -> AA2}, {age -> 60, id -> AA3}], [{age -> 17, id -> AA1}, {age -> 24, id -> AA2}, {age -> 12, id -> AA3}], [{age -> 18, id -> AA1}, {age -> 16, id -> AA2}, {age -> 23, id -> AA3}], [{age -> 25, id -> AA1}, {age -> 24, id -> AA2}, {age -> 43, id -> AA3}], [{age -> 22, id -> AA1}, {age -> 24, id -> AA2}, {age -> 77, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 51, id -> AA3}]]|
# + + -+
agg.printSchema()
# root
# | area: string (nullable = true)
# | visitor_flatten: array (nullable = false)
# | | element: array (containsNull = false)
# | | | element: map (containsNull = true)
# | | | | key: string
# | | | | value: string (valueContainsNull = true)
agg.withColumn("test", F.explode("visitor_flatten")).select('test').show(10, False)
# + +
# |test |
# + +
# |[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 4, id -> AA3}] |
# |[{age -> 14, id -> AA1}, {age -> 24, id -> AA2}, {age -> 55, id -> AA3}]|
# |[{age -> 15, id -> AA1}, {age -> 35, id -> AA2}, {age -> 58, id -> AA3}]|
# |[{age -> 16, id -> AA1}, {age -> 24, id -> AA2}, {age -> 23, id -> AA3}]|
# |[{age -> 25, id -> AA1}, {age -> 17, id -> AA2}, {age -> 52, id -> AA3}]|
# |[{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 12, id -> AA3}]|
# |[{age -> 34, id -> AA1}, {age -> 24, id -> AA2}, {age -> 10, id -> AA3}]|
# |[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 5, id -> AA3}] |
# |[{age -> 16, id -> AA1}, {age -> 14, id -> AA2}, {age -> 60, id -> AA3}]|
# |[{age -> 17, id -> AA1}, {age -> 24, id -> AA2}, {age -> 12, id -> AA3}]|
# + +
# only showing top 10 rows
agg.withColumn("test", F.explode("visitor_flatten")).select('test').printSchema()
# root
# | test: array (nullable = false)
# | | element: map (containsNull = true)
# | | | key: string
# | | | value: string (valueContainsNull = true)
</code></pre>