I'm having a hard time with my program. I'm trying to apply a custom function to a DataFrame and I'm getting the error message in my title. Here is my code:
import pandas as pd
import datetime as dt
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
df = pd.DataFrame({
    'ID': [1, 2, 2],
    'dt': [pd.Timestamp.now(), pd.Timestamp.now(),
           pd.Timestamp.now()]})
df.head()
def FlagUsers(df, ids, tm, gap):
    df = df.sort_values([ids, tm])
    df[ids] = df[ids].astype(str)
    df['timediff'] = df.groupby(ids)[tm].diff()
    df['prevtime'] = df.groupby(ids)[tm].shift()
    df['prevuser'] = df[ids].shift()
    df['prevuser'].fillna(0, inplace=True)
    df['timediff'] = df['timediff'] / pd.Timedelta('1 minute')
    df['timediff'].fillna(99, inplace=True)
    df['flagnew'] = np.where((df.timediff < gap) & (df['prevuser'] == df[ids]), 'existing', 'new')
    df.loc[df.flagnew == 'new', 'sessnum'] = df.groupby([ids, 'flagnew']).cumcount() + 1
    df['sessnum'] = df['sessnum'].fillna(method='ffill')
    df['session_key'] = df[ids].astype(str) + "_" + df['sessnum'].astype(str)
    df.drop(['prevtime', 'prevuser'], axis=1, inplace=True)
    arr = df['session_key'].values
    return arr
# Python Function works fine:
FlagUsers(df,'ID','dt',5)
s_df = spark.createDataFrame(df)
s_df.show()
spark.udf.register("FlagUsers", FlagUsers)
s_df = s_df.withColumn('session_key',FlagUsers(s_df,'ID','dt',5))
My function works fine in plain Python, but when I try to run it in Spark it doesn't work. I'm really sorry if this is a silly question! Thanks all; best wishes.
A PySpark UDF is not the same as a native Python function; it has specific requirements (it operates on columns row by row, not on a whole DataFrame, and must declare a return type). Please experiment with a pandas UDF instead; it is several times faster than a plain Python UDF: https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html
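With a grouped-map pandas approach, each user's rows arrive as an ordinary pandas DataFrame, so the session logic ports over almost unchanged. A minimal sketch (the function name `flag_sessions`, the simplified per-group session numbering, and the schema string are my assumptions, not from the original post):

```python
import pandas as pd

def flag_sessions(pdf: pd.DataFrame, gap: float = 5.0) -> pd.DataFrame:
    # Runs on one user's rows at a time: a new session starts on the first
    # row or whenever the gap since the previous event is >= `gap` minutes.
    pdf = pdf.sort_values('dt')
    timediff = pdf['dt'].diff() / pd.Timedelta('1 minute')
    new_session = timediff.isna() | (timediff >= gap)
    pdf['sessnum'] = new_session.cumsum()          # 1, 1, 2, ... per user
    pdf['session_key'] = pdf['ID'].astype(str) + '_' + pdf['sessnum'].astype(str)
    return pdf

# In Spark, the same function would run once per user via applyInPandas
# (requires an active SparkSession; shown for illustration only):
# result = s_df.groupBy('ID').applyInPandas(
#     flag_sessions,
#     schema='ID long, dt timestamp, sessnum long, session_key string')
```

Unlike a row-at-a-time UDF, `applyInPandas` hands the whole group to pandas at once, which is why window-style operations like `diff` and `shift` work here.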