有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Spark SQL读取已转义双引号的JSON文件

我有一个简单的Spark程序,它读取JSON文件并发出CSV文件。在JSON文件中,数据被转义为双引号。spark程序无法将该行作为有效的JSON字符串读取

输入。json

{\"key\" : \"k1\", \"value1\": \"Good String\", \"value2\": \"Good String\"}

输入1。json

"{\"key\" : \"k1\", \"value1\": \"Good String\", \"value2\": \"Good String\"}"

输出。csv-数据作为损坏记录返回

_corrupt_record,key,value1,value2
"{\\"key\\\" : \\\"k1\\\", \\\"value1\\\": \\\"Good String\\\", \\\"value2\\\": \\\"Good String\\\"}",,,

预料之中。csv

,k1,Good String,Good String

请查看下面的主代码并给出建议

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder()
            .appName(TestSpark.class.getName()).master("local[1]").getOrCreate();

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();

    List<StructField> kvFields = new ArrayList<>();
    kvFields.add(DataTypes.createStructField("_corrupt_record", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("key", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value1", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value2", DataTypes.StringType, true));
    StructType employeeSchema = DataTypes.createStructType(kvFields);

    Dataset<Row> dataset = sparkSession.read()
                    .option("inferSchema", false)
                    .format("json")
                    .schema(employeeSchema)
                    .load("D:\\dev\\workspace\\java\\simple-kafka\\key_value.json");

    dataset.createOrReplaceTempView("sourceView");
    sqlCtx.sql("select * from sourceView")
            .write()
            .option("header", true)
            .format("csv")
            .save("D:\\dev\\workspace\\java\\simple-kafka\\output\\" + UUID.randomUUID().toString());
    sparkSession.close();
}

共 (1) 个答案

  1. # 1 楼答案

    您需要读取文本并手动解析。我不在Java中使用Spark,但这里是Scala的等价物,您可以将其用作伪代码:

    val rdd: RDD[MyClass] = sc.textFile(path)
      .map { line =>
        val json = ... // turn line into valid json
        Try(parse(json))
          .recover {
            case NonFatal(ex) => // handle parse error
          }
          .map { jvalue =>
            // Convert the JSON into a case class
          }
          .get
      }
    
    val ds: Dataset[MyClass] = spark.createDataset(rdd)