Nested JSON to TSV in Databricks PySpark

Posted 2024-06-26 13:54:58


I want to convert nested JSON to TSV in a Databricks notebook using PySpark.

Below is the JSON structure; the columns may change:

{"tables":[{"name":"Result","columns":[{"name":"JobTime","type":"datetime"},{"name":"Status","type":"string"}]
,"rows":[
["2020-04-19T13:45:12.528Z","Failed"]
,["2020-04-19T14:05:40.098Z","Failed"]
,["2020-04-19T13:46:31.655Z","Failed"]
,["2020-04-19T14:01:16.275Z","Failed"],
["2020-04-19T14:03:16.073Z","Failed"],
["2020-04-19T14:01:16.672Z","Failed"],
["2020-04-19T14:02:13.958Z","Failed"],
["2020-04-19T14:04:41.099Z","Failed"],
["2020-04-19T14:04:41.16Z","Failed"],
["2020-04-19T14:05:14.462Z","Failed"]
]}
]}

I'm new to Databricks; any help is appreciated.


Tags: columns, name, json, tables, datetime, tsv, type, status
1 Answer

Answer #1 · Posted 2024-06-26 13:54:58

There are two ways to approach this. You can do some preprocessing in Python with the json library (or an equivalent; a sketch of that route follows), or load the string directly into PySpark and proceed as shown after it:
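For reference, here is a minimal sketch of the pure-Python route, assuming exactly the structure from the question (one table with "columns" metadata and "rows" of values); the output path /dbfs/tmp/result.tsv is just a placeholder:

import csv
import json

# so_json is the JSON string from the question (defined in the PySpark example below)
parsed = json.loads(so_json)
table = parsed["tables"][0]

# header comes from the "columns" metadata, data from "rows"
header = [c["name"] for c in table["columns"]]

with open("/dbfs/tmp/result.tsv", "w", newline="") as fh:
    writer = csv.writer(fh, delimiter="\t")
    writer.writerow(header)
    writer.writerows(table["rows"])

The PySpark route: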

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

# your json
so_json = """
{"tables":[{"name":"Result","columns":[{"name":"JobTime","type":"datetime"},{"name":"Status","type":"string"}]
,"rows":[
["2020-04-19T13:45:12.528Z","Failed"]
,["2020-04-19T14:05:40.098Z","Failed"]
,["2020-04-19T13:46:31.655Z","Failed"]
,["2020-04-19T14:01:16.275Z","Failed"],
["2020-04-19T14:03:16.073Z","Failed"],
["2020-04-19T14:01:16.672Z","Failed"],
["2020-04-19T14:02:13.958Z","Failed"],
["2020-04-19T14:04:41.099Z","Failed"],
["2020-04-19T14:04:41.16Z","Failed"],
["2020-04-19T14:05:14.462Z","Failed"]
]}
]}
"""

# load in directly using read.json(), you'll see that this becomes 
# a nested ArrayType/StructType wombo combo
json_df = spark.read.json(spark.sparkContext.parallelize([so_json]))
json_df.printSchema()
root
 |-- tables: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- columns: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- rows: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)


# select nested columns "tables" and "rows" and explode
array_df = json_df.select(f.explode(f.col('tables')['rows'][0]))

The explode takes rows, which is an ArrayType, and splits it out into actual rows. You can then sub-select fields using dot or slice notation.

array_df.printSchema()
root
 |-- col: array (nullable = true)
 |    |-- element: string (containsNull = true)


tabular_df = array_df.select(
  array_df.col[0].alias("JobTime"), 
  array_df.col[1].alias("Status")
)
tabular_df.show()

+--------------------+------+
|             JobTime|Status|
+--------------------+------+
|2020-04-19T13:45:...|Failed|
|2020-04-19T14:05:...|Failed|
|2020-04-19T13:46:...|Failed|
|2020-04-19T14:01:...|Failed|
|2020-04-19T14:03:...|Failed|
|2020-04-19T14:01:...|Failed|
|2020-04-19T14:02:...|Failed|
|2020-04-19T14:04:...|Failed|
|2020-04-19T14:04:...|Failed|
|2020-04-19T14:05:...|Failed|
+--------------------+------+
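The aliases above are hardcoded. Since the question says the columns can change, one option (my own sketch, not part of the original answer) is to read the names out of the "columns" metadata instead:

# "columns" is an array of {name, type} structs; collect the names
meta = json_df.select(f.col("tables")[0]["columns"]).first()[0]
col_names = [c["name"] for c in meta]  # ["JobTime", "Status"]

# alias each array position with the corresponding metadata name
dynamic_df = array_df.select(
    *[array_df.col[i].alias(name) for i, name in enumerate(col_names)]
)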

Finally, you want to save as CSV with a custom separator (\t). So:

tabular_df.write.csv("path/to/file.tsv", sep="\t")
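Note that write.csv() produces a directory of part files. If you want a header row, or a single output file for small data, something along these lines should work:

# header=True writes the column names; coalesce(1) forces a single part file
tabular_df.coalesce(1).write.csv(
    "path/to/file.tsv", sep="\t", header=True, mode="overwrite"
)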

Note: you may need to control the types manually, e.g. casting JobTime to TimestampType, but that's up to you.
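For example, a cast along these lines (my sketch; Spark 3 parses this ISO-8601 form with a plain cast, while older versions may need to_timestamp with an explicit format):

# cast the ISO-8601 strings to a proper timestamp column
typed_df = tabular_df.withColumn("JobTime", f.col("JobTime").cast("timestamp"))

Hope this helps.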
