<p>First, I created a function to smoothly convert a pandas DataFrame into a Spark DataFrame:</p>
<pre><code>from pyspark.sql.types import (StructType, StructField, DateType,
                               LongType, IntegerType, DoubleType, StringType)

def equivalent_type(f):
    """Map a pandas dtype to the matching Spark SQL type."""
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'uint8': return IntegerType()
    elif f == 'float64': return DoubleType()  # DoubleType preserves float64's 64-bit precision
    else: return StringType()

def define_structure(string, format_type):
    try:
        typo = equivalent_type(format_type)
    except Exception:
        typo = StringType()
    return StructField(string, typo)

def pandas_to_spark(pandas_df):
    """Build an explicit schema from the pandas dtypes and create a Spark DataFrame."""
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)
</code></pre>
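<p>As a pandas-only sanity check (the column names here are toy examples of my own), the dtype strings that <code>equivalent_type()</code> switches on come straight from <code>df.dtypes</code>:</p>
<pre><code>import pandas as pd

df = pd.DataFrame({
    'a': pd.Series([1, 2], dtype='int64'),
    'b': pd.Series([1.5, 2.5], dtype='float64'),
    'c': ['x', 'y'],
})
# Each entry of df.dtypes stringifies to the names the function compares against;
# plain object columns fall through to StringType()
dtype_names = [str(t) for t in df.dtypes]
</code></pre>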
<p>Then I converted the Spark DataFrame to a pandas DataFrame with the <code>toPandas()</code> method:</p>
<pre><code>import pandas as pd

temp = df_merge.select('subscription_id', 'cancelleddate', 'subscriptionstartdate', 'termenddate').dropDuplicates()
temp = temp.toPandas()
# Convert all three date fields to timestamps (required before going back to Spark)
temp['cancelleddate'] = pd.to_datetime(temp['cancelleddate'])
temp['subscriptionstartdate'] = pd.to_datetime(temp['subscriptionstartdate'])
temp['termenddate'] = pd.to_datetime(temp['termenddate'])
df_merge = df_merge.toPandas()
# Column-wide means in years, broadcast to every row
df_merge['mean_cancelled_sub_duration'] = (temp['cancelleddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
df_merge['mean_sub_duration'] = (temp['termenddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
df_merge = pandas_to_spark(df_merge)
</code></pre>
<p>I am on Spark 2.3.0, so I had to make sure that any date field in the pandas DataFrame being converted was a timestamp; otherwise an error is raised.</p>
<p>This finally solved my problem, and I got the mean I wanted (since it is neither an aggregated mean nor a row-wise mean, you could call it a column-wise mean).</p>
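<p>To illustrate that column-wise mean on a tiny, made-up pandas frame (the date values below are my own assumptions, not real data):</p>
<pre><code>import pandas as pd

temp = pd.DataFrame({
    'subscriptionstartdate': pd.to_datetime(['2020-01-01', '2020-06-01']),
    'termenddate': pd.to_datetime(['2021-01-01', '2021-06-01']),
})
# .mean() collapses the whole column to one scalar (in years),
# which assignment then broadcasts to every row
mean_years = (temp['termenddate'] - temp['subscriptionstartdate']).dt.days.dropna().mean() / 365
temp['mean_sub_duration'] = mean_years
</code></pre>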