
Spark: sum values and count distinct values in Java

My dataset looks like this:

a,b,c,d
---------
1,2005,A,2
1,2005,A,3
1,2005,B,4
2,2005,A,4

The output should be grouped by fields a and b, summing the d values and counting the distinct c values. So the output should be:

1,2005,2,9
2,2005,1,4
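
For reference, the equivalent aggregation with the DataFrame API would look roughly like the sketch below (assuming the file is read with its header row and that `spark` is the SparkSession):

    import static org.apache.spark.sql.functions.*;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> df = spark.read().option("header", "true").csv("path.csv");

    // group by a and b, count distinct values of c, sum d (cast from string to int)
    df.groupBy("a", "b")
      .agg(countDistinct("c").alias("distinct_c"), sum(col("d").cast("int")).alias("sum_d"))
      .show();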

Edit

My code is as follows:

    JavaRDD<String> csv = spark.read().textFile("path.csv").javaRDD();
    JavaRDD<String[]> rdd = csv.map(s -> s.split(","));
    // key: "a,b"; value: (c, d)
    JavaPairRDD<String, Tuple2<String, Long>> tuple = rdd.mapToPair(x -> new Tuple2<>(x[0] + "," + x[1], new Tuple2<>(x[2], Long.parseLong(x[3]))));
    // sums d per key, but loses track of the distinct c values
    JavaPairRDD<String, Tuple2<String, Long>> tuple2 = tuple.reduceByKey((x, y) -> new Tuple2<>(x._1(), x._2() + y._2()));

But I don't know how to count the distinct values of c.


1 Answer

  1. # Answer 1

    The program below loads a CSV similar to the input you provided

    a,b,c,d
    1,2005,A,2
    1,2005,A,3
    1,2005,B,4
    2,2005,A,4
    

    and applies the required map and reduce operations.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    import scala.Tuple4;
    
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    
    public class SimpleDataframe {
    
        public static void main(String[] args) {
            SparkSession spark = Constant.getSparkSess();
    
    
            JavaRDD<Row> rdd = spark.read().option("header", "true").csv("src/main/resources/simple.csv").rdd().toJavaRDD();
    
            List<Tuple4<String, String, Integer, Integer>> output = rdd
                    // Map each row to key (a, b) and value (set containing c, d parsed as int)
                    .mapToPair((PairFunction<Row, Tuple2<String, String>, Tuple2<Set<String>, Integer>>) row -> {
                        Tuple2<String, String> key = new Tuple2<>(row.getString(0), row.getString(1));
                        Set<String> set = new HashSet<>();
                        set.add(row.getString(2));
                        return new Tuple2<>(key, new Tuple2<>(set, Integer.parseInt(row.getString(3))));
                    })
                    // Combine values per key: union the sets of c and sum the d values
                    .reduceByKey((Function2<Tuple2<Set<String>, Integer>, Tuple2<Set<String>, Integer>, Tuple2<Set<String>, Integer>>) (v1, v2) -> {
                        Set<String> set = new HashSet<>();
                        set.addAll(v1._1);
                        set.addAll(v2._1);
                        return new Tuple2<>(set, v1._2 + v2._2);
                    })
                    // Map the combined result to the required output: (a, b, distinct c count, sum of d)
                    .map(tuple -> new Tuple4<>(tuple._1._1, tuple._1._2, tuple._2._1.size(), tuple._2._2))
                    .collect();
    
            System.out.println(output);
    
        }
    }
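
    An alternative is aggregateByKey, which separates folding a single (c, d) value into a per-key accumulator from merging accumulators across partitions, and which is allowed to mutate its accumulator, so you do not need to build a new HashSet for every input row. The sketch below only illustrates the idea; it assumes the same JavaRDD<Row> as above, and the class and method names are made up for the example.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Row;
    import scala.Tuple2;
    import scala.Tuple4;
    
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    
    public class AggregateByKeyVariant {
    
        public static List<Tuple4<String, String, Integer, Integer>> aggregate(JavaRDD<Row> rdd) {
            // Key by (a, b); keep the raw (c, d) pair as the value
            JavaPairRDD<Tuple2<String, String>, Tuple2<String, Integer>> pairs = rdd.mapToPair(row ->
                    new Tuple2<>(new Tuple2<>(row.getString(0), row.getString(1)),
                            new Tuple2<>(row.getString(2), Integer.parseInt(row.getString(3)))));
    
            // Accumulator per key: (distinct c values seen so far, running sum of d)
            Tuple2<Set<String>, Integer> zero = new Tuple2<>(new HashSet<>(), 0);
    
            return pairs.aggregateByKey(zero,
                    // seqFunc: fold one (c, d) value into the partition-local accumulator
                    (acc, value) -> {
                        acc._1.add(value._1);
                        return new Tuple2<>(acc._1, acc._2 + value._2);
                    },
                    // combFunc: merge accumulators coming from different partitions
                    (acc1, acc2) -> {
                        acc1._1.addAll(acc2._1);
                        return new Tuple2<>(acc1._1, acc1._2 + acc2._2);
                    })
                    // (a, b, distinct c count, sum of d)
                    .map(t -> new Tuple4<>(t._1._1, t._1._2, t._2._1.size(), t._2._2))
                    .collect();
        }
    }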