<p><code>malformed_rows</code> is the internal corrupt-record column, which is named <code>_corrupt_record</code> by default; you renamed it with:</p>
<pre><code>.option("columnNameOfCorruptRecord", "malformed_rows")
</code></pre>
<p>However, since Spark 2.3 you can no longer query the data by referencing only this column, as noted in the <a href="https://spark.apache.org/docs/latest/sql-migration-guide.html#upgrading-from-spark-sql-22-to-23" rel="nofollow noreferrer">docs</a>; you need to cache the DataFrame first:</p>
<blockquote>
<p>Since Spark 2.3, the queries from raw JSON/CSV files are disallowed
when the referenced columns only include the internal corrupt record
column (named <code>_corrupt_record</code> by default). For example,
<code>spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()</code>
and
<code>spark.read.schema(schema).json(file).select("_corrupt_record").show()</code>.
Instead, you can cache or save the parsed results and then send the
same query. For example, <code>val df = spark.read.schema(schema).json(file).cache()</code> and then <code>df.filter($"_corrupt_record".isNotNull).count()</code>.</p>
</blockquote>