目前pyspark格式化日志文件,然后加载redshift。在
分析以json格式输出的日志文件的每一项,添加一项,并将其加载到Redshift中。 但是,对于每种类型,某些项目的格式是不同的。 (对于同一项目,应事先使用Shcema。) 即使按原样输出,也将输入转义字符 有没有一种方法可以动态创建模式信息,并且输出的jsonfile没有转义符?在
--环境--
- spark 2.4.0
- python version 2.7.15
--数据帧--
^{pr2}$--架构(用于公共项)---
>> print(json.dumps(schema.jsonValue(), indent=2))
{
"fields": [
{
"metadata": {},
"type": "string",
"name": "Name",
"nullable": false
},
{
"metadata": {},
"type": {
"keyType": "string",
"type": "map",
"valueType": "string",
"valueContainsNull": true
},
"name": "d",
"nullable": false
}
],
"type": "struct"
}
--代码--
from pyspark.sql.types import *
rdd = sc.parallelize([("Amber", {"Body": "{\"City\": \"Oregon\", \"Country\": \"US\"}", "BodyType": 1}), ("Alfred", {"Body": "{\"Weight\": 80, \"Height\": 176}", "BodyType": 2})])
schema = StructType([StructField('Name',StringType(), False)
,StructField('d',MapType(StringType(),StringType()), False)])
df = spark.createDataFrame(rdd, schema)
--输出json文件--
{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}}
--输出json文件(理想)--
{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}, "Body":{"City": "Oregon", "Country": "US"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}, "Body":{"Weight": 80, "Height": 176}}
我尝试使用模式_json()和pyspark.sql.functions,但没用。 (_json的schema_只能接受字符文本)
--试验结果--
from pyspark.sql.functions import schema_of_json
from pyspark.sql.functions import from_json
df = df.withColumn('Body', df.select(from_json(df.d.body,schema_of_json(df.d.Body))))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/functions.py", line 2277, in from_json
jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of schemaofjson(`d`['Body']);"
但是,有一种方法可以输出所需的json字符串,并将不同的json协调到一个通用的、富类型的schema
细节
如果允许的话,它将非常缓慢,但更重要的是它是不允许的,因为它打破了允许SparkSQL一致运行的关系模型。在
数据帧由列(字段)组成,而列只有一个数据类型;数据类型表示整个列。鉴于Pyspark的特性,它在Pyspark中并不是严格可执行的,但在运行时它很重要,因此该语句仍然适用。在
在您的示例中,如果您希望使用类似于
d.Body.City
的东西来投射City
属性,那么Alfred和Amber都必须存在该属性。至少,该字段的元数据必须存在,即使没有值。要知道引擎是否需要快速避免每一行的无效执行。在在一列中协调多个类型的一些方法是(我肯定还有很多我想不出来的):
在这种情况下,我喜欢(1),但是(4)可以作为找到通用模式的过渡步骤。在
您的示例“common”json模式更像选项(3)。在地图里面你叫“d”(我猜是因为这是个口头禅?)如果不扫描数据,有关字段的信息将不可用。在
我意识到这只是添加一个包含
Body
的新列的过渡步骤,但是要做到这一点,您必须将映射中所有可能的键枚举到一个更有用的模式中。在解决方案
通用(公共)模式不是
^{pr2}$string -> string
的通用映射,我认为下面这样更有用。它接近于您最初尝试的内容,但不是动态的,并且对两行都有效。注意,nullable
是所有属性的默认True
现在,您可以通过选择
d.Body.City
轻松到达Body.City
,而不必担心哪些行有City。在对于下一步,您可以将其恢复为json字符串
你也可以把它和上一步结合起来
请注意,当将其转换回json字符串时,这些空值将消失。它现在也是一个jsonstring,很容易按照您的需要写入文件。在
更进一步
如果您将此操作作为过程的一部分,以使数据可用于分析、报告或其他目的,我会这样做
然后您可以选择
Body.City
,Body.Country
,Body.Weight,
车身。高度`在你可以再多走一步,但这将取决于这些可能的身体关键点有多少以及它有多稀疏。在
相关问题 更多 >
编程相关推荐