#熊猫代码
temp = df_merge[['subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate']].drop_duplicates()
df_merge['mean_cancelled_sub_duration'] = (temp['cancelleddate']-temp['subscriptionstartdate']).dt.days.dropna().mean()/ 365
df_merge['mean_sub_duration'] = (temp['termenddate']-temp['subscriptionstartdate']).dt.days.dropna().mean()/365``
如何在pyspark中实现pandas代码的相同逻辑,虽然我在pyspark中尝试过这样做,但没有帮助,我们删除了行,计算错误:
名称中带有日期的列属于日期类型
#Pyspark转换失败
temp = df_merge.select('subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate').dropDuplicates()
temp = temp.withColumn("cancelled_sub_duration", datediff(temp.cancelleddate,temp.subscriptionstartdate)).withColumn("sub_duration", datediff(temp.termenddate,temp.subscriptionstartdate))
temp = temp.na.drop(subset=['cancelled_sub_duration','sub_duration'])
spec3 = Window.partitionBy("subscription_id")
temp = temp.withColumn('mean_cancelled_sub_duration',(mean("cancelled_sub_duration").over(spec3))/365).withColumn('mean_sub_duration',(mean("sub_duration").over(spec3))/365)
temp = temp.select(col('subscription_id').alias('subsid'), col('mean_cancelled_sub_duration'), col('mean_sub_duration'))
df_merge = df_merge.join(broadcast(temp), df_merge.subscription_id==temp.subsid,"left").drop(col('subsid'))
您好,请发布pandas代码的预期输出以及您从pyspark代码中获得的信息,以便我们可以评估数据集之间的差异。如果没有这一点,就很难具体看到什么不起作用,什么起作用
同时,我只需要看看熊猫的代码,并尝试在pyspark中创建一个类似的代码,这就是我想到的
首先,我刚刚创建了一个函数来平滑地将pandas数据帧转换为spark数据帧
然后,我使用toPandas()方法将spark数据帧转换为pandas数据帧
我使用的是Spark 2.3.0版本,因此我必须确保转换为pandas dataframe的日期字段应在时间戳中,否则会引发错误
这最后解决了我的问题,我得到了平均值(因为这不是一种聚合平均值或行平均值,你可以说它是一个列平均值)
相关问题 更多 >
编程相关推荐