
How can I convert a CSV string into a SparkML-compatible Dataset&lt;Row&gt; in Java?

I have a Dataset&lt;Row&gt; df containing two columns of type string ("key" and "value"). Calling df.printSchema() gives me the following output:

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)

The content of the "value" column is actually a row in CSV format (coming from a Kafka topic). The last entry of each row is the class label and all preceding entries are the features (the header row is not included in the dataset):

feature0,feature1,label
0.6720004294237854,-0.4033586564886893,0
0.6659082469383558,0.07688976580256132,0
0.8086502311695247,0.564354801275521,1

Since I want to train a classifier on this data, I need to convert this representation into one column of type dense vector containing all the feature values and one column of type double containing the label value:

root
 |-- indexedFeatures: vector (nullable = false)
 |-- indexedLabel: double (nullable = false)

How can I achieve this using Java 1.8 and Spark 2.2.0?

Edit: I got a bit further, but I am stuck again when trying to handle a flexible number of feature dimensions. I created a follow-up question.
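The core transformation can be sketched independently of Spark. The plain-Java helper below only illustrates splitting a CSV row into a flexible number of feature values plus a trailing label (the part the edit asks about); it is not the Spark-side solution, and the class and method names are made up for this sketch:

```java
public class CsvRowParser {
    // Holds the parsed result: all leading entries as features, the last one as the label.
    public static class ParsedRow {
        public final double[] features;
        public final double label;
        public ParsedRow(double[] features, double label) {
            this.features = features;
            this.label = label;
        }
    }

    // Splits a CSV row like "0.67,-0.40,0" into features [0.67, -0.40] and label 0.0.
    // Works for any number of feature columns, not just two.
    public static ParsedRow parse(String csvRow) {
        String[] parts = csvRow.split(",");
        double[] features = new double[parts.length - 1];
        for (int i = 0; i < features.length; i++) {
            features[i] = Double.parseDouble(parts[i].trim());
        }
        double label = Double.parseDouble(parts[parts.length - 1].trim());
        return new ParsedRow(features, label);
    }
}
```

The same split-all-but-last logic can then be applied inside a Spark map or flatMap over the "value" column.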


2 Answers

  1. # Answer 1

    There are different ways to achieve this.

    Create a schema class matching the CSV file:

    public class CSVData implements Serializable {
      String col1;
      String col2;
      long col3;
      String col4;

      public CSVData(String col1, String col2, long col3, String col4) {
        this.col1 = col1;
        this.col2 = col2;
        this.col3 = col3;
        this.col4 = col4;
      }
      //getters and setters
    }
    

    Then convert the file into an RDD:

    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("CsvToDataset").setMaster("local[*]"));
    JavaRDD<String> data = sc.textFile("path-to-csv-file");

    JavaRDD<CSVData> csv_rdd = data.map(
      new Function<String, CSVData>() {
          public CSVData call(String line) throws Exception {
             String[] fields = line.split(",");
             return new CSVData(fields[0], fields[1],
                 Long.parseLong(fields[2].trim()), fields[3]);
          }
    });
    

    Or create a Spark session to read the file directly as a Dataset:

    SparkSession spark = SparkSession
                    .builder()
                    .appName("SparkSample")
                    .master("local[*]")
                    .getOrCreate();
    //Read the file as plain text (one string column named "value")
    Dataset<Row> ds = spark.read().text("path-to-csv-file");
    //or let Spark parse the CSV columns directly
    Dataset<Row> csvDs = spark.read().csv("path-to-csv-file");
    ds.show();
    
  2. # Answer 2

    A VectorAssembler (javadocs) can transform the dataset into the required format.

    First, the input is split into three columns:

    Dataset<FeaturesAndLabelData> featuresAndLabelData = inputDf.select("value").as(Encoders.STRING())
      .flatMap(s -> {
        String[] splitted = s.split(",");
        if (splitted.length == 3) {
          return Collections.singleton(new FeaturesAndLabelData(
            Double.parseDouble(splitted[0]),
            Double.parseDouble(splitted[1]), 
            Integer.parseInt(splitted[2]))).iterator();
        } else {
          // apply some error handling...
          return Collections.emptyIterator();
        }
      }, Encoders.bean(FeaturesAndLabelData.class));
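
    The parse-or-skip behaviour of this flatMap can be checked without a Spark runtime. The sketch below reproduces the same body as a plain method, with a minimal local stand-in for the FeaturesAndLabelData bean:

```java
import java.util.Collections;
import java.util.Iterator;

public class RowMapper {
    // Minimal stand-in for the FeaturesAndLabelData bean from the answer.
    public static class FeaturesAndLabelData {
        public double feature1, feature2;
        public int label;
        public FeaturesAndLabelData(double f1, double f2, int l) {
            this.feature1 = f1;
            this.feature2 = f2;
            this.label = l;
        }
    }

    // Same shape as the flatMap body: a well-formed row yields one element,
    // a malformed row yields an empty iterator instead of failing the job.
    public static Iterator<FeaturesAndLabelData> mapRow(String s) {
        String[] splitted = s.split(",");
        if (splitted.length == 3) {
            return Collections.singleton(new FeaturesAndLabelData(
                Double.parseDouble(splitted[0]),
                Double.parseDouble(splitted[1]),
                Integer.parseInt(splitted[2]))).iterator();
        } else {
            return Collections.emptyIterator();
        }
    }
}
```

    Returning an empty iterator for bad rows is what makes flatMap (rather than map) the right operation here: malformed input simply disappears from the output.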
    

    The result is then transformed by the VectorAssembler:

    VectorAssembler assembler = new VectorAssembler()
      .setInputCols(new String[] { "feature1", "feature2" })
      .setOutputCol("indexedFeatures");
    Dataset<Row> result = assembler.transform(featuresAndLabelData)
      .withColumn("indexedLabel", functions.col("label").cast("double"))
      .select("indexedFeatures", "indexedLabel");
    

    The resulting dataframe has the required format:

    +----------------------------------------+------------+
    |indexedFeatures                         |indexedLabel|
    +----------------------------------------+------------+
    |[0.6720004294237854,-0.4033586564886893]|0.0         |
    |[0.6659082469383558,0.07688976580256132]|0.0         |
    |[0.8086502311695247,0.564354801275521]  |1.0         |
    +----------------------------------------+------------+
    
    root
     |-- indexedFeatures: vector (nullable = true)
     |-- indexedLabel: double (nullable = true)
    

    FeaturesAndLabelData is a simple Java bean that ensures the column names are correct:

    public class FeaturesAndLabelData {
      private double feature1;
      private double feature2;
      private int label;
    
      //getters and setters...
    }
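
    Spark's Encoders.bean requires a public no-argument constructor plus getter/setter pairs, and it is the getter/setter names that determine the column names. A completed version of the bean might look like the following sketch (the extra convenience constructor is an addition, not part of the original answer):

```java
import java.io.Serializable;

public class FeaturesAndLabelData implements Serializable {
    private double feature1;
    private double feature2;
    private int label;

    // Encoders.bean requires a public no-argument constructor.
    public FeaturesAndLabelData() {}

    // Convenience constructor for creating instances in the flatMap.
    public FeaturesAndLabelData(double feature1, double feature2, int label) {
        this.feature1 = feature1;
        this.feature2 = feature2;
        this.label = label;
    }

    // Getter/setter names yield the columns "feature1", "feature2" and "label".
    public double getFeature1() { return feature1; }
    public void setFeature1(double feature1) { this.feature1 = feature1; }
    public double getFeature2() { return feature2; }
    public void setFeature2(double feature2) { this.feature2 = feature2; }
    public int getLabel() { return label; }
    public void setLabel(int label) { this.label = label; }
}
```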