<p>This can be done with a <a href="https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html" rel="nofollow noreferrer">pandas udf</a>. Inside it you can then apply whatever pandas function you need directly:</p>
<pre><code>[IN]
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd, numpy as np

s = pd.DataFrame({"col1": ["a", "a", "a", "a", "b", "b"],
                  "col2": ["x", "y", "x", "y", "z", "z"],
                  "result": [123, np.nan, 453, 675, 786, 332]})
spark_df = spark.createDataFrame(s)
grouped_spark_df = spark_df.groupBy("col1", "col2")

# A grouped-map pandas UDF receives each group as a pandas DataFrame
# and must return a DataFrame matching the declared schema
@pandas_udf("col1 string, col2 string, result float", PandasUDFType.GROUPED_MAP)
def fillnaspark(df):
    # backward fill, then forward fill, within the group
    df['result'] = df['result'].bfill()
    df['result'] = df['result'].ffill()
    return df

grouped_spark_df.apply(fillnaspark).show()
[OUT]
+----+----+------+
|col1|col2|result|
+----+----+------+
|   a|   x| 123.0|
|   a|   x| 453.0|
|   b|   z| 786.0|
|   b|   z| 332.0|
|   a|   y| 675.0|
|   a|   y| 675.0|
+----+----+------+
</code></pre>
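<p>The per-group fill logic is plain pandas, so you can verify it locally before wrapping it in the UDF. A minimal sketch (no Spark session required; <code>groupby(...).transform</code> is used here instead of a grouped-map UDF, which is my choice for the local check, not part of the answer above):</p>
<pre><code>import pandas as pd, numpy as np

s = pd.DataFrame({"col1": ["a", "a", "a", "a", "b", "b"],
                  "col2": ["x", "y", "x", "y", "z", "z"],
                  "result": [123, np.nan, 453, 675, 786, 332]})

# Backward fill then forward fill, restricted to each (col1, col2) group,
# mirroring what fillnaspark does to every group on the Spark side.
s["result"] = (s.groupby(["col1", "col2"])["result"]
                 .transform(lambda x: x.bfill().ffill()))

print(s)
</code></pre>
<p>The NaN in row 1 (group <code>("a", "y")</code>) is backward-filled from 675 within that group only; values from other groups are never used.</p>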