How do I pass parameters to the ML Pipeline.fit method?


I am trying to use:

  • Google Dataproc + Spark
  • Google BigQuery
  • a job built with the Spark ML KMeans + Pipeline

Specifically:


  1. Create a user-level feature table in BigQuery.
    Example of what the feature table looks like:

    userid |x1 |x2 |x3 |x4 |x5 |x6 |x7 |x8 |x9 |x10
    00013 |0.01 | 0 |0 |0 |0 |0 |0 |0.06 |0.09 | 0.001

  2. Launch a cluster with default settings. I use the gcloud command line interface to create the cluster and run jobs, as shown here.
  3. Using the provided starter code, I read the BQ table, convert the RDD into a DataFrame, and pass it to the KMeans model/pipeline:
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import subprocess
import pyspark
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors, _convert_to_vector
from pyspark.sql.types import Row
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used by the InputFormat.
# This assumes the Google Cloud Storage connector for Hadoop is configured.

bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'my-project',
    'mapred.bq.input.dataset.id': 'tempData',
    'mapred.bq.input.table.id': 'userFeatureInBQ'}

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
 'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
 'org.apache.hadoop.io.LongWritable',
 'com.google.gson.JsonObject',conf=conf)

# Transform the userid-feature table into the feature_data RDD
feature_data = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['x0'], x['x1'], x['x2'], x['x3'], x['x4'],
                    x['x5'], x['x6'], x['x7'], x['x8'],
                    x['x9'], x['x10'])))

# Function to convert each line in the RDD into an array, return the vector
def parseVector(values):
    array = np.array([float(v) for v in values])
    return _convert_to_vector(array)

# Convert the RDD into a row-wise RDD
data = feature_data.map(parseVector)
row_rdd = data.map(lambda x: Row(x))

sqlContext = SQLContext(sc)

# cache the RDD to improve performance
row_rdd.cache()

# Create a Dataframe
df = sqlContext.createDataFrame(row_rdd, ["features"])

# cache the Dataframe
df.cache()

Here are the schema and head() that I printed to the console:

(schema and head() output omitted)
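The output would have come from inspection calls along these lines (a sketch; the exact snippet and its output are not shown above):

# Hypothetical inspection calls; the actual lines and their output are not preserved.
df.printSchema()
print df.head(5)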
  4. Run the clustering KMeans algorithm as follows:
    • run the model several times
    • using different parameters (i.e. vary the number of clusters and the initialization mode)
    • calculate the error or cost metric
    • choose the optimal model/parameter combination
    • create a Pipeline with KMeans as the estimator
    • pass the multiple parameters using a paramMap
#Define the paramMap & model
paramMap = ({'k':3,'initMode':'kmeans||'},{'k':3,'initMode':'random'},
  {'k':4,'initMode':'kmeans||'},{'k':4,'initMode':'random'},
  {'k':5,'initMode':'kmeans||'},{'k':5,'initMode':'random'},
  {'k':6,'initMode':'kmeans||'},{'k':6,'initMode':'random'},
  {'k':7,'initMode':'kmeans||'},{'k':7,'initMode':'random'},
  {'k':8,'initMode':'kmeans||'},{'k':8,'initMode':'random'},
  {'k':9,'initMode':'kmeans||'},{'k':9,'initMode':'random'},
  {'k':10,'initMode':'kmeans||'},{'k':10,'initMode':'random'})

km = KMeans()

# Create a Pipeline with the estimator stage
pipeline = Pipeline(stages=[km])

# Call & fit the pipeline with the paramMap
models = pipeline.fit(df, paramMap)
print models

I get the following output with a warning:

7:03:24 WARN org.apache.spark.mllib.clustering.KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.

[PipelineModel_443dbf939b7bd3bf7bfc, PipelineModel_4b64bb761f4efe51da50, PipelineModel_4f858411ac19beacc1a4, PipelineModel_4f58b894f1d14d79b936, PipelineModel_4b8194f7a5e6be6eaf33, PipelineModel_4fc5b6370bff1b4d7dba, PipelineModel_43e0a196f16cfd3dae57, PipelineModel_47318a54000b6826b20e, PipelineModel_411bbe1c32db6bf0a92b, PipelineModel_421ea1364d8c4c9968c8, PipelineModel_4acf9cdbfda184b00328, PipelineModel_42d1a0c61c5e45cdb3cd, PipelineModel_4f0db3c394bcc2bb9352, PipelineModel_441697f2748328de251c, PipelineModel_4a64ae517d270a1e0d5a, PipelineModel_4372bc8db92b184c05b0]


#Print the cluster centers:
for model in models:
    print vars(model)
    print model.stages[0].clusterCenters()
    print model.extractParamMap()

Output: [array([7.64676638e-07, 3.58531391e-01, 1.68879698e-03, 0.00000000e+00, 1.53477043e-02, 1.25822915e-02, 0.00000000e+00, 6.93060772e-07, 1.41766847e-03, 1.60941306e-02]), array([2.36494105e-06, 1.87719732e-02, 3.73829379e-03, 0.00000000e+00, 4.20724542e-02, 2.28675684e-02, 0.00000000e+00, 5.45002249e-06, 1.17331153e-02, 1.24364600e-02])]


Here is the list of issues and questions where I need help:

  • I get back a list with only 2 cluster centers as the arrays for all of the models:
    • The KMeans model seems to default to k=2 when I try to access the pipeline. Why does this happen?
    • The last loop is supposed to access the pipelineModel and the 0th stage and run the clusterCenters() method. Is that the correct way to do it?
    • Why do I get the warning that the data is not cached?
  • While using a Pipeline, I could not find how to compute the WSSSE or anything equivalent to the .computeCost() method (in mllib). How do I compare the different models given the different parameters?
  • I tried to run the .computeCost method as defined in the source code here:
    • This defeats the purpose of using a Pipeline to run KMeans models and model selection in parallel, but I tried the following code:
# computeError
def computeCost(model, rdd):
    """Return the K-means cost (sum of squared distances of
    points to their nearest center) for this model on the given data."""
    cost = callMLlibFunc("computeCostKmeansModel",
                         rdd.map(_convert_to_vector),
                         [_convert_to_vector(c) for c in model.clusterCenters()])
    return cost

cost = np.zeros(len(paramMap))

for i in range(len(paramMap)):
    cost[i] = cost[i] + computeCost(models[i].stages[0], feature_data)
print cost

This prints out the following at the end of the loop:

[ 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687]

  • The cost/error computed is the same for every model? Again, I was unable to access the pipelineModel with the correct parameters.

Any help/guidance is much appreciated! Thanks!


1 Answer

The parameters are not defined correctly. It should map from specific parameters to values, not from arbitrary names. You get k equal to 2 because the parameters you pass are not utilized, so every model uses exactly the same default parameters.

Let's start with example data:

import numpy as np
from pyspark.mllib.linalg import Vectors

df = (sc.textFile("data/mllib/kmeans_data.txt")
  .map(lambda s: Vectors.dense(np.fromstring(s, dtype=np.float64, sep=" ")))
  .zipWithIndex()
  .toDF(["features", "id"]))

and a Pipeline:

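(The code block for the Pipeline is missing from this copy of the post; below is a minimal sketch consistent with the rest of the answer, assuming a single KMeans stage bound to the name km that the param maps reference.)

from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

# Single-stage pipeline: KMeans is the only estimator. Its default
# featuresCol ("features") matches the DataFrame built above, and its
# Params (km.k, km.initMode) are used as keys in the param maps below.
km = KMeans()
pipeline = Pipeline(stages=[km])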

As mentioned above, the param map should use specific parameters as the keys. For example:

params = [
    {km.k: 2, km.initMode: "k-means||"},
    {km.k: 3, km.initMode: "k-means||"},
    {km.k: 4, km.initMode: "k-means||"}
]

models = pipeline.fit(df, params=params)

assert [len(m.stages[0].clusterCenters()) for m in models] == [2, 3, 4]

Notes:

  • The correct initMode for K-means|| is k-means||, not kmeans||.
  • Using a param map in a Pipeline does not mean the models are trained in parallel. Spark parallelizes the training process over the data, not over the parameters. It is nothing more than a convenience method.
  • You get the warning about uncached data because the actual input to K-Means is not a DataFrame but a transformed RDD.
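To actually compare the parameter combinations, the cost can be computed per fitted model. This is only a sketch, not part of the original answer; it reuses the computeCost helper defined in the question above and applies it to the RDD of feature vectors behind df:

# Sketch only: computeCost is the helper from the question. Pull the feature
# vectors the models were fitted on, then compute the K-means cost (WSSSE)
# for each fitted PipelineModel / parameter combination.
vectors = df.rdd.map(lambda row: row.features)

costs = [computeCost(m.stages[0], vectors) for m in models]
for p, c in zip(params, costs):
    print p, c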
