Unable to retain corrupt rows in PySpark when using PERMISSIVE mode

Posted 2024-10-04 01:37:22


I was given a CSV file on which I need to perform some cleanup tasks with PySpark. Before cleaning, I run a few schema-validation checks. Here is my code:

from pyspark.sql.types import StructType, StructField, StringType, DateType

# schema for the input data; corrupt records are captured in "malformed_rows"
def get_input_schema():
    return StructType([StructField("Group ID", StringType(), True),
                       StructField("Start Date", DateType(), True),
                       StructField("Start Time", StringType(), True),
                       ...
                       StructField("malformed_rows", StringType(), True)
                       ])

from pyspark.sql import SparkSession

# basic cleanup logic
def main(argv):
    spark = SparkSession.builder.appName('cleaner_job').getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # read in PERMISSIVE mode so malformed rows land in the "malformed_rows" column
    df = spark.read.option("mode", "PERMISSIVE") \
        .option("dateFormat", "yyyy-MM-dd") \
        .option("columnNameOfCorruptRecord", "malformed_rows") \
        .schema(get_input_schema()) \
        .csv(input_path, header=True)

    # this is where the error is happening
    df_bad = df.filter(df["malformed_rows"].isNotNull())
    df_good = df.filter(df["malformed_rows"].isNull())

    df_good.write.csv(output_path, header=True)
    df_bad.write.csv(output_malformed_path, header=True)

I read the CSV in PERMISSIVE mode and try to split the input DataFrame into two DataFrames (df_good and df_bad) based on whether the malformed_rows column is null. If I skip the split and write the DataFrame straight to CSV, the malformed_rows column shows up in the output. But the code above throws the following error:

ERROR Utils: Aborting task
java.lang.IllegalArgumentException: malformed_rows does not exist. Available: Group ID, Start Date, Start Time, ...,
    at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:306)
    at scala.collection.MapLike.getOrElse(MapLike.scala:131)
    at scala.collection.MapLike.getOrElse$(MapLike.scala:129)
    at scala.collection.AbstractMap.getOrElse(Map.scala:63)
    at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:305)
    at org.apache.spark.sql.catalyst.csv.CSVFilters.$anonfun$predicates$4(CSVFilters.scala:65)
    at org.apache.spark.sql.catalyst.csv.CSVFilters.$anonfun$predicates$4$adapted(CSVFilters.scala:65)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.sql.catalyst.csv.CSVFilters.$anonfun$predicates$3(CSVFilters.scala:65)
    at org.apache.spark.sql.catalyst.csv.CSVFilters.$anonfun$predicates$3$adapted(CSVFilters.scala:54)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.csv.CSVFilters.<init>(CSVFilters.scala:54)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser.<init>(UnivocityParser.scala:101)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$1(CSVFileFormat.scala:138)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:147)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:272)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
ERROR FileFormatWriter: Job job_20210302150943_0000 aborted.
ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

I went through the Spark documentation, which says that to retain the corrupt-record column you need to define it in the schema, and I am doing that. What puzzles me is why it only fails when I try to filter on that column. Any help resolving this is much appreciated.


2 Answers

malformed_rows is the internal corrupt-record column, which is named _corrupt_record by default and which you renamed with:

.option("columnNameOfCorruptRecord", "malformed_rows")

However, since Spark 2.3 you cannot query the data by referencing only this column; as the docs explain, you need to cache the DataFrame first:

Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default). For example, spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count() and spark.read.schema(schema).json(file).select("_corrupt_record").show(). Instead, you can cache or save the parsed results and then send the same query. For example, val df = spark.read.schema(schema).json(file).cache() and then df.filter($"_corrupt_record".isNotNull).count().
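Applied to the question's code, a minimal sketch of this cache-then-filter approach (untested, reusing the question's input_path, schema and reader options) could look like:

# cache the parsed result first, as the docs suggest, then filter on the corrupt-record column
df = spark.read.option("mode", "PERMISSIVE") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("columnNameOfCorruptRecord", "malformed_rows") \
    .schema(get_input_schema()) \
    .csv(input_path, header=True) \
    .cache()  # the filters now run against the cached parse instead of the raw CSV scan

df_bad = df.filter(df["malformed_rows"].isNotNull())
df_good = df.filter(df["malformed_rows"].isNull())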

Alternatively, if you wrap the corrupt column in another expression when checking it before filtering, the query works.

You can do it like this:

df.createOrReplaceTempView("df1")
spark.sql("select *, concat('error', malformed_rows) from df1 "
          "where concat('error', malformed_rows) is not null").show(10, False)
