Java Spark DataFrame from a fixed-length file
I want to load a fixed-length file whose column names and field lengths are given in a separate file. I can load the data and append a new column, but I cannot preserve the columns added earlier: each new column overwrites the previous one, while I want the complete list of columns. Here is the code I have implemented so far:
samplefile.txt:
00120181120xyz12341
00220180203abc56792
00320181203pqr25483
00120181120xyz12341
schema.json:
{"Column":"id","length":"3","flag":"0"}
{"Column":"date","length":"8","flag":"0"}
{"Column":"name","length":"3","flag":"1"}
{"Column":"salary","length":"5","flag":"2"}
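For clarity, each 19-character record is sliced by cumulative offsets derived from the lengths above: id occupies positions 1-3, date 4-11, name 12-14, and salary 15-19 (1-based, as Spark's `substr` counts). A minimal plain-Java sketch of that offset arithmetic (the class and helper names here are my own illustration, not part of the question's code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FixedWidthSlicer {
    // Slice a fixed-width record into named fields using cumulative offsets.
    static Map<String, String> slice(String record, String[] names, int[] lengths) {
        Map<String, String> out = new LinkedHashMap<>();
        int start = 0; // 0-based for String.substring
        for (int i = 0; i < names.length; i++) {
            out.put(names[i], record.substring(start, start + lengths[i]));
            start += lengths[i];
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> row = slice("00120181120xyz12341",
                new String[] {"id", "date", "name", "salary"},
                new int[] {3, 8, 3, 5});
        System.out.println(row); // {id=001, date=20181120, name=xyz, salary=12341}
    }
}
```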
Current Output:
+-------------------+------+
| _c0|salary|
+-------------------+------+
|00120181120xyz12341| 12341|
|00220180203abc56792| 56792|
|00320181203pqr25483| 25483|
|00120181120xyz12341| 12341|
+-------------------+------+
Expected Output:
+-------------------+------+----+--------+---+
|                _c0|salary|name|    date| id|
+-------------------+------+----+--------+---+
|00120181120xyz12341| 12341| xyz|20181120|001|
|00220180203abc56792| 56792| abc|20180203|002|
|00320181203pqr25483| 25483| pqr|20181203|003|
|00120181120xyz12341| 12341| xyz|20181120|001|
+-------------------+------+----+--------+---+
Code:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class App {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Develop")
                .master("local").getOrCreate();
        Dataset<Row> ds = spark
                .read()
                .format("csv")
                .option("header", "false")
                .load("C://Users//path//samplefile.txt");
        ds.show();
        // schema.json holds one JSON object per line, so read it as JSON
        Dataset<Row> SchemaFile = spark
                .read()
                .format("json")
                .load("C://Users//path//schema.json");
        SchemaFile.show();
        List<String> s = new ArrayList<String>();
        int lens = 1;
        List<Row> it = SchemaFile.select("Column", "length").collectAsList();
        List<StructField> fields = new ArrayList<>();
        Dataset<Row> ds1 = null;
        for (Row fieldName : it) {
            System.out.println(fieldName.get(0));
            System.out.println(Integer.parseInt(fieldName.get(1).toString()));
            ds1 = ds.withColumn(
                    fieldName.get(0).toString(),
                    substrings(ds, "_c0", lens,
                            Integer.parseInt(fieldName.get(1).toString()),
                            fieldName.get(1).toString()));
            // alternative tried: selectExpr("substring(_c0," + lens + "," + length + ")")
            s.add(fieldName.get(0).toString());
            lens += Integer.parseInt(fieldName.get(1).toString());
            System.out.println("Lengths:" + lens);
            ds1.show();
            StructField field = DataTypes.createStructField(
                    fieldName.get(0).toString(), DataTypes.StringType, true);
            fields.add(field);
        }
        StructType schema = DataTypes.createStructType(fields);
        System.out.println(schema);
        for (String s1 : s) {
            System.out.println(s1);
        }
    }

    private static Column substrings(Dataset<Row> ds, String string, int lens,
            int i, String cols) {
        return ds.col("_c0").substr(lens, i);
    }
}
Any help or suggestions would be appreciated. Thanks in advance.
# Answer 1
I know this question is quite old, but maybe someone else will run into the same problem and want an answer. I think you are simply adding each column to the wrong dataset, which is why the previously added columns are dropped.
Possible solution:
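A sketch of the fix, reusing the question's own variables (`ds`, `it`, `lens`) and assuming the same Spark session and imports; the key change is to keep chaining `withColumn` on the accumulated dataset `ds1`, not on the original `ds`, so every iteration builds on the columns already added (not tested against a live cluster):

```
// Start from the raw dataset and keep accumulating on ds1.
// In the original loop, "ds1 = ds.withColumn(...)" re-derived from ds
// each time, discarding all previously appended columns.
Dataset<Row> ds1 = ds;
int lens = 1;
for (Row fieldName : it) {
    int length = Integer.parseInt(fieldName.get(1).toString());
    ds1 = ds1.withColumn(                       // ds1, not ds
            fieldName.get(0).toString(),
            ds1.col("_c0").substr(lens, length)); // substr is 1-based
    lens += length;
}
ds1.show();
```

With this change each `withColumn` call returns a new dataset that still contains all earlier columns, which produces the expected output above.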