我正在尝试创建一个Spark df,其中包含顶级字段和嵌套字段,这些字段来自一系列字典,其中包含与json对象的键和值相对应的键和值,我在选择嵌套列时遇到问题
以下是我到目前为止的情况:
输入是包含JSON值的字典列表:
[{
"uid": 98763,
"estimatedGrade": {
"science": 10.03,
"english": 20.5,
},
"actualGrade": {
"science": 10.03,
"english": 20.5,
}
}]
printed schema:
|-- uid: long (nullable = true)
|-- actualGrade: struct (nullable = true)
| |-- science: double(nullable = true)
| |-- english: double (nullable = true)
|-- estimatedGrade: struct (nullable = true)
| |-- science: double(nullable = true)
| |-- english: double (nullable = true)
期望输出:
*请注意,我不需要重命名该列,但必须将其缩短到一行
这是我目前的代码:
#jsons contains list of dict with the json key/values
df = self._spark.sparkContext.parallelize(jsons).map(lambda x: json.dumps(x))
df = self._spark.read.json(df, multiLine=True)
logger.info("Df count: %s", df.count())
logger.info("Df table schema: %s", df.printSchema())
columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']
df.select([column_header for column_header in df.columns if column_header in columns])
我只能选择uid,它是顶级字段,所以我猜我在选择嵌套值时出错了
请帮忙
df.columns只返回顶级列名。您可以通过在使用您提供的数据示例创建的df上运行它来检查这一点。它返回:
['actualGrade', 'estimatedGrade', 'uid']
在这一点上,我知道的唯一好方法是迭代df.schema。递归地,如果字段是StructType,则检查字段的嵌套列
下面是一些代码,可以帮助您完成任务。 首先,导入StructType
然后,设置一些辅助函数。第一个用于递归返回所有列名,包括使用点表示法的嵌套列。 第二个helper函数用于展平列表
然后,循环您的模式,并使用上面的方法获取所有列(包括嵌套列)
最后,替换select语句中的df.columns部分
在Databricks 8.2(Spark 3.1.1)上测试时工作
我还使用这种方法列出了Spark中所有表中的所有列名,因此可以查看本文以获得进一步的参考:https://medium.com/helmes-people/how-to-view-all-databases-tables-and-columns-in-databricks-9683b12fee10
相关问题 更多 >
编程相关推荐