在pyspark中展平嵌套的json scala代码

2024-10-03 15:24:54 发布

您现在位置:Python中文网/ 问答频道 /正文

尝试在pyspark中执行以下scala代码:

val maxJsonParts = 3 // whatever that number is...
val jsonElements = (0 until maxJsonParts)
                     .map(i => get_json_object($"Payment", s"$$[$i]"))

val newDF = dataframe
  .withColumn("Payment", explode(array(jsonElements: _*)))
  .where(!isnull($"Payment"))

例如,我尝试创建一个嵌套列,如下面的“付款”列:

^{tb1}$

成为:

^{tb2}$

表架构:

root
|-- id: integer (nullable = true)
|-- Name: string (nullable = true)   
|-- Payment: string (nullable = true)

我尝试在Pyspark中编写此代码,但它只是将嵌套列(payment)变为null:

lst = [range(0,10)]
jsonElem = [F.get_json_object(F.col("payment"), f"$[{i}]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem)))
bronzeDF.show()

非常感谢您的帮助


Tags: 代码jsontruegetstringobjectvalpayment
2条回答

下面是另一种方法,它允许您基于正确的模式解析给定的JSON,以便生成支付数组。该解决方案基于from_json函数,该函数允许您将字符串JSON解析为结构类型

from pyspark.sql.types import IntegerType, StringType, ArrayType, StructField
from pyspark.sql.functions import from_json, explode

data = [
  (1, 'James', '[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]'), 
  (2, 'Tonny', '[ {"@id": 3, "currency":"EUR"},{"@id": 4, "currency": "USD"} ]'), 
]
df = spark.createDataFrame(data, ['id', 'name', 'payment'])

str_schema = 'array<struct<`@id`:int,`currency`:string>>'

# st_schema = ArrayType(StructType([
#                 StructField('@id', IntegerType()),
#                 StructField('currency', StringType())]))

df = df.withColumn("payment", explode(from_json(df["payment"], str_schema)))

df.show()

# + -+  -+    +
# | id| name| payment|
# + -+  -+    +
# |  1|James|[1, GBP]|
# |  1|James|[2, USD]|
# |  2|Tonny|[3, EUR]|
# |  2|Tonny|[4, USD]|
# + -+  -+    +

注意:如您所见,您可以在模式的字符串表示或ArrayType之间进行选择。两者应该产生相同的结果

我找到了解决方案:

首先将列转换为字符串类型,如下所示:

bronzeDF = bronzeDF.withColumn("payment2", F.to_json("payment")).drop("payment")

然后,您可以在列上执行以下代码,将n个嵌套的json对象堆叠为具有相同外部键值的单独行:

max_json_parts = 50
lst = [f for f in range(0, max_json_parts, 1)]
jsonElem = [F.get_json_object(F.col("payment2"), f"$[{i}]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem))).where(F.col("payment2").isNotNull())

然后使用定义的架构转换回struct,并将对象键分解为单独的列:

bronzeDF = bronzeDF.withColumn("temp", F.from_json("payment2", jsonSchemaPayment)).select("*", "temp.*").drop("payment2")

相关问题 更多 >