有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

javaapacheflink如何映射和匹配一个带有主键的alternatekey到一个键控流

我想要一个更简单、更好、更优雅的方法来解决下面的问题。我还没有看到关于这个主题的任何文档,我确信我目前的方法有一些瓶颈,谢谢

我有一个将Json映射到POJO的流

DataStream<MYPOJO> stream = env.
             addSource( <<kafkaSource>>).map(new EventToPOJO());

POJO的一些字段将有一个填充的主键,一些字段将有一个填充的备用键,一些字段将同时具有这两个键。我在Flink文档中找到的使用两个键的唯一示例是,对复合键使用keyselector,而对备用键则不使用keyselector

我目前的做法如下:

  1. 使用RichFlatMap函数将主键的所有元素收集到流中,Astream
  2. 使用RichFlatMap函数将备用密钥的所有元素收集到流中,b流
  3. 对同时具有主键和备用键的项目使用richFlatMap,CStream
  4. 使用主键上的Cstream连接Astream
  5. 将B流与备用键上的Cstream连接起来
  6. 最后,按主键输入

 DataStream<MyPOJO> primaryKey = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
            @Override
            public void flatMap(MyPOJO mypojo, Collector<MyPOJO> collector) throws Exception {
                if(mypojo.PrimaryKey() != null){
                 
                    collector.collect(MyPOJO);
                }
            }
        });


 DataStream<MyPOJO> alternateKey = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
            @Override
            public void flatMap(MyPOJO mypojo, Collector<MyPOJO> collector) throws Exception {
                if(mypojo.getAlternateKey() != null){
                 
                    collector.collect(mypojo);
                }
            }
        });


 DataStream<MyPOJO> both = stream.flatMap(new RichFlatMapFunction<MyPOJO mypojo, MyPOJO mypojo>() {
            @Override
            public void flatMap(MyPOJO mypojo, Collector<MYPOJO> collector) throws Exception {
                if(mypojo.getAlternateKey() != null && mypojo.getPrimaryKey() !=null ){
                 
                    collector.collect(mypojo);
                }
            }
        });



//Join them 

   both.join(alternateKey)
                .where(MyPOJO::getAlternateKey)
                .equalTo(MyPOJO::getAlternateKey)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
                .apply (new JoinFunction<MyPOJO, MyPOJO, MyPOJO>(){
                   
                    @Override
                    public StateObject join(MyPOJO Mypojo, MyPOJO mypojo2) throws Exception {

                      // Some Join logic to keep both states 
                        return stateObject2;
                    }
                });

:: repeat for primary key stream ...


// keyby at the end
both.keyBy(MyPOJO::getPrimaryKey)


我相信我也可以使用一个过滤函数来实现这3个流,但我不想在第一时间分裂成3个流,请不要,为了可读性,我已经简化了上面的内容,所以请不要介意你可能发现的任何语法错误


共 (1) 个答案

  1. # 1 楼答案

    您应该实现一个包含主&;辅助键。它需要有equals()hashCode()方法,当两个记录相等时,它们实现所需的逻辑(*)。看见 hashCode() and equals() method for custom classes in flink了解有关为什么必须这样做的更多详细信息

    添加一个MyPOJO.getJoiningKey()返回此自定义POJO

    然后只需基于.where(r -> r.getJoiningKey()).equals(r -> r.getJoiningKey())进行一次连接

    (*)我还是不确定你想要什么样的逻辑。例如,如果左侧初级和;次键不为空,右侧主键为空,但次键不为空,您想比较什么