带缩进的Pyspark文本文件json

2024-10-01 09:16:35 发布

您现在位置:Python中文网/ 问答频道 /正文

Wanne读取了一个带有缩进的json文件到RDD中,但是spark抛出了一个异常。在

# txts = sc.textFile('data/jsons_without_indentation') # works
txts = sc.textFile('data/jsons_with_indentation')      # fails
txts_dicts = txts.map(lambda data: json.loads(data))
txts_dicts.collect()

在sc.wholeTextFiles公司也不起作用。是否可以加载带有缩进的json而不必先将其转换为文件?在

示例json文件如下所示:

^{pr2}$

Tags: 文件jsondatawithsparkwithoutscrdd
1条回答
网友
1楼 · 发布于 2024-10-01 09:16:35

如果这只是每个文件的一个JSON文档,那么您只需要SparkContext.wholeTextFiles。首先创建一些虚拟数据:

import tempfile
import json 

input_dir = tempfile.mkdtemp()

docs = [
    {'data': {'text': {'de': 'Ein Text.', 'en': 'A text.'}}},
    {'data': {'text': {'de': 'Ein Bahnhof.', 'en': 'A railway station.'}}},
    {'data': {'text': {'de': 'Ein Hund.', 'en': 'A dog.'}}}]

for doc in docs:
    with open(tempfile.mktemp(suffix="json", dir=input_dir), "w") as fw:
        json.dump(doc, fw, indent=4)

现在让我们读取数据:

^{pr2}$

并确保文件确实缩进:

print rdd.top(1)[0]

## {
##     "data": {
##         "text": {
##             "de": "Ein Text.", 
##             "en": "A text."
##         }
##     }
## }

最后,我们可以分析:

parsed = rdd.map(json.loads)

检查是否一切正常:

parsed.takeOrdered(3)

## [{u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}},
##  {u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
##  {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}}]

如果您仍然遇到一些问题,很可能是由于某些错误的条目造成的。您可以做的最简单的事情是使用带有自定义包装的flatMap丢弃格式错误的条目:

rdd_malformed = sc.parallelize(["{u'data': {u'text': {u'de':"]).union(rdd)

## org.apache.spark.api.python.PythonException: Traceback (most recent call ...
##     ...
## ValueError: Expecting property name: line 1 column 2 (char 1)

并使用try_seq(此处定义:What is the equivalent to scala.util.Try in pyspark?)进行包装

rdd_malformed.flatMap(lambda x: seq_try(json.loads, x)).collect()

## [{u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
##  {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}},
##  {u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}}]

相关问题 更多 >