有没有一种方法可以使用pyspark动态创建模式信息,而不是输出jsonfile中的转义字符?

2024-06-28 18:59:49 发布

您现在位置:Python中文网/ 问答频道 /正文

目前pyspark格式化日志文件,然后加载redshift。在

分析以json格式输出的日志文件的每一项,添加一项,并将其加载到Redshift中。 但是,对于每种类型,某些项目的格式是不同的。 (对于同一项目,应事先使用Shcema。) 即使按原样输出,也将输入转义字符 有没有一种方法可以动态创建模式信息,并且输出的jsonfile没有转义符?在

--环境--

- spark 2.4.0
- python version 2.7.15

--数据帧--

^{pr2}$

--架构(用于公共项)---

>> print(json.dumps(schema.jsonValue(), indent=2))
{
  "fields": [
    {
      "metadata": {}, 
      "type": "string", 
      "name": "Name", 
      "nullable": false
    }, 
    {
      "metadata": {}, 
      "type": {
        "keyType": "string", 
        "type": "map", 
        "valueType": "string", 
        "valueContainsNull": true
      }, 
      "name": "d", 
      "nullable": false
    }
  ], 
  "type": "struct"
}

--代码--

from pyspark.sql.types import *

rdd = sc.parallelize([("Amber", {"Body": "{\"City\": \"Oregon\", \"Country\": \"US\"}", "BodyType": 1}), ("Alfred", {"Body": "{\"Weight\": 80, \"Height\": 176}", "BodyType": 2})])
schema = StructType([StructField('Name',StringType(), False)
    ,StructField('d',MapType(StringType(),StringType()), False)])
df = spark.createDataFrame(rdd, schema)

--输出json文件--

{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}}

--输出json文件(理想)--

{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}, "Body":{"City": "Oregon", "Country": "US"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}, "Body":{"Weight": 80, "Height": 176}}

我尝试使用模式_json()和pyspark.sql.functions,但没用。 (_json的schema_只能接受字符文本)

--试验结果--

from pyspark.sql.functions import schema_of_json
from pyspark.sql.functions import from_json
df = df.withColumn('Body', df.select(from_json(df.d.body,schema_of_json(df.d.Body))))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/functions.py", line 2277, in from_json
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of schemaofjson(`d`['Body']);"

Tags: 文件ofnameinfromjsondfsql
1条回答
网友
1楼 · 发布于 2024-06-28 18:59:49
太长了,读不下去了;

The short answer is no, there is no way to dynamically infer the schema on each row and end up with a column where different rows have different schemas.

但是,有一种方法可以输出所需的json字符串,并将不同的json协调到一个通用的、富类型的schema

细节

如果允许的话,它将非常缓慢,但更重要的是它是不允许的,因为它打破了允许SparkSQL一致运行的关系模型。在

数据帧由列(字段)组成,而列只有一个数据类型;数据类型表示整个列。鉴于Pyspark的特性,它在Pyspark中并不是严格可执行的,但在运行时它很重要,因此该语句仍然适用。在

在您的示例中,如果您希望使用类似于d.Body.City的东西来投射City属性,那么Alfred和Amber都必须存在该属性。至少,该字段的元数据必须存在,即使没有值。要知道引擎是否需要快速避免每一行的无效执行。在

在一列中协调多个类型的一些方法是(我肯定还有很多我想不出来的):

  1. 使用variant/union/option类型(例如将所有常见和不常见的json模式联合在一起)
  2. 将其序列化为json字符串(这是在应用jsonschema之前开始的地方,非常适合传输数据,而不是用于分析)
  3. 将其上推到具有最低公分母行为/接口/属性(丢失元数据和子类型属性)的超类型、装箱或通用对象(如RDD中的对象)
  4. 不要将其存储为单一类型,例如,将不同的变体存储到不同的列中,并在每个列上使用不同的json模式

在这种情况下,我喜欢(1),但是(4)可以作为找到通用模式的过渡步骤。在

您的示例“common”json模式更像选项(3)。在地图里面你叫“d”(我猜是因为这是个口头禅?)如果不扫描数据,有关字段的信息将不可用。在

root
 |  Name: string (nullable = false)
 |  d: map (nullable = false)
 |    |  key: string
 |    |  value: string (valueContainsNull = true)

我意识到这只是添加一个包含Body的新列的过渡步骤,但是要做到这一点,您必须将映射中所有可能的键枚举到一个更有用的模式中。在

解决方案

通用(公共)模式不是string -> string的通用映射,我认为下面这样更有用。它接近于您最初尝试的内容,但不是动态的,并且对两行都有效。注意,nullable是所有属性的默认True

^{pr2}$

现在,您可以通过选择d.Body.City轻松到达Body.City,而不必担心哪些行有City。在

对于下一步,您可以将其恢复为json字符串

df = df.withColumn("Body", to_json("d.Body"))

你也可以把它和上一步结合起来

df = df.withColumn("Body", to_json(from_json("d.Body", schema_body)))
df.printSchema()
root
 |  Name: string (nullable = false)
 |  BodyAttributes: struct (nullable = true)
 |    |  Body: string (nullable = true)
 |    |  BodyType: integer (nullable = true)
 |  Body: string (nullable = true)

df.show(2, False)

+   +                   -+                +
|Name  |BodyAttributes                         |Body                            |
+   +                   -+                +
|Amber |[{"City": "Oregon", "Country": "US"},1]|{"City":"Oregon","Country":"US"}|
|Alfred|[{"Weight": 80, "Height": 176},2]      |{"Weight":80,"Height":176}      |
+   +                   -+                +

请注意,当将其转换回json字符串时,这些空值将消失。它现在也是一个jsonstring,很容易按照您的需要写入文件。在


更进一步

如果您将此操作作为过程的一部分,以使数据可用于分析、报告或其他目的,我会这样做

schema = StructType([
    StructField('Name',StringType(), False),
    StructField(
        'd',
        StructType([
            StructField("Body", StringType()),
            StructField("BodyType", IntegerType())
        ])
    )
])

df = spark.createDataFrame(rdd, schema)
df = df.withColumn(
    "Body", 
    from_json("d.Body", schema_body)
).withColumn(
    "BodyType", 
    col("d.BodyType")
).drop("d")

df.printSchema()

root
 |  Name: string (nullable = false)
 |  Body: struct (nullable = true)
 |    |  City: string (nullable = true)
 |    |  Country: string (nullable = true)
 |    |  Weight: integer (nullable = true)
 |    |  Height: integer (nullable = true)
 |  BodyType: integer (nullable = true)


df.show(2, False)

+   +          -+    +
|Name  |Body                 |BodyType|
+   +          -+    +
|Amber |[Oregon,US,null,null]|1       |
|Alfred|[null,null,80,176]   |2       |
+   +          -+    +

然后您可以选择Body.CityBody.CountryBody.Weight,车身。高度`在

你可以再多走一步,但这将取决于这些可能的身体关键点有多少以及它有多稀疏。在

df = df.withColumn(
    "City", col("Body.City")
).withColumn(
    "Country", col("Body.Country")
).withColumn(
    "Weight", col("Body.Weight")
).withColumn(
    "Height", col("Body.Height")
).drop("Body")

df.printSchema()

root
 |  Name: string (nullable = false)
 |  BodyType: integer (nullable = true)
 |  City: string (nullable = true)
 |  Country: string (nullable = true)
 |  Weight: integer (nullable = true)
 |  Height: integer (nullable = true)

df.show(2, False)

+   +    +   +   -+   +   +
|Name  |BodyType|City  |Country|Weight|Height|
+   +    +   +   -+   +   +
|Amber |1       |Oregon|US     |null  |null  |
|Alfred|2       |null  |null   |80    |176   |
+   +    +   +   -+   +   +

相关问题 更多 >