我试图分析数据中的null、空格nan,并在基于范围的分类中列出列
NullValuePercentageRange ColumnList
10-80% Col1,Col3
80-99% Col4
100% Col5
如果所有列都是相同的类型,我可以实现这一点,但是当存在混合数据类型时,它会失败。多亏了其他几篇文章,我才能够得到其中的一部分
def dataprofile(df):
d=df.count()
df1=df.select([count(when(isnan(c) | col(c).isNull() , c)).alias(c) for c in df.columns])
df1 = df1.withColumn('TotalCount',F.lit(d))
df1 = to_explode(df1, ['TotalCount'])
dprof_df = df1.where(df1['NullCounts'] > 0 )
dprof_df = dprof_df.where(dprof_df['TotalCount'] > 0)
dprof_df = dprof_df.withColumn('NullPercentage',(col('NullCounts')*100/col('TotalCount')).cast("int"))
dprof_dff = dprof_df.where(dprof_df['NullPercentage'] > 10)
dprof_dff.createOrReplaceTempView("profildata")
dprof_df_1 = spark.sql("select ColumnNames,case when NullPercentage >=10 and NullPercentage <80 THEN '10-80%' WHEN NullPercentage >=80 and NullPercentage <100 THEN '80-99%' ELSE '100%' end as NullValuePercentageRange from profildata")
dprof_df_2 = dprof_df_1.select("NullValuePercentageRange", "ColumnNames").groupBy("NullValuePercentageRange").agg(concat_ws(",", collect_set("ColumnNames")).alias('ColumnList'))
return dprof_df_2
def to_explode(df, by):
# Filter dtypes and split into column names and type description
cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
# Spark SQL supports only homogeneous columns
assert len(set(dtypes)) == 1, "All columns have to be of the same type"
# Create and explode an array of (column_name, column_value) structs
kvs = explode(array([
struct(lit(c).alias("ColumnNames"), col(c).alias("NullCounts")) for c in cols
])).alias("kvs")
return df.select(by + [kvs]).select(by + ["kvs.ColumnNames", "kvs.NullCounts"])
此代码失败,并出现此错误
pyspark.sql.utils.AnalysisException: "cannot resolve 'isnan(targettable.`startdate`)' due to data type mismatch: argument 1 requires (double or float) type, however, 'targettable.`startdate`' is of date type.;;\n'Aggregate [count(CASE WHEN (isnan(startdate#74) || isnull(startdate#74)) THEN startdate END) AS startdate#175, count(CASE WHEN (isnan(period2#79) || isnull(period2#79)) THEN period2 END) AS period2#177L, count(CASE WHEN (isnan(period3#78) || isnull(period3#78)) THEN period3 END) AS period3#179L, count(CASE WHEN (isnan(cast(dmdgroup#72 as double)) || isnull(dmdgroup#72)) THEN dmdgroup END) AS dmdgroup#181L, count(CASE WHEN (isnan(cast(model#77 as double)) || isnull(model#77)) THEN model END) AS model#183L, count(CASE WHEN (isnan(period1#80) || isnull(period1#80)) THEN period1 END) AS period1#185L, count(CASE WHEN (isnan(cast(keycol#70 as double)) || isnull(keycol#70)) THEN keycol END) AS keycol#187L, count(CASE WHEN (isnan(cast(loc#73 as double)) || isnull(loc#73)) THEN loc END) AS loc#189L, count(CASE WHEN (isnan(cast(type#75 as double)) || isnull(type#75)) THEN type END) AS type#191L, count(CASE WHEN (isnan(cast(fcstid#76 as double)) || isnull(fcstid#76)) THEN fcstid END) AS fcstid#193L, count(CASE WHEN (isnan(cast(dmdunit#71 as double)) || isnull(dmdunit#71)) THEN dmdunit END) AS dmdunit#195L, count(CASE WHEN (isnan(cast(all_hash_val#146 as double)) || isnull(all_hash_val#146)) THEN all_hash_val END) AS all_hash_val#197L]\n+- RepartitionByExpression [dmdunit#71, dmdgroup#72, loc#73, startdate#74, type#75, fcstid#76], 200\n +- Project [startdate#74, period2#79, period3#78, dmdgroup#72, model#77, period1#80, keycol#70, loc#73, type#75, fcstid#76, dmdunit#71, sha2(cast(concat_ws(||, cast(startdate#74 as string), cast(period2#79 as string), cast(period3#78 as string), dmdgroup#72, model#77, cast(period1#80 as string), cast(keycol#70 as string), loc#73, cast(type#75 as string), fcstid#76, dmdunit#71) as binary), 256) AS all_hash_val#146]\n +- Project [startdate#74, period2#79, period3#78, dmdgroup#72, model#77, period1#80, keycol#70, loc#73, type#75, fcstid#76, dmdunit#71]\n +- SubqueryAlias targettable\n +- Relation[KEYCOL#70,DMDUNIT#71,DMDGROUP#72,LOC#73,STARTDATE#74,TYPE#75,FCSTID#76,MODEL#77,PERIOD3#78,PERIOD2#79,PERIOD1#80] parquet\n"
甚至explode函数也适用于相同的数据类型记录
示例-
import pyspark.sql.types as st
data = [
('1990-05-03', 29, 'Test',None),
('1994-09-23', 25, 'Testing',None),
('2004-09-23',None , None,None),
('2004-09-23', None , None,None)
]
user_schema = st.StructType([
st.StructField('dob', st.StringType(), True),
st.StructField('age', st.IntegerType(), True),
st.StructField('is_fan', st.StringType(), True),
st.StructField('allnull', st.StringType(), True)
])
user_df = spark.createDataFrame(data, user_schema)
user_df.show()
df = dataprofile(user_df)
df.show()
+------------------------+----------+
|NullValuePercentageRange|ColumnList|
+------------------------+----------+
| 10-80%|is_fan,age|
| 100%| allnull|
+------------------------+----------+
如果我将dob转换为数据列-
spark_df1=user_df.withColumn("dob",user_df['dob'].cast(DateType()))
df_1=dataprofile(spark_df1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 3, in dataprofile
File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1202, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'isnan(`dob`)' due to data type mismatch: argument 1 requires (double or float) type, however, '`dob`' is of date type.;;\n'Aggregate [count(CASE WHEN (isnan(dob#112) || isnull(dob#112)) THEN dob END) AS dob#141, count(CASE WHEN (isnan(cast(age#1 as double)) || isnull(age#1)) THEN age END) AS age#143L, count(CASE WHEN (isnan(cast(is_fan#2 as double)) || isnull(is_fan#2)) THEN is_fan END) AS is_fan#145L, count(CASE WHEN (isnan(cast(allnull#3 as double)) || isnull(allnull#3)) THEN allnull END) AS allnull#147L]\n+- Project [cast(dob#0 as date) AS dob#112, age#1, is_fan#2, allnull#3]\n +- LogicalRDD [dob#0, age#1, is_fan#2, allnull#3], false\n"
请告知是否有方法根据不同数据类型列数据中null的百分比将其转换为汇总版本?我不想把所有东西都转换成字符串,而是想处理数据类型
目前没有回答
相关问题 更多 >
编程相关推荐