Parsing multi-line nested JSON in a Spark 3 DataFrame with PySpark

Posted 2024-09-28 05:39:56


I'm having trouble reading multi-line JSON in PySpark. For example:

{
    "_index": "kl.service-log.2021.04.06",
    "_type": "_doc",
    "_id": "hZ3SpHgBhp2ht1Q8n8ym",
    "_version": 1,
    "_score": null,
    "_source": {
        "publishTime": "2021-04-06T01:36:09.422Z",
        "client_ips": "2601:247:c580:3337:45c0:dd63:35e0:9247",
        "body": {
            "events": "[{\"key\":\"Key  Launched\",\"count\":1,\"timestamp\":1617672914673,\"sum\":0},{\"key\":\"Viewed Screen\",\"count\":1,\"timestamp\":1617672969301,\"sum\":0}]",
            "sdk_name": "java-native-android",
            "tz": "-300"
        }
    }
}

The schema is inferred as follows:

root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: string (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- body: struct (nullable = true)
 |    |    |-- events: string (nullable = true)
 |    |    |-- sdk_name: string (nullable = true)
 |    |    |-- tz: string (nullable = true)
 |    |-- client_ips: string (nullable = true)
 |    |-- publishTime: string (nullable = true)
 |-- _type: string (nullable = true)
 |-- _version: long (nullable = true)

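Under _source.body.events, the data type is string, but the value is actually a JSON array holding two separate records (dicts). I would like each record to become its own row, with its fields available as separate columns.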


1 answer
User
#1 · posted 2024-09-28 05:39:56

You can use from_json to parse the events column and rebuild the _source column:

import pyspark.sql.functions as F

df2 = df.withColumn(
    '_source', 
    F.struct(
        F.struct(
            F.from_json(
                '_source.body.events',
                'array<struct<key:string, count:int, timestamp:long, sum:int>>'
            ).alias('events'), 
            '_source.body.sdk_name', 
            '_source.body.tz'
        ).alias('body'), 
        '_source.client_ips', 
        '_source.publishTime'
    )
)

df2.show(truncate=False)
+          +            -+   +                                                                                      -+  -+    +
|_id                 |_index                   |_score|_source                                                                                                                                                                      |_type|_version|
+          +            -+   +                                                                                      -+  -+    +
|hZ3SpHgBhp2ht1Q8n8ym|kl.service-log.2021.04.06|null  |[[[[Key  Launched, 1, 1617672914673, 0], [Viewed Screen, 1, 1617672969301, 0]], java-native-android, -300], 2601:247:c580:3337:45c0:dd63:35e0:9247, 2021-04-06T01:36:09.422Z]|_doc |1       |
+          +            -+   +                                                                                      -+  -+    +

df2.printSchema()
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: string (nullable = true)
 |-- _source: struct (nullable = false)
 |    |-- body: struct (nullable = false)
 |    |    |-- events: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- count: integer (nullable = true)
 |    |    |    |    |-- timestamp: long (nullable = true)
 |    |    |    |    |-- sum: integer (nullable = true)
 |    |    |-- sdk_name: string (nullable = true)
 |    |    |-- tz: string (nullable = true)
 |    |-- client_ips: string (nullable = true)
 |    |-- publishTime: string (nullable = true)
 |-- _type: string (nullable = true)
 |-- _version: long (nullable = true)
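
To see why the extra parsing step is needed, here is a minimal plain-Python sketch (standard library only, no Spark). It assumes a trimmed copy of the question's document; `raw`, `doc`, and `events_text` are illustrative names, not part of the original code. The events value is JSON text stored inside JSON, so it has to be decoded a second time, which is roughly the job from_json does for each row.

```python
import json

# Trimmed copy of the question's document (illustrative; most fields omitted).
# A raw string keeps the \" escapes intact so the outer JSON stays valid.
raw = r'''
{
    "_id": "hZ3SpHgBhp2ht1Q8n8ym",
    "_source": {
        "body": {
            "events": "[{\"key\":\"Key  Launched\",\"count\":1,\"timestamp\":1617672914673,\"sum\":0},{\"key\":\"Viewed Screen\",\"count\":1,\"timestamp\":1617672969301,\"sum\":0}]",
            "sdk_name": "java-native-android",
            "tz": "-300"
        }
    }
}
'''

# First decode: the outer document. `events` is still plain text here.
doc = json.loads(raw)
events_text = doc["_source"]["body"]["events"]
print(type(events_text).__name__)  # str

# Second decode: turn the embedded JSON text into a list of dicts.
events = json.loads(events_text)
print(len(events), [e["key"] for e in events])  # 2 ['Key  Launched', 'Viewed Screen']
```

This is also why schema inference reports events as string: the reader only decodes the outer document and has no reason to look inside a string value.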

If you want to explode the array into separate rows, you can continue from the df2 obtained above:

df3 = df2.withColumn(
    'idx', 
    F.expr('explode(sequence(0, size(_source.body.events) - 1))')
).withColumn(
    '_source', 
    F.struct(
        F.struct(
            # alias keeps the field name 'events' instead of an auto-generated one
            F.expr('_source.body.events[idx]').alias('events'),
            '_source.body.sdk_name', 
            '_source.body.tz'
        ).alias('body'), 
        '_source.client_ips', 
        '_source.publishTime'
    )
).drop('idx')

df3.show(truncate=False)
+          +            -+   +                                                                  -+  -+    +
|_id                 |_index                   |_score|_source                                                                                                                              |_type|_version|
+          +            -+   +                                                                  -+  -+    +
|hZ3SpHgBhp2ht1Q8n8ym|kl.service-log.2021.04.06|null  |[[[Key  Launched, 1, 1617672914673, 0], java-native-android, -300], 2601:247:c580:3337:45c0:dd63:35e0:9247, 2021-04-06T01:36:09.422Z]|_doc |1       |
|hZ3SpHgBhp2ht1Q8n8ym|kl.service-log.2021.04.06|null  |[[[Viewed Screen, 1, 1617672969301, 0], java-native-android, -300], 2601:247:c580:3337:45c0:dd63:35e0:9247, 2021-04-06T01:36:09.422Z]|_doc |1       |
+          +            -+   +                                                                  -+  -+    +
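
The row semantics of the explode step can be sketched in plain Python (standard library only, no Spark): each element of the parsed array becomes one output row, and the parent fields are repeated on every row. The names `record` and `explode_events` are hypothetical, and the record is a trimmed, flattened stand-in for the question's sample rather than the real DataFrame.

```python
import json

# Hypothetical parent record, trimmed and flattened from the question's sample.
record = {
    "_id": "hZ3SpHgBhp2ht1Q8n8ym",
    "sdk_name": "java-native-android",
    "events": json.dumps([
        {"key": "Key  Launched", "count": 1, "timestamp": 1617672914673, "sum": 0},
        {"key": "Viewed Screen", "count": 1, "timestamp": 1617672969301, "sum": 0},
    ]),
}


def explode_events(rec):
    """Yield one flat row per event, repeating the parent fields on each row."""
    parent = {k: v for k, v in rec.items() if k != "events"}
    for event in json.loads(rec["events"]):
        # Merge parent fields with the event's own fields (key, count, ...).
        yield {**parent, **event}


rows = list(explode_events(record))
for row in rows:
    print(row)
```

Each printed row carries the event's key, count, timestamp, and sum as top-level fields next to _id and sdk_name, which is the "one row per record, one column per field" shape the question asks for. In Spark, a comparable flat layout would presumably come from exploding the parsed array and then selecting the struct's fields as columns.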
