我在使用MLFlow和pyspark(2.4.0)编写自定义预测方法时遇到问题。到目前为止,我拥有的是一个自定义转换器,它可以将数据更改为我需要的格式
class CustomGroupBy(Transformer):
def __init__(self):
pass
def _transform(self, dataset):
df = dataset.select("userid", explode(split("widgetid", ',')).alias("widgetid"))
return(df)
然后我构建了一个自定义估计器来运行pyspark机器学习算法
class PipelineFPGrowth(Estimator, HasInputCol, DefaultParamsReadable, DefaultParamsWritable):
def __init__(self, inputCol=None, minSupport=0.005, minConfidence=0.01):
super(PipelineFPGrowth, self).__init__()
self.minSupport = minSupport
self.minConfidence = minConfidence
def setInputCol(self, value):
return(self._set(inputCol=value))
def _fit(self, dataset):
c = self.getInputCol()
fpgrowth = FPGrowth(itemsCol=c, minSupport=self.minSupport, minConfidence=self.minConfidence)
model = fpgrowth.fit(dataset)
return(model)
这在MLFlow管道中运行
pipeline = Pipeline(stages = [CustomGroupBy,PipelineFPGrowth]).fit(df)
这一切都有效。如果我用新数据创建一个新的pyspark数据框来预测,我会得到预测
newDF = spark.createDataFrame([(123456,['123ABC', '789JSF'])], ["userid", "widgetid"])
pipeline.stages[1].transform(newDF).show(3, False)
# How to access frequent itemset.
pipeline.stages[1].freqItemsets.show(3, False)
我遇到的问题是编写自定义预测。我需要将FPGrowth生成的频繁项集附加到预测的末尾。我已经为此编写了逻辑,但我很难弄清楚如何将其放入自定义方法中。我曾尝试将其添加到我的自定义估计器中,但没有成功。然后我编写了一个单独的类来接受返回的模型并给出扩展的预测。这也是不成功的
最后,我需要记录并保存模型,以便对其进行Dockerize,这意味着我将需要一个自定义风格并使用pyfunc函数。有人对如何扩展预测方法,然后记录并保存模型有什么建议吗
目前没有回答
相关问题 更多 >
编程相关推荐