Java Spark DataFrame from a fixed-length file

I want to load a fixed-length file, where the column names and lengths are given in a separate file. I can load the data and append new columns, but I cannot keep the previously added columns: each column gets overwritten, while I need the complete list of columns. Below is the code I have implemented so far:

samplefile.txt:

00120181120xyz12341
00220180203abc56792
00320181203pqr25483
00120181120xyz12341


schema.json:
{"Column":"id","length":"3","flag":"0"}
{"Column":"date","length":"8","flag":"0"}
{"Column":"name","length":"3","flag":"1"}
{"Column":"salary","length":"5","flag":"2"}


Current Output:

+-------------------+------+
|                _c0|salary|
+-------------------+------+
|00120181120xyz12341| 12341|
|00220180203abc56792| 56792|
|00320181203pqr25483| 25483|
|00120181120xyz12341| 12341|
+-------------------+------+

Expected Output:

+-------------------+------+-----+--------+---+
|                _c0|salary|name |date    |id |
+-------------------+------+-----+--------+---+
|00120181120xyz12341| 12341|xyz  |20181120|001|
|00220180203abc56792| 56792|abc  |20180203|002|
|00320181203pqr25483| 25483|pqr  |20181203|003|
|00120181120xyz12341| 12341|xyz  |20181120|001|
+-------------------+------+-----+--------+---+

Code:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class App {

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder().appName("Develop")
                .master("local").getOrCreate();

        Dataset<Row> ds = spark
                .read()
                .format("csv")
                .option("header", "false")
                .load("C://Users//path//samplefile.txt");
        ds.show();

        Dataset<Row> SchemaFile = spark
                .read()
                .format("csv")
                .option("header", "true")
                .load("C://Users//path//schema.txt");
        SchemaFile.show();

        List<String> s = new ArrayList<String>();
        int lens = 1;
        List<Row> it = SchemaFile.select("Column", "length").collectAsList();
        List<StructField> fields = new ArrayList<>();
        Dataset<Row> ds1 = null;
        for (Row fieldName : it) {
            System.out.println(fieldName.get(0));
            System.out.println(Integer.parseInt(fieldName.get(1).toString()));

            ds1 = ds.withColumn(
                    fieldName.get(0).toString(),
                    substrings(ds, "_c0", lens,
                            Integer.parseInt(fieldName.get(1).toString()),
                            fieldName.get(1).toString()));
            // selectExpr("substring("+"_c0"+","+lens+","+Integer.parseInt(fieldName.get(1).toString())+")");
            s.add(fieldName.get(0).toString());
            lens += Integer.parseInt((String) fieldName.get(1));
            System.out.println("Lengths:" + lens);
            ds1.show();
            StructField field = DataTypes.createStructField(
                    fieldName.toString(), DataTypes.StringType, true);
            fields.add(field);
        }
        StructType schema = DataTypes.createStructType(fields);
        System.out.println(schema);
        for (String s1 : s) {
            System.out.println(s1);
        }
    }

    private static Column substrings(Dataset<Row> ds, String string, int lens,
            int i, String cols) {
        return ds.col("_c0").substr(lens, i);
    }
}

Any help or suggestions would be greatly appreciated. Thanks in advance.


1 Answer

  1. Answer #1

    I know this question is quite old, but maybe someone else will run into the same problem and is looking for an answer. I think you were simply adding each new column to the wrong dataset, which is why the columns were dropped again after being added.

    A possible solution:

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.util.List;
    
    public class FlfReader {
    
    public static void main(String[] args) {
    
        SparkSession spark = SparkSession.builder().appName("FixedLengthFileReader")
                .master("local[*]").getOrCreate();
    
        Dataset<Row> ds = spark
                .read()
                .format("csv")
                .option("header", "false")
                .load(FlfReader.class.getClassLoader().getResource("samplefile.txt").getPath());
        ds.show();
    
        Dataset<Row> SchemaFile = spark
                .read()
                .format("json")
                .option("header", "true")
                .load(FlfReader.class.getClassLoader().getResource("schema.json").getPath());
        SchemaFile.show();
    
        int lengths = 1;
        List<Row> schemaFields = SchemaFile.select("Column", "length").collectAsList();
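        // Reassign ds itself in the loop so each new column is added on top of the ones from earlier iterations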
        for (Row fieldName : schemaFields) {
            int fieldLength = Integer.parseInt(fieldName.get(1).toString());
            ds = ds.withColumn(
                    fieldName.get(0).toString(),
                    colSubstring(ds,
                            lengths,
                            fieldLength));
            lengths += fieldLength;
        }
        ds.show();
    }
    
    private static Column colSubstring(Dataset<Row> ds, int startPos, int length) {
        return ds.col("_c0").substr(startPos, length);
    }
    }
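
    Not part of the original answer, but for comparison: once the schema rows are collected, the same columns can also be produced with a single select, building one aliased substring column per schema entry instead of calling withColumn repeatedly. A minimal sketch assuming the same samplefile.txt and schema.json as above (the class name and file paths are placeholders to adjust):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class FlfSelectReader {

        public static void main(String[] args) {

            SparkSession spark = SparkSession.builder().appName("FixedLengthFileSelect")
                    .master("local[*]").getOrCreate();

            // Each fixed-length line is read into the single column "_c0".
            Dataset<Row> ds = spark
                    .read()
                    .format("csv")
                    .option("header", "false")
                    .load("samplefile.txt");

            // One JSON object per line: {"Column": ..., "length": ..., "flag": ...}
            Dataset<Row> schemaFile = spark
                    .read()
                    .format("json")
                    .load("schema.json");

            // Build one substring column per schema entry, keeping "_c0" as well.
            List<Column> columns = new ArrayList<>();
            columns.add(ds.col("_c0"));
            int start = 1; // substr() positions are 1-based
            for (Row field : schemaFile.select("Column", "length").collectAsList()) {
                int length = Integer.parseInt(field.get(1).toString());
                columns.add(ds.col("_c0").substr(start, length)
                        .alias(field.get(0).toString()));
                start += length;
            }

            // A single select produces "_c0" plus all parsed columns in one pass.
            ds.select(columns.toArray(new Column[0])).show();
        }
    }

    The resulting columns are the same as in the answer above; only the column order follows the schema file directly rather than the order shown in the expected-output table.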