Not sure why PySpark treats my list as a string

Posted on 2024-09-29 01:30:25


I have a df like the following:

| year | id  | area | visitor
| 2007 | 001 | GFD  | [{'id':'AA1' 'age':20}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':4}]
| 2009 | 045 | TGH  | [{'id':'AA1' 'age':20}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':5}]
| 2009 | 019 | GFD  | [{'id':'AA1' 'age':14}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':55}]
| 2007 | 002 | GFD  | [{'id':'AA1' 'age':15}, {'id':'AA2' 'age':35},{'id':'AA3' 'age':58}]
| 2007 | 003 | GFD  | [{'id':'AA1' 'age':16}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':23}]
| 2007 | 006 | TGH  | [{'id':'AA1' 'age':16}, {'id':'AA2' 'age':14},{'id':'AA3' 'age':60}]    
| 2007 | 008 | TGH  | [{'id':'AA1' 'age':17}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':12}]
| 2008 | 010 | TGH  | [{'id':'AA1' 'age':18}, {'id':'AA2' 'age':16},{'id':'AA3' 'age':23}]    
| 2007 | 044 | GFD  | [{'id':'AA1' 'age':25}, {'id':'AA2' 'age':17},{'id':'AA3' 'age':52}]
| 2008 | 055 | TGH  | [{'id':'AA1' 'age':25}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':43}]
| 2007 | 032 | TGH  | [{'id':'AA1' 'age':22}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':77}]
| 2007 | 034 | TGH  | [{'id':'AA1' 'age':34}, {'id':'AA2' 'age':10},{'id':'AA3' 'age':51}]
| 2009 | 077 | GFD  | [{'id':'AA1' 'age':34}, {'id':'AA2' 'age':10},{'id':'AA3' 'age':12}]
| 2007 | 025 | GFD  | [{'id':'AA1' 'age':34}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':10}]

I am trying to use PySpark to group the data by area and then find both the average visitor age for each area and the most common visitor age in that area.

So initially, I pulled them together with groupBy:

df.groupBy("area").agg(collect_list("visitor").alias("visitor_flatten"))

+-----------+---------------------+
|     area  |      visitor_flatten|
+-----------+---------------------+
|     GFD   |  [[{id=AA1, age=2...|
|     TGH   |  [[{id=AA1, age=2...|
+-----------+---------------------+

But when I try .withColumn("test", explode("visitor_flatten")), I do get the expanded list for an area, with one visitor entry per row (e.g. {'id':'AA1', 'age':22}), yet the data is treated as a string. As a result, I can't seem to use a UDF or any API function to extract the age and work with the data, e.g. to find the average age of visitors in the area, or the most common visitor age within the area.
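(As a quick check, printing the schema shows what type Spark actually assigned to the column; if visitor comes back as string, that explains the behavior. A minimal diagnostic sketch, with illustrative output in the comment:)

df.printSchema()
print(df.dtypes)
# e.g. [('year', 'string'), ('id', 'string'), ('area', 'string'), ('visitor', 'string')]
# If visitor is 'string', collect_list/explode round-trips will only ever
# hand back plain strings, and accessors like visitor['age'] won't work.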

Any ideas/help would be greatly appreciated.


Tags: data, id, df, age, area, visitor
2 Answers

You can use the snippet below to get the average age grouped by area; with minor modifications you can adapt it to your full dataset.

import pandas as pd
import pyspark.sql.functions as F

# A one-row sample with the visitor column as a raw string
# (assumes an active SparkSession named `spark`).
pdf = pd.DataFrame([["2007", "001", "GFD", "{'id':'AA1' 'age':20}, {'id':'AA2' 'age':24},{'id':'AA3' 'age':4}"]],
                   columns=['year', 'id', 'area', 'visitor'])

keys = ["id", "age"]

df = (spark.createDataFrame(pdf)
      # One row per visitor record.
      .withColumn("visitor", F.explode(F.split(F.col("visitor"), ",")))
      .withColumn("visitor", F.trim(F.col("visitor")))
      # Strip braces and quotes, leaving e.g. "id:AA1 age:20".
      .withColumn("visitor", F.regexp_replace(F.col("visitor"), r"[{}']", ""))
      # Parse into a map: ' ' separates pairs, ':' separates key from value.
      .withColumn("visitor", F.expr("str_to_map(visitor, ' ', ':')"))
      .select("*", *[F.col("visitor")[k].alias(k) for k in keys])
      .groupBy("area")
      .agg(F.avg("age").alias("avg_age")))

df.show()
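This answer only computes the average. For the most common age, if you are on Spark 3.4+ you can add F.mode to the same aggregation. A minimal sketch, where `parsed` is a hypothetical name standing for the pipeline above stopped just before the .groupBy (one row per visitor, with area and age columns):

import pyspark.sql.functions as F

# Stand-in for the pre-aggregation frame from the pipeline above.
parsed = spark.createDataFrame(
    [("GFD", 20), ("GFD", 24), ("GFD", 24), ("GFD", 4)],
    ["area", "age"])

parsed.groupBy("area").agg(
    F.avg("age").alias("avg_age"),
    F.mode("age").alias("most_common_age"),  # requires Spark 3.4+
).show()

On older Spark versions, a groupBy count plus a row_number window achieves the same; see the sketch at the end of the second answer.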

Your aggregation code looks fine and should do the job, unless there is some problem with your original data (i.e. the data comes in as a string rather than in a structured, JSON-like form). Here is the test code I wrote with the data above, where I set up a proper structure:

import pyspark.sql.functions as F

raw = [
    {'year':2007, 'id': '001', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':20}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':4}]},
    {'year':2009, 'id': '045', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':20}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':5}]},
    {'year':2009, 'id': '019', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':14}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':55}]},
    {'year':2007, 'id': '002', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':15}, {'id':'AA2', 'age':35},{'id':'AA3', 'age':58}]},
    {'year':2007, 'id': '003', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':16}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':23}]},
    {'year':2007, 'id': '006', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':16}, {'id':'AA2', 'age':14},{'id':'AA3', 'age':60}]},
    {'year':2007, 'id': '008', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':17}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':12}]},
    {'year':2008, 'id': '010', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':18}, {'id':'AA2', 'age':16},{'id':'AA3', 'age':23}]},
    {'year':2007, 'id': '044', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':25}, {'id':'AA2', 'age':17},{'id':'AA3', 'age':52}]},
    {'year':2008, 'id': '055', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':25}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':43}]},
    {'year':2007, 'id': '032', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':22}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':77}]},
    {'year':2007, 'id': '034', 'area': 'TGH', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':10},{'id':'AA3', 'age':51}]},
    {'year':2009, 'id': '077', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':10},{'id':'AA3', 'age':12}]},
    {'year':2007, 'id': '025', 'area': 'GFD', 'visitor': [{'id':'AA1', 'age':34}, {'id':'AA2', 'age':24},{'id':'AA3', 'age':10}]},
]

df = (spark
    .createDataFrame(raw)
    .select('year', 'id', 'area', 'visitor')
)

df.printSchema()
# root
#  |-- year: long (nullable = true)
#  |-- id: string (nullable = true)
#  |-- area: string (nullable = true)
#  |-- visitor: array (nullable = true)
#  |    |-- element: map (containsNull = true)
#  |    |    |-- key: string
#  |    |    |-- value: string (valueContainsNull = true)

agg = df.groupBy("area").agg(F.collect_list("visitor").alias("visitor_flatten"))

agg.show(10, False)
# +----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |area|visitor_flatten                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
# +----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |GFD |[[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 4, id -> AA3}], [{age -> 14, id -> AA1}, {age -> 24, id -> AA2}, {age -> 55, id -> AA3}], [{age -> 15, id -> AA1}, {age -> 35, id -> AA2}, {age -> 58, id -> AA3}], [{age -> 16, id -> AA1}, {age -> 24, id -> AA2}, {age -> 23, id -> AA3}], [{age -> 25, id -> AA1}, {age -> 17, id -> AA2}, {age -> 52, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 12, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 24, id -> AA2}, {age -> 10, id -> AA3}]]|
# |TGH |[[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 5, id -> AA3}], [{age -> 16, id -> AA1}, {age -> 14, id -> AA2}, {age -> 60, id -> AA3}], [{age -> 17, id -> AA1}, {age -> 24, id -> AA2}, {age -> 12, id -> AA3}], [{age -> 18, id -> AA1}, {age -> 16, id -> AA2}, {age -> 23, id -> AA3}], [{age -> 25, id -> AA1}, {age -> 24, id -> AA2}, {age -> 43, id -> AA3}], [{age -> 22, id -> AA1}, {age -> 24, id -> AA2}, {age -> 77, id -> AA3}], [{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 51, id -> AA3}]]|
# +----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

agg.printSchema()
# root
#  |-- area: string (nullable = true)
#  |-- visitor_flatten: array (nullable = false)
#  |    |-- element: array (containsNull = false)
#  |    |    |-- element: map (containsNull = true)
#  |    |    |    |-- key: string
#  |    |    |    |-- value: string (valueContainsNull = true)

agg.withColumn("test", F.explode("visitor_flatten")).select('test').show(10, False)
# +------------------------------------------------------------------------+
# |test                                                                    |
# +------------------------------------------------------------------------+
# |[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 4, id -> AA3}] |
# |[{age -> 14, id -> AA1}, {age -> 24, id -> AA2}, {age -> 55, id -> AA3}]|
# |[{age -> 15, id -> AA1}, {age -> 35, id -> AA2}, {age -> 58, id -> AA3}]|
# |[{age -> 16, id -> AA1}, {age -> 24, id -> AA2}, {age -> 23, id -> AA3}]|
# |[{age -> 25, id -> AA1}, {age -> 17, id -> AA2}, {age -> 52, id -> AA3}]|
# |[{age -> 34, id -> AA1}, {age -> 10, id -> AA2}, {age -> 12, id -> AA3}]|
# |[{age -> 34, id -> AA1}, {age -> 24, id -> AA2}, {age -> 10, id -> AA3}]|
# |[{age -> 20, id -> AA1}, {age -> 24, id -> AA2}, {age -> 5, id -> AA3}] |
# |[{age -> 16, id -> AA1}, {age -> 14, id -> AA2}, {age -> 60, id -> AA3}]|
# |[{age -> 17, id -> AA1}, {age -> 24, id -> AA2}, {age -> 12, id -> AA3}]|
# +------------------------------------------------------------------------+
# only showing top 10 rows

agg.withColumn("test", F.explode("visitor_flatten")).select('test').printSchema()
# root
#  |-- test: array (nullable = false)
#  |    |-- element: map (containsNull = true)
#  |    |    |-- key: string
#  |    |    |-- value: string (valueContainsNull = true)
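With the proper schema confirmed, the aggregates the question asks for follow directly from `agg`. The sketch below is an illustration, not part of the original answer: it flattens the nested array, extracts age (stored as a string in the inferred map, hence the cast), and computes both the average age and, via a count plus a row_number window, the most common age per area:

from pyspark.sql import Window

# One row per visitor map, with age cast from its string map value to int.
ages = (agg
    .withColumn("v", F.explode(F.flatten("visitor_flatten")))
    .select("area", F.col("v")["age"].cast("int").alias("age")))

# Average visitor age per area.
ages.groupBy("area").agg(F.avg("age").alias("avg_age")).show()

# Most common visitor age per area (works on any recent Spark version):
# count occurrences, then keep the highest-count row per area,
# breaking ties by the smaller age.
w = Window.partitionBy("area").orderBy(F.desc("count"), "age")
(ages.groupBy("area", "age").count()
     .withColumn("rn", F.row_number().over(w))
     .filter("rn = 1")
     .select("area", F.col("age").alias("most_common_age"))
     .show())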
