我有一个作业需要在分区的spark数据帧上运行,该过程如下所示:
rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
结果是rdd
的pandas.dataframe
type(rdd) => pyspark.rdd.PipelinedRDD
type(rdd.collect()[0]) => pandas.core.frame.DataFrame
和rdd.glom().collect()
返回如下结果:
[[df1], [df2], ...]
现在我希望将结果转换为spark数据帧,方法是:
sp = None
for i, partition in enumerate(rdd.collect()):
if i == 0:
sp = spark.createDataFrame(partition)
else:
sp = sp.union(spark.createDataFrame(partition))
return sp
然而,结果可能是巨大的,rdd.collect()
可能会超出驱动程序的内存,所以我需要避免collect()
操作。有办法解决这个问题吗?你知道吗
提前谢谢!你知道吗
你可以做:
参考:
Spark SQL DataFrame
Spark RDD
如果可行,请投票
您可以直接在datframe上使用新的pandasgrouped udf,而不是
rdd.mapPartitions
。函数本身接受一个组作为pandas df并返回pandas df。你知道吗当它与spark dataframe apply api一起使用时,spark会自动将分区的数据帧组合成一个新spark 数据帧。你知道吗
如果你想继续使用RDDAPI。
mapPartitions
接受一个类型的迭代器,并期望另一个类型的迭代器作为结果。df不是mapPartitions
可以直接处理的迭代器类型。如果您必须使用pandas api,那么只需从pandas.iterrows
创建一个适当的生成器即可这样,整个
mapPartitions
结果将是行类型的单个rdd,而不是数据帧的rdd。这样的rdd可以无缝地转换成一个数据帧,并通过动态模式发现返回相关问题 更多 >
编程相关推荐