Finding all null and NaN columns in a PySpark DataFrame

Posted 2024-10-05 18:04:33


I'm trying to profile the nulls, blanks, and NaNs in my data and list the columns in range-based buckets:

NullValuePercentageRange  ColumnList
10-80%                    Col1,Col3
80-99%                    Col4
100%                      Col5

I can make this work when all the columns are of the same type, but it fails when there are mixed data types. Thanks to several other posts, I was able to get partway there:

import pyspark.sql.functions as F
from pyspark.sql.functions import col, collect_set, concat_ws, count, isnan, when

def dataprofile(df):
    d = df.count()
    # Per-column count of rows that are NaN or null; this is the line
    # that breaks on date columns, since isnan requires float/double
    df1 = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
    df1 = df1.withColumn('TotalCount', F.lit(d))
    # Unpivot the single-row count frame into (ColumnNames, NullCounts) rows
    df1 = to_explode(df1, ['TotalCount'])
    dprof_df = df1.where(df1['NullCounts'] > 0)
    dprof_df = dprof_df.where(dprof_df['TotalCount'] > 0)
    dprof_df = dprof_df.withColumn('NullPercentage', (col('NullCounts') * 100 / col('TotalCount')).cast("int"))
    dprof_dff = dprof_df.where(dprof_df['NullPercentage'] > 10)
    dprof_dff.createOrReplaceTempView("profildata")
    # Bucket each column into a null-percentage range
    dprof_df_1 = spark.sql("select ColumnNames, case when NullPercentage >= 10 and NullPercentage < 80 then '10-80%' when NullPercentage >= 80 and NullPercentage < 100 then '80-99%' else '100%' end as NullValuePercentageRange from profildata")
    dprof_df_2 = dprof_df_1.select("NullValuePercentageRange", "ColumnNames").groupBy("NullValuePercentageRange").agg(concat_ws(",", collect_set("ColumnNames")).alias('ColumnList'))
    return dprof_df_2


from pyspark.sql.functions import array, col, explode, lit, struct

def to_explode(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL arrays must be homogeneous, so every unpivoted column
    # has to share a single type
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("ColumnNames"), col(c).alias("NullCounts")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.ColumnNames", "kvs.NullCounts"])

This code fails with the following error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'isnan(targettable.`startdate`)' due to data type mismatch: argument 1 requires (double or float) type, however, 'targettable.`startdate`' is of date type.;;\n'Aggregate [count(CASE WHEN (isnan(startdate#74) || isnull(startdate#74)) THEN startdate END) AS startdate#175, count(CASE WHEN (isnan(period2#79) || isnull(period2#79)) THEN period2 END) AS period2#177L, count(CASE WHEN (isnan(period3#78) || isnull(period3#78)) THEN period3 END) AS period3#179L, count(CASE WHEN (isnan(cast(dmdgroup#72 as double)) || isnull(dmdgroup#72)) THEN dmdgroup END) AS dmdgroup#181L, count(CASE WHEN (isnan(cast(model#77 as double)) || isnull(model#77)) THEN model END) AS model#183L, count(CASE WHEN (isnan(period1#80) || isnull(period1#80)) THEN period1 END) AS period1#185L, count(CASE WHEN (isnan(cast(keycol#70 as double)) || isnull(keycol#70)) THEN keycol END) AS keycol#187L, count(CASE WHEN (isnan(cast(loc#73 as double)) || isnull(loc#73)) THEN loc END) AS loc#189L, count(CASE WHEN (isnan(cast(type#75 as double)) || isnull(type#75)) THEN type END) AS type#191L, count(CASE WHEN (isnan(cast(fcstid#76 as double)) || isnull(fcstid#76)) THEN fcstid END) AS fcstid#193L, count(CASE WHEN (isnan(cast(dmdunit#71 as double)) || isnull(dmdunit#71)) THEN dmdunit END) AS dmdunit#195L, count(CASE WHEN (isnan(cast(all_hash_val#146 as double)) || isnull(all_hash_val#146)) THEN all_hash_val END) AS all_hash_val#197L]\n+- RepartitionByExpression [dmdunit#71, dmdgroup#72, loc#73, startdate#74, type#75, fcstid#76], 200\n   +- Project [startdate#74, period2#79, period3#78, dmdgroup#72, model#77, period1#80, keycol#70, loc#73, type#75, fcstid#76, dmdunit#71, sha2(cast(concat_ws(||, cast(startdate#74 as string), cast(period2#79 as string), cast(period3#78 as string), dmdgroup#72, model#77, cast(period1#80 as string), cast(keycol#70 as string), loc#73, cast(type#75 as string), fcstid#76, dmdunit#71) as binary), 256) AS all_hash_val#146]\n      +- Project [startdate#74, period2#79, period3#78, dmdgroup#72, model#77, period1#80, keycol#70, loc#73, type#75, fcstid#76, dmdunit#71]\n         +- SubqueryAlias targettable\n            +- Relation[KEYCOL#70,DMDUNIT#71,DMDGROUP#72,LOC#73,STARTDATE#74,TYPE#75,FCSTID#76,MODEL#77,PERIOD3#78,PERIOD2#79,PERIOD1#80] parquet\n"
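From the error I gather that isnan only accepts float/double columns, so presumably the "missing" test has to vary by dtype. Here is a minimal sketch of what I imagine the condition should look like (null_condition is just a name I made up, untested beyond the idea):

from pyspark.sql.functions import col, isnan

# Hypothetical helper: choose a per-dtype "missing" test, since isnan
# is only defined for float/double columns
def null_condition(c, dtype):
    if dtype in ('float', 'double'):
        return isnan(col(c)) | col(c).isNull()
    if dtype == 'string':
        # also treat empty strings as missing
        return col(c).isNull() | (col(c) == '')
    return col(c).isNull()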

Even the to_explode function only works when all the columns being unpivoted share the same data type.
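In dataprofile itself the assert passes, because the counts are all bigint by the time to_explode runs. But if the unpivot ever has to handle mixed types directly, I assume the array could be made homogeneous by casting every struct value to string, something like this (a hypothetical variant of the kvs expression; ColumnValues is a name I made up):

from pyspark.sql.functions import array, col, explode, lit, struct

# Cast each value to string inside the struct so the array stays
# homogeneous regardless of the source column types
kvs = explode(array([
    struct(lit(c).alias("ColumnNames"), col(c).cast("string").alias("ColumnValues")) for c in cols
])).alias("kvs")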

Here is a reproducible example:

import pyspark.sql.types as st

data = [
    ('1990-05-03', 29, 'Test', None),
    ('1994-09-23', 25, 'Testing', None),
    ('2004-09-23', None, None, None),
    ('2004-09-23', None, None, None)
]

user_schema = st.StructType([
    st.StructField('dob', st.StringType(), True),
    st.StructField('age', st.IntegerType(), True),
    st.StructField('is_fan', st.StringType(), True),
    st.StructField('allnull', st.StringType(), True)
])

user_df = spark.createDataFrame(data, user_schema)
user_df.show()


df = dataprofile(user_df)
df.show()

+------------------------+----------+
|NullValuePercentageRange|ColumnList|
+------------------------+----------+
|                  10-80%|is_fan,age|
|                    100%|   allnull|
+------------------------+----------+

But if I cast dob to a date column:

spark_df1 = user_df.withColumn("dob", user_df['dob'].cast(st.DateType()))
df_1 = dataprofile(spark_df1)


Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 3, in dataprofile
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1202, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'isnan(`dob`)' due to data type mismatch: argument 1 requires (double or float) type, however, '`dob`' is of date type.;;\n'Aggregate [count(CASE WHEN (isnan(dob#112) || isnull(dob#112)) THEN dob END) AS dob#141, count(CASE WHEN (isnan(cast(age#1 as double)) || isnull(age#1)) THEN age END) AS age#143L, count(CASE WHEN (isnan(cast(is_fan#2 as double)) || isnull(is_fan#2)) THEN is_fan END) AS is_fan#145L, count(CASE WHEN (isnan(cast(allnull#3 as double)) || isnull(allnull#3)) THEN allnull END) AS allnull#147L]\n+- Project [cast(dob#0 as date) AS dob#112, age#1, is_fan#2, allnull#3]\n   +- LogicalRDD [dob#0, age#1, is_fan#2, allnull#3], false\n"

Is there a way to roll this up into the summarized version based on each column's null percentage when the columns have different data types? I don't want to cast everything to string; I'd rather handle each data type properly.
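For what it's worth, I imagine the fix boils down to replacing the first select in dataprofile with a dtype-aware version driven by df.dtypes, roughly like this (reusing the hypothetical null_condition sketch from above, untested):

from pyspark.sql.functions import count, when

# Hypothetical replacement for the first select in dataprofile():
# pick the right missing-value test per column based on its dtype
df1 = df.select([
    count(when(null_condition(c, t), c)).alias(c)
    for c, t in df.dtypes
])

Is that the right direction, or is there a more idiomatic built-in way to handle this?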

