How do I create a JSON structure from a PySpark DataFrame?

I am trying to create a JSON structure from a PySpark DataFrame. My DataFrame has the following columns: batch_id, batch_run_id, table_name, column_name, column_datatype, last_update_time, refresh_frequency, owner.

I want it in the JSON structure below:

{
  "GeneralInfo": {
    "DataSetID": "xxxx1234Abcsd",
    "Owner": ["test1@email.com", "test2@email.com", "test3@email.com"],
    "Description": "",
    "BusinessFunction": "",
    "Source": "",
    "RefreshRate": "Weekly",
    "LastUpdate": "2020/10/15",
    "InfoSource": "TemplateInfo"
  },
  "Tables": [
    {
      "TableName": "Employee",
      "Columns": [
        {
          "ColumnName": "EmployeeID",
          "ColumnDataType": "int"
        },
        {
          "ColumnName": "EmployeeName",
          "ColumnDataType": "string"
        }
      ]
    }
  ]
}

I tried to assign the values in the JSON string directly from the DataFrame columns, but it gave me the error "Object of type Column is not JSON serializable". I used an approach like this:

{
  "GeneralInfo": {
    "DataSetID": df["batch_id"],
    "Owner": list(df["owner"]),
    "Description": "",
    "BusinessFunction": "",
    "Source": "",
    "RefreshRate": df["refresh_frequency"],
    "LastUpdate": df["last_update_time"],
    "InfoSource": "TemplateInfo"
  },
  "Tables": [
    {
      "TableName": df["table_name"],
      "Columns": [
        {
          "ColumnName": df["table_name"]["column_name"],
          "ColumnDataType": df["table_name"]["column_datatype"]
        }
      ]
    }
  ]
}
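(For context: from what I can tell, df["batch_id"] is a pyspark.sql.Column expression object rather than a value, which is why json.dumps fails on it. A minimal sketch of the difference, assuming df is the DataFrame described above and is non-empty:

import json

# df["batch_id"] is a Column expression, not data, so this raises
# "TypeError: Object of type Column is not JSON serializable":
# json.dumps({"DataSetID": df["batch_id"]})

# Actual values only exist on the driver after an action such as first()
row = df.select("batch_id", "owner").first()  # Row(batch_id=..., owner=[...])
payload = {"DataSetID": row["batch_id"], "Owner": list(row["owner"])}
print(json.dumps(payload))
)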

Sample data: Sample Data

Please help me with this; I have just started coding in PySpark.


1 Answer

I tried to produce the JSON format from the sample data you provided; the output does not exactly match the format you expected, but you can adapt the code below further.

We can use the toJSON function to convert a DataFrame to JSON. Before calling toJSON, we need to build the required shape with struct() (together with aggregate functions such as collect_set and collect_list), passing in the required columns so the result matches the JSON format.
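To illustrate those two building blocks in isolation, here is a minimal sketch with a throwaway DataFrame (the names are made up for the example):

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct

spark = SparkSession.builder.master('local[*]').getOrCreate()

tiny = spark.createDataFrame([("Employee", "EmployeeID", "int")],
                             ["table_name", "column_name", "column_datatype"])

# struct() packs columns into one nested field; toJSON() renders each row as a JSON string
nested = tiny.select("table_name",
                     struct("column_name", "column_datatype").alias("Columns"))
print(nested.toJSON().first())
# {"table_name":"Employee","Columns":{"column_name":"EmployeeID","column_datatype":"int"}}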

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, collect_set, struct

spark = SparkSession.builder.master('local[*]').getOrCreate()

# One row per (table, column); the batch-level fields repeat on every row
in_values = [
    (123, '123abc', 'Employee', 'Employee_id', 'int', '21/05/15', 'Weekly',
     ['test1@gmail.com', 'test1@gmail.com', 'test3@gmail.com']),
    (123, '123abc', 'Employee', 'Employee_name', 'string', '21/05/15', 'Weekly',
     ['test1@gmail.com', 'test1@gmail.com', 'test3@gmail.com'])
]

cols = ["batch_id", "batch_run_id", "table_name", "column_name", "column_datatype",
        "last_update_time", "refresh_frequency", "Owner"]

# Add the static/empty fields the target JSON expects as literal columns
df = spark.createDataFrame(in_values).toDF(*cols)\
    .selectExpr("*", "'' Description", "'' BusinessFunction",
                "'TemplateInfo' InfoSource", "'' Source")

# Fields that make up "GeneralInfo" (aliased to match the target keys)
list1 = [df["batch_id"].alias("DataSetID"), df["Owner"], df["refresh_frequency"].alias("RefreshRate"),
         df["last_update_time"].alias("LastUpdate"), "Description", "BusinessFunction",
         "InfoSource", "Source"]

# Fields that make up one entry of "Tables"
list2 = [df["table_name"].alias("TableName"), df["column_name"].alias("ColumnName"),
         df["column_datatype"].alias("ColumnDataType")]

# collect_set(...)[0] keeps one copy of the (identical) GeneralInfo struct per batch;
# collect_list gathers one Tables entry per input row
df.groupBy("batch_id") \
    .agg(collect_set(struct(*list1))[0].alias("GeneralInfo"),
         collect_list(struct(*list2)).alias("Tables")).drop("batch_id") \
    .toJSON().foreach(print)

# output JSON ->
'''
{
  "GeneralInfo": {
    "DataSetID": 123,
    "Owner": [
      "test1@gmail.com",
      "test1@gmail.com",
      "test3@gmail.com"
    ],
    "RefreshRate": "Weekly",
    "LastUpdate": "21/05/15",
    "Description": "",
    "BusinessFunction": "",
    "InfoSource": "TemplateInfo",
    "Source": ""
  },
  "Tables": [
    {
      "TableName": "Employee",
      "ColumnName": "Employee_id",
      "ColumnDataType": "int"
    },
    {
      "TableName": "Employee",
      "ColumnName": "Employee_name",
      "ColumnDataType": "string"
    }
  ]
}
'''
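If you need the exact nested Tables -> Columns shape from your question, one option (a sketch along the same lines, only tried against the sample data above) is to aggregate in two steps: first collapse the column rows into one row per table, then collapse the tables into one document per batch. This reuses the df defined above:

from pyspark.sql.functions import col, collect_list, first, struct

# Step 1: one row per table, with that table's columns collected into an array;
# the batch-level fields are identical within a table, so first() carries them along
per_table = df.groupBy("batch_id", "table_name") \
    .agg(collect_list(struct(col("column_name").alias("ColumnName"),
                             col("column_datatype").alias("ColumnDataType"))).alias("Columns"),
         first("Owner").alias("Owner"),
         first("refresh_frequency").alias("RefreshRate"),
         first("last_update_time").alias("LastUpdate"))

# Step 2: one document per batch - GeneralInfo plus the array of tables
# (carry Description/BusinessFunction/InfoSource/Source the same way if needed)
per_table.groupBy("batch_id") \
    .agg(first(struct(col("batch_id").alias("DataSetID"), col("Owner"),
                      col("RefreshRate"), col("LastUpdate"))).alias("GeneralInfo"),
         collect_list(struct(col("table_name").alias("TableName"),
                             col("Columns"))).alias("Tables")) \
    .drop("batch_id").toJSON().foreach(print)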
  
