使用spark中的groupby other列填充NaN

2024-09-28 03:24:32 发布

您现在位置:Python中文网/ 问答频道 /正文

数据

 Col1 Col2    result
0  a    x      123.0 
1  a    y     NaN    
2  a    x      453.0 
3  a    y      675.0 
4  b    z      786.0 
5  b    z      332.0 

我想用675.0填充NaN,首先按col1分组,然后按col2分组,然后填充NaN值

在熊猫中

df['result'] = df['result'].fillna(df.groupby(['col1', 'col2', ])['result'].bfill())


df['result'] = df['result'].fillna(df.groupby(['col1', 'col2', ])['result'].ffill())

如何在pyspark中实现它


Tags: 数据dfresultnanpysparkcol2col1groupby
2条回答

您可以使用nanvlNaN替换为结果的lead值(相当于ffillbfill将是lag):

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'id', F.monotonically_increasing_id()
).withColumn(
    'result', 
    F.nanvl(
        'result',
        F.coalesce(
            F.lag('result').over(Window.partitionBy('Col1', 'Col2').orderBy('id')),
            F.lead('result').over(Window.partitionBy('Col1', 'Col2').orderBy('id'))
        )
    )
).orderBy('id')

df2.show()
+ -+  +  +   +
| id|Col1|Col2|result|
+ -+  +  +   +
|  0|   a|   x| 123.0|
|  1|   a|   y| 675.0|
|  2|   a|   x| 453.0|
|  3|   a|   y| 675.0|
|  4|   b|   x| 786.0|
|  5|   b|   y| 332.0|
+ -+  +  +   +

这可以通过pandas udfs完成。然后你可以直接使用你想要的功能

[IN]
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd, numpy as np

s = pd.DataFrame({"col1": ["a","a","a","a","b","b"],
              "col2": ["x", "y", "x", "y", "z", "z"],
              "result": [123, np.nan, 453, 675, 786, 332]})

spark_df = spark.createDataFrame(s)

grouped_spark_df = spark_df.groupBy("col1", "col2")

@pandas_udf("col1 string, col2 string, result float", PandasUDFType.GROUPED_MAP)
def fillnaspark(df):
    df['result'] = df['result'].bfill()
    df['result'] = df['result'].ffill()
    return df

grouped_spark_df.apply(fillnaspark).show()

[OUT]
+  +  +   +
|col1|col2|result|
+  +  +   +
|   a|   x| 123.0|
|   a|   x| 453.0|
|   b|   z| 786.0|
|   b|   z| 332.0|
|   a|   y| 675.0|
|   a|   y| 675.0|
+  +  +   +

相关问题 更多 >

    热门问题