PySpark UDF returns AttributeError: 'DataFrame' object has no attribute 'sort_values'

Posted 2024-09-23 09:06:26

I'm having a hard time with my program: I'm trying to apply a UDF to a DataFrame and get the error message in the title. Here is my code:

import pandas as pd
import datetime as dt
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = pd.DataFrame({
              'ID':[1,2,2],
              'dt':[pd.Timestamp.now(),pd.Timestamp.now(),
                  pd.Timestamp.now()]})
df.head()

def FlagUsers(df,ids,tm,gap):
  df=df.sort_values([ids,tm])
  df[ids]=df[ids].astype(str)
  df['timediff'] = df.groupby(ids)[tm].diff()
  df['prevtime']= df.groupby (ids)[tm].shift()
  df['prevuser']= df[ids].shift()
  df['prevuser'].fillna(0,inplace=True)
  df['timediff']=df.timediff/ pd.Timedelta('1 minute')
  df['timediff'].fillna(99,inplace=True)
  df['flagnew']=np.where((df.timediff<gap) & (df['prevuser']==df[ids]),'existing','new' )
  df.loc[df.flagnew == 'new','sessnum'] = df.groupby([ids,'flagnew']).cumcount()+1
  df['sessnum']=df['sessnum'].fillna(method='ffill')
  df['session_key']= df[ids].astype(str)+"_"+df['sessnum'].astype(str)
  df.drop(['prevtime', 'prevuser'], axis =1, inplace= True)
  arr=df['session_key'].values
  return arr

# Python Function works fine:
FlagUsers(df,'ID','dt',5)


s_df = spark.createDataFrame(df)
s_df.show()

spark.udf.register("FlagUsers", FlagUsers)
s_df = s_df.withColumn('session_key',FlagUsers(s_df,'ID','dt',5))

My function works fine in plain Python, but it fails when I try to run it in Spark. Why? Apologies if this is a silly question. Many thanks and best wishes.
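
One likely reading of the error: on the last line, `FlagUsers(s_df,'ID','dt',5)` calls the plain Python function directly and hands it the Spark DataFrame `s_df`, which has no `sort_values` method. Registering the function with `spark.udf.register` does not change that call, and a column UDF would receive individual values rather than a whole DataFrame in any case. Below is a minimal sketch of one possible workaround, assuming PySpark 3.0+ (where `GroupedData.applyInPandas` is available) and the `ID` and `dt` columns from the code above. The names `flag_sessions` and `add_session_key` are hypothetical, and the session numbering is simplified to an integer counter per ID, so it does not reproduce the original function's output exactly.

```python
import pandas as pd


def flag_sessions(pdf: pd.DataFrame, gap_minutes: float = 5.0) -> pd.DataFrame:
    """Label sessions for the rows of ONE ID (a plain pandas DataFrame)."""
    pdf = pdf.sort_values("dt").copy()
    # Minutes since the previous event of the same ID; NaN on the first row.
    minutes = pdf["dt"].diff() / pd.Timedelta(minutes=1)
    # A row opens a new session if it is the first row or the gap is too long.
    is_new = minutes.isna() | (minutes >= gap_minutes)
    # Running count of session starts gives a per-ID session number (1, 2, ...).
    sessnum = is_new.cumsum()
    pdf["session_key"] = pdf["ID"].astype(str) + "_" + sessnum.astype(str)
    return pdf[["ID", "dt", "session_key"]]


def add_session_key(s_df, gap_minutes: float = 5.0):
    """Hypothetical wrapper: run flag_sessions once per ID on a Spark DataFrame."""

    # Spark inspects the argument count of the function it is given, so pass
    # a one-argument wrapper rather than flag_sessions itself.
    def per_id(pdf: pd.DataFrame) -> pd.DataFrame:
        return flag_sessions(pdf, gap_minutes)

    # Assumed schema: ID is a long and dt a timestamp, as in the question's data.
    return s_df.groupBy("ID").applyInPandas(
        per_id, schema="ID long, dt timestamp, session_key string"
    )
```

With this sketch, the last two lines of the code above would become something like `s_df = add_session_key(s_df, 5)`. Spark then passes each ID's rows to the function as a pandas DataFrame, so `sort_values` and `diff` are available inside it.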
