在PySpark中使用列名中带有特殊字符的拼花地板文件

2024-09-27 04:24:15 发布

您现在位置:Python中文网/ 问答频道 /正文

主要目标
显示或选择从拼花文件读取的Spark数据框中的列。 论坛中提到的所有解决方案在我们的案例中都不成功

问题
当使用SPARK读取和查询拼花地板文件时,会出现此问题,这是由于列名中存在特殊字符 ,;{}()\n\t=。这个问题是用一个简单的拼花文件重现的,它有两列五行。列的名称为:

  • SpeedReference_Final_01 (RifVel_G0)
  • SpeedReference_Final_02 (RifVel_G1)

出现的错误是:
Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

我们正在Python语言中使用PySpark,实验解决方案可分为以下几类:

  1. 基于列重命名的解决方案-[spark.read.parquet+获得的数据帧的重命名]
    已经试验了几种解决方案:

    • withColumnRenamed(脚本中的问题N.2)
    • toDF(第3期)
    • alias(第5期)

    他们中没有一个在我们的案件中起作用

  2. 将拼花地板文件读入熊猫数据框,然后从中创建一个新的拼花地板文件-[pd.read.parquet+spark.createDataFrame]
    此解决方案使用的是一个小拼花文件(问题N.0,即脚本中的解决方法):即使创建的spark dataframe具有包含特殊字符的列名,也可以成功查询它不幸的是,对于我们的大型拼花文件(每个拼花600000行x 1000列),这是不可行的,因为创建spark数据框是无止境的

  3. 尝试将拼花地板文件读入Spark数据框,并使用其rdd和重命名的模式创建新的Spark数据框是不可行的,因为从Spark数据框提取rdd会产生相同的错误(问题N.4)

  4. 读取带有前缀模式的拼花地板文件(避免特殊字符)-[spark.read.schema(...).parquet]
    由于原始文件中不存在重命名的列,因此与关键列相关的数据按预期变为null/None,因此解决方案不起作用

下面的python代码总结了提到的解决方案,并用Example parquet file进行了实验

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col

import pandas as pd

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Select file
filename = 'D:/Simple.parquet'

issue_num = 0 # Workaround to issues (Equivalent to no issue)
#issue_num = 1 # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
#issue_num = 2 # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
#issue_num = 3 # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
#issue_num = 4 # Issue 4 - Unable to extract rdd from renamed dataframe 
#issue_num = 5 # Issue 5 - Unable to select column with alias

if issue_num == 0:

    ################################################################################################
    # WORKAROUND - Create Spark data frame from Pandas dataframe
    df_pd = pd.read_parquet(filename)
    DF = spark.createDataFrame(df_pd)
    print('WORKAROUND')
    DF.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # +-----------------------------------+-----------------------------------+

    ################################################################################################
    # Correct management of columns with  invalid characters when using spark.createDataFrame
    # spark.createDataFrame: Create a dataframe with two columns with  invalid characters - OK
    # DFCREATED
    schema = StructType(
        [
            StructField("SpeedReference_Final_01 (RifVel_G0)", FloatType(), nullable=True),
            StructField("SpeedReference_Final_02 (RifVel_G1)", FloatType(), nullable=True)
        ]
    )

    row_in = [(553.523,720.372), (553.523,720.372), (553.523,720.372), (553.523,720.372), (553.523,720.372)]

    rdd=spark.sparkContext.parallelize(row_in)
    DFCREATED = spark.createDataFrame(rdd, schema)
    DFCREATED.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # +-----------------------------------+-----------------------------------+
    DF_SEL_VAR_CREATED = DFCREATED.select(DFCREATED.columns[0]).take(2)
    for el in DF_SEL_VAR_CREATED:
        print(el)
    #Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
    #Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
    
else:
    # spark.read: read file into dataframe - OK
    DF = spark.read.parquet(filename)
    print('ORIGINAL SCHEMA')
    DF.printSchema()
    # root
    #  |-- SpeedReference_Final_01 (RifVel_G0): float (nullable = true)
    #  |-- SpeedReference_Final_02 (RifVel_G1): float (nullable = true)
    
    if issue_num == 1:
        ###############################################################################################    
        # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
        DF.show()
        # DF.select(DF.columns[0]).show()
        # DF_SEL_VAR = DF.select(DF.columns[0]).take(3)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on all 3 previous statements

    elif issue_num == 2:
        ###############################################################################################    
        # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)','RifVelG0').withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)','RifVelG1')
       
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)

        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on all 2 previous statements

    elif issue_num == 3:
        ###############################################################################################    
        # Issue 3 - Unable to show dataframe or select column after rename (using to_DF)
        DFRENAMED = DF.toDF('RifVelG0', 'RifVelG1')
    
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)

        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on all 2 previous statements

    elif issue_num == 4:
        ###############################################################################################    
        # Issue 4 - Unable to extract rdd from renamed dataframe 
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)','RifVelG0').withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)','RifVelG1')
        DFRENAMED_rdd = DFRENAMED.rdd
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

    elif issue_num == 5:
        ###############################################################################################    
        # Issue 5 - Unable to select column with alias
        DF_SEL_VAR = DF.select(col(DF.columns[0]).alias('RifVelG0')).take(3)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

你知道我们怎样才能解决这个问题吗

任何建议都非常感谢


Tags: 文件todataframedfshowissueselectnum

热门问题