Computing the mean number of days in PySpark from pandas code

Posted 2024-09-29 21:27:05


# Pandas code

temp = df_merge[['subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate']].drop_duplicates()
df_merge['mean_cancelled_sub_duration'] = (temp['cancelleddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
df_merge['mean_sub_duration'] = (temp['termenddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365

How can I implement the same logic as the pandas code in PySpark? I tried the following in PySpark, but it didn't help: rows get dropped and the result is calculated incorrectly:

The columns with "date" in their names are of date type.

# PySpark conversion that fails

    temp = df_merge.select('subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate').dropDuplicates()
    temp = temp.withColumn("cancelled_sub_duration", datediff(temp.cancelleddate,temp.subscriptionstartdate)).withColumn("sub_duration", datediff(temp.termenddate,temp.subscriptionstartdate))
    temp = temp.na.drop(subset=['cancelled_sub_duration','sub_duration'])
    spec3 = Window.partitionBy("subscription_id")
    temp = temp.withColumn('mean_cancelled_sub_duration',(mean("cancelled_sub_duration").over(spec3))/365).withColumn('mean_sub_duration',(mean("sub_duration").over(spec3))/365)
    temp = temp.select(col('subscription_id').alias('subsid'), col('mean_cancelled_sub_duration'), col('mean_sub_duration'))
    df_merge = df_merge.join(broadcast(temp), df_merge.subscription_id==temp.subsid,"left").drop(col('subsid'))
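For reference, a minimal sketch of how the single global mean from the pandas code could be expressed in PySpark without the per-subscription window (df_merge and the column names are taken from the question; everything else is illustrative, not a confirmed fix):

from pyspark.sql.functions import datediff, mean, lit

# Deduplicate the relevant columns, as in the pandas version
temp = df_merge.select('subscription_id', 'cancelleddate',
                       'subscriptionstartdate', 'termenddate').dropDuplicates()

# Durations in days
temp = temp.withColumn('cancelled_sub_duration',
                       datediff('cancelleddate', 'subscriptionstartdate')) \
           .withColumn('sub_duration',
                       datediff('termenddate', 'subscriptionstartdate'))

# One global mean over all rows (mean ignores nulls), divided by 365
agg_row = temp.agg(
    (mean('cancelled_sub_duration') / 365).alias('mean_cancelled_sub_duration'),
    (mean('sub_duration') / 365).alias('mean_sub_duration')
).collect()[0]

# Attach the two scalar values to every row, mirroring the pandas assignment
df_merge = df_merge \
    .withColumn('mean_cancelled_sub_duration', lit(agg_row['mean_cancelled_sub_duration'])) \
    .withColumn('mean_sub_duration', lit(agg_row['mean_sub_duration']))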

2 Answers

Hello, please post the expected output of the pandas code and what you are getting from the PySpark code, so that we can assess the differences between the datasets. Without that, it is hard to see specifically what is working and what isn't.

In the meantime, just from looking at the pandas code and trying to build something similar in PySpark, this is what I came up with:

temp = temp \
    .withColumn('mean_cancelled_sub_duration', avg(datediff('cancelleddate', 'subscriptionstartdate')).over(spec3) / lit(365)) \
    .withColumn('mean_sub_duration', avg(datediff('termenddate', 'subscriptionstartdate')).over(spec3) / lit(365))
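For this snippet to run, the referenced names need to be in scope; a small sketch, assuming spec3 is the same per-subscription window used in the question's attempt:

from pyspark.sql import Window
from pyspark.sql.functions import avg, datediff, lit

# Window partitioned by subscription, matching spec3 from the original attempt
spec3 = Window.partitionBy('subscription_id')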

First, I created a function to smoothly convert a pandas dataframe into a Spark dataframe:

from pyspark.sql.types import (StructType, StructField, StringType, DateType,
                               LongType, IntegerType, FloatType)

# Map a pandas dtype to the corresponding Spark SQL type
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'uint8': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

# Build a StructField for one column, falling back to StringType
def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Convert a pandas DataFrame to a Spark DataFrame with an explicit schema
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)

Then I converted the Spark dataframe to a pandas dataframe using the toPandas() method:

temp = df_merge.select('subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate').dropDuplicates()
temp = temp.toPandas()

import pandas as pd

temp['cancelleddate'] = pd.to_datetime(temp['cancelleddate'])
temp['subscriptionstartdate'] = pd.to_datetime(temp['subscriptionstartdate'])
temp['termenddate'] = pd.to_datetime(temp['termenddate'])

df_merge = df_merge.toPandas()
df_merge['mean_cancelled_sub_duration'] = (temp['cancelleddate']-temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
df_merge['mean_sub_duration'] = (temp['termenddate']-temp['subscriptionstartdate']).dt.days.dropna().mean() / 365

df_merge = pandas_to_spark(df_merge)

I am using Spark version 2.3.0, so I had to make sure that the date fields in the pandas dataframe being converted back to Spark are timestamps, otherwise an error is thrown.
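In other words, before calling pandas_to_spark the date columns have to hold pandas Timestamps; a sketch of that step, using the column names from the question (which fields actually need converting depends on your schema):

df_merge['cancelleddate'] = pd.to_datetime(df_merge['cancelleddate'])
df_merge['subscriptionstartdate'] = pd.to_datetime(df_merge['subscriptionstartdate'])
df_merge['termenddate'] = pd.to_datetime(df_merge['termenddate'])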

That finally solved my problem and I got the mean value (since this is neither an aggregated mean nor a row-wise mean, you could call it a column-wide mean).
