使用MLFlow和pyspark编写自定义预测方法

2024-09-27 21:32:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我在使用MLFlow和pyspark(2.4.0)编写自定义预测方法时遇到问题。到目前为止,我拥有的是一个自定义转换器,它可以将数据更改为我需要的格式

class CustomGroupBy(Transformer):
    def __init__(self):
        pass
    def _transform(self, dataset):
        df = dataset.select("userid", explode(split("widgetid", ',')).alias("widgetid"))
        return(df)

然后我构建了一个自定义估计器来运行pyspark机器学习算法

class PipelineFPGrowth(Estimator, HasInputCol, DefaultParamsReadable, DefaultParamsWritable): 
    def __init__(self, inputCol=None, minSupport=0.005, minConfidence=0.01):
        super(PipelineFPGrowth, self).__init__()
        self.minSupport = minSupport
        self.minConfidence = minConfidence
    def setInputCol(self, value):
        return(self._set(inputCol=value))
    def _fit(self, dataset):
        c = self.getInputCol() 
        fpgrowth = FPGrowth(itemsCol=c, minSupport=self.minSupport, minConfidence=self.minConfidence)
        model = fpgrowth.fit(dataset)
        return(model)

这在MLFlow管道中运行

pipeline = Pipeline(stages = [CustomGroupBy,PipelineFPGrowth]).fit(df)

这一切都有效。如果我用新数据创建一个新的pyspark数据框来预测,我会得到预测

newDF = spark.createDataFrame([(123456,['123ABC', '789JSF'])], ["userid", "widgetid"])
pipeline.stages[1].transform(newDF).show(3, False)

# How to access frequent itemset.
pipeline.stages[1].freqItemsets.show(3, False)

我遇到的问题是编写自定义预测。我需要将FPGrowth生成的频繁项集附加到预测的末尾。我已经为此编写了逻辑,但我很难弄清楚如何将其放入自定义方法中。我曾尝试将其添加到我的自定义估计器中,但没有成功。然后我编写了一个单独的类来接受返回的模型并给出扩展的预测。这也是不成功的

最后,我需要记录并保存模型,以便对其进行Dockerize,这意味着我将需要一个自定义风格并使用pyfunc函数。有人对如何扩展预测方法,然后记录并保存模型有什么建议吗


Tags: 数据方法selfdfreturnpipelineinitdef

热门问题