使用PySp在数据帧上应用sklearn训练模型

2024-10-02 22:26:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我用Python训练了一个随机森林算法,并希望用PySpark将其应用到一个大数据集上。在

我首先加载了经过训练的sklearn RF模型(使用joblib),将包含特性的数据加载到Spark数据帧中,然后添加一列预测,并使用如下用户定义的函数:

def predictClass(features):
    return rf.predict(features)
udfFunction = udf(predictClass, StringType())
new_dataframe = dataframe.withColumn('prediction', 
udfFunction('features'))

虽然跑起来要花很多时间,有没有更有效的方法来做同样的事情?(不使用Spark ML)


Tags: 数据模型算法dataframe森林sklearnsparkpyspark
3条回答

现在您还可以使用spark2.3中引入的pandas_udf,以实现高处理速度和分布式计算。它基于用于内存计算的apachearrow的python实现。在

在最近的项目中,我不得不做同样的事情。对pyspark每次必须读取sklearn模型的每一行应用udf的缺点是,这就是为什么它需要很长时间才能完成。我发现的最好的解决方案是在rdd上使用.mapPartitions或foreachPartition方法,这里有很好的解释

https://github.com/mahmoudparsian/pyspark-tutorial/blob/master/tutorial/map-partitions/README.md

它的工作速度很快,因为它确保了没有洗牌,而且对于每个分区,pyspark只需读取模型并预测一次。因此,流程是:

  • 将数据框转换为RDD
  • 将模型广播到节点,以便工人可以访问它
  • 编写一个以interator(包含分区内所有行)为参数的udf函数
  • 遍历行并创建一个适当的矩阵,其中包含您的特性(顺序问题)
  • 打电话。只预测一次
  • 回报预测
  • 如果需要,将rdd转换为df

sklearn射频模型可以相当大时,腌制。在任务调度过程中,模型的频繁pickle/unpickle可能会导致该问题。你可以考虑使用广播变量。在

official document

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

相关问题 更多 >