Spark df不选择嵌套字段作为列名

2024-09-30 16:30:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试创建一个Spark df,其中包含顶级字段和嵌套字段,这些字段来自一系列字典,其中包含与json对象的键和值相对应的键和值,我在选择嵌套列时遇到问题

以下是我到目前为止的情况:

输入是包含JSON值的字典列表:

[{
  "uid": 98763,
  "estimatedGrade": {
    "science": 10.03,
    "english": 20.5,
   },
  "actualGrade":  {
    "science": 10.03,
    "english": 20.5,
   }
}]

printed schema:
 |-- uid: long (nullable = true)
 |-- actualGrade: struct (nullable = true)
 |    |-- science: double(nullable = true)
 |    |-- english: double (nullable = true)
 |-- estimatedGrade: struct (nullable = true)
 |    |-- science: double(nullable = true)
 |    |-- english: double (nullable = true)

期望输出:

^{tb1}$

*请注意,我不需要重命名该列,但必须将其缩短到一行

这是我目前的代码:


    #jsons contains list of dict with the json key/values
    df = self._spark.sparkContext.parallelize(jsons).map(lambda x: json.dumps(x))
    df = self._spark.read.json(df, multiLine=True)
    
    logger.info("Df count: %s", df.count())
    logger.info("Df table schema: %s", df.printSchema())
    
    columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']
    
    df.select([column_header for column_header in df.columns if column_header in columns])

我只能选择uid,它是顶级字段,所以我猜我在选择嵌套值时出错了

请帮忙


Tags: columnsjsontruedfuid字典englishcolumn
1条回答
网友
1楼 · 发布于 2024-09-30 16:30:31

df.columns只返回顶级列名。您可以通过在使用您提供的数据示例创建的df上运行它来检查这一点。它返回:['actualGrade', 'estimatedGrade', 'uid']

在这一点上,我知道的唯一好方法是迭代df.schema。递归地,如果字段是StructType,则检查字段的嵌套列

下面是一些代码,可以帮助您完成任务。 首先,导入StructType

from pyspark.sql.types import StructType

然后,设置一些辅助函数。第一个用于递归返回所有列名,包括使用点表示法的嵌套列。 第二个helper函数用于展平列表

def get_schema_field_name(field, parent=None):
  if type(field.dataType) == StructType:
    if parent == None:
      prt = field.name
    else:
      prt = parent+"."+field.name # using dot notation
    res = []
    for i in field.dataType.fields:
      res.append(get_schema_field_name(i, prt))
    return res
  else:
    if parent==None:
      res = field.name
    else:
      res = parent+"."+field.name
    return res

def flatten(S):
  if S == []:
    return S
  if isinstance(S[0], list):
    return flatten(S[0]) + flatten(S[1:])
  return S[:1] + flatten(S[1:])

然后,循环您的模式,并使用上面的方法获取所有列(包括嵌套列)

column_list = []
for j in df.schema:
  column_list.append(get_schema_field_name(j))
column_list = flatten(column_list)

最后,替换select语句中的df.columns部分

columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']

df.select([column_header for column_header in column_list if column_header in columns])

在Databricks 8.2(Spark 3.1.1)上测试时工作

From Databricks

我还使用这种方法列出了Spark中所有表中的所有列名,因此可以查看本文以获得进一步的参考:https://medium.com/helmes-people/how-to-view-all-databases-tables-and-columns-in-databricks-9683b12fee10

相关问题 更多 >