
Java Apache Flink: How to split a combined broadcast state and put each part into a separate MapStateDescriptor in processBroadcastElement()

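I combine two broadcast streams using the union operator and broadcast(). In my process function class I receive the combined stream data. Now I want to split it and put the two data streams into separate MapStateDescriptors.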

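import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Either;

public class ApacheFlinkTest {

    // Must be static: it is referenced from the static main() method
    static final MapStateDescriptor<String, Either<String, Integer>> COMBINED_STATE_DESCRIPTOR = new MapStateDescriptor<>(
            "combined_stream", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Either<String, Integer>>(){}));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Main (non-broadcast) stream
        DataStream<String> dataStream = env.fromElements("john", "alice", "david", "bob");

        // Broadcast streams
        DataStream<String> strings = env.fromElements("one", "two", "three");
        DataStream<Integer> ints = env.fromElements(1, 2, 3);

        // Wrap both streams in Either so they share one type and can be unioned
        DataStream<Either<String, Integer>> stringsOnTheLeft = strings
                .map(new MapFunction<String, Either<String, Integer>>() {
                    @Override
                    public Either<String, Integer> map(String s) throws Exception {
                        return Either.Left(s);
                    }
                });

        DataStream<Either<String, Integer>> intsOnTheRight = ints
                .map(new MapFunction<Integer, Either<String, Integer>>() {
                    @Override
                    public Either<String, Integer> map(Integer i) throws Exception {
                        return Either.Right(i);
                    }
                });

        DataStream<Either<String, Integer>> combinedStream = stringsOnTheLeft.union(intsOnTheRight);

        // Broadcast the combined stream; only COMBINED_STATE_DESCRIPTOR is registered here
        BroadcastStream<Either<String, Integer>> broadCastStream = combinedStream.broadcast(COMBINED_STATE_DESCRIPTOR);

        dataStream.keyBy(
            .................
            ......................
        ).connect(broadCastStream).process(new ProcessDataFunction());

        env.execute();
    }
}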

Processing the stream data:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;

public class ProcessDataFunction extends KeyedBroadcastProcessFunction<String, String, Either<String, Integer>, Tuple2<String, String>> {

    private static final long serialVersionUID = 1L;

    // Counter used to build the map keys
    int i = 0;

    // Not transient: a transient field initializer is not re-run when Flink deserializes
    // the function on the task, so the descriptor would be null there
    private static final MapStateDescriptor<String, String> STRING_STATE_DESCRIPTOR = new MapStateDescriptor<>(
            "String_Stream", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    private static final MapStateDescriptor<String, Integer> INTEGER_STATE_DESCRIPTOR = new MapStateDescriptor<>(
            "Integer_Stream", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    // Same name as the descriptor passed to combinedStream.broadcast(...) in main()
    private static final MapStateDescriptor<String, Either<String, Integer>> COMBINED_STATE_DESCRIPTOR = new MapStateDescriptor<>(
            "combined_stream", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Either<String, Integer>>(){}));

    @Override
    public void open(Configuration parameters) throws Exception {
        // TODO
    }

    @Override
    public void processBroadcastElement(
            Either<String, Integer> broadCastStreamData,
            Context context,
            Collector<Tuple2<String, String>> out) throws Exception {

        if (broadCastStreamData.isLeft()) {
            // This is working
            context.getBroadcastState(COMBINED_STATE_DESCRIPTOR).put("str_key1" + i, Either.Left(broadCastStreamData.left()));
            // This is not working
            context.getBroadcastState(STRING_STATE_DESCRIPTOR).put("str_key1" + i, broadCastStreamData.left());
        } else if (broadCastStreamData.isRight()) {
            // This is working
            context.getBroadcastState(COMBINED_STATE_DESCRIPTOR).put("int_key1" + i, Either.Right(broadCastStreamData.right()));
            // This is not working
            context.getBroadcastState(INTEGER_STATE_DESCRIPTOR).put("int_key1" + i, broadCastStreamData.right());
        }

        i++;
    }

    @Override
    public void processElement(
            String value,
            ReadOnlyContext context,
            Collector<Tuple2<String, String>> out) throws Exception {
        // TODO
    }
}

When I run it, the following exception occurs:

Caused by: java.lang.IllegalArgumentException: The requested state does not exist. Check for typos in your state descriptor, or specify the state descriptor in the datastream.broadcast(...) call if you forgot to register it.
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator$ReadWriteContextImpl.getBroadcastState(CoBroadcastWithKeyedOperator.java:189)
    at com.flinkkafka.test.ProcessEitherType.processBroadcastElement(ProcessDataFunction.java:57)
    at com.flinkkafka.test.ProcessEitherType.processBroadcastElement(ProcessDataFunction.java:1)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement2(CoBroadcastWithKeyedOperator.java:121)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$776/1415080802.accept(Unknown Source)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$710/1653823325.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:745)
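The exception message itself points at the likely cause: context.getBroadcastState(...) only returns states whose descriptors were registered in the broadcast(...) call, and the job above registers only COMBINED_STATE_DESCRIPTOR. A minimal sketch, assuming the Flink 1.x DataStream API is on the classpath: DataStream.broadcast(...) takes a varargs list of MapStateDescriptors, so all three can be registered on the combined stream. The holder class BroadcastDescriptors and the helper broadcastAll are hypothetical names used only for illustration.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Either;

// Hypothetical holder: one shared definition of every broadcast state descriptor,
// used both when broadcasting and inside the process function.
final class BroadcastDescriptors {

    public static final MapStateDescriptor<String, Either<String, Integer>> COMBINED_STATE_DESCRIPTOR =
            new MapStateDescriptor<>("combined_stream", BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<Either<String, Integer>>() {}));

    public static final MapStateDescriptor<String, String> STRING_STATE_DESCRIPTOR =
            new MapStateDescriptor<>("String_Stream", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static final MapStateDescriptor<String, Integer> INTEGER_STATE_DESCRIPTOR =
            new MapStateDescriptor<>("Integer_Stream", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    private BroadcastDescriptors() {}

    // Hypothetical helper: every descriptor that processBroadcastElement() later passes
    // to context.getBroadcastState(...) has to appear in this broadcast(...) call.
    static BroadcastStream<Either<String, Integer>> broadcastAll(DataStream<Either<String, Integer>> combined) {
        return combined.broadcast(COMBINED_STATE_DESCRIPTOR, STRING_STATE_DESCRIPTOR, INTEGER_STATE_DESCRIPTOR);
    }
}
```

In main() this would stand in for the existing call, e.g. BroadcastStream<Either<String, Integer>> broadCastStream = BroadcastDescriptors.broadcastAll(combinedStream);. Flink compares state descriptors by name, which is why the separately constructed "combined_stream" descriptor inside the process function already works; the String_Stream and Integer_Stream descriptors would resolve the same way once they are registered.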

I tried to initialize STRING_STATE_DESCRIPTOR and INTEGER_STATE_DESCRIPTOR in the open() method via getRuntimeContext(), but that does not work. How should STRING_STATE_DESCRIPTOR and INTEGER_STATE_DESCRIPTOR be initialized?

What I expect is that the STRING_STATE_DESCRIPTOR map contains "one", "two", "three" and the INTEGER_STATE_DESCRIPTOR map contains 1, 2, 3. How can I achieve this?
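For the expected result, here is a sketch of the process function, assuming both descriptors below are also passed to combinedStream.broadcast(...) in the job (otherwise getBroadcastState() still throws "The requested state does not exist"). processBroadcastElement() routes Left values into String_Stream and Right values into Integer_Stream, and processElement() reads the strings back through the ReadOnlyContext. No open() initialization is involved: broadcast state is reached through the context argument, not through getRuntimeContext(). The class name SplitBroadcastFunction and the choice of using each value as its own map key are assumptions, not the original design.

```java
import java.util.Map;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;

// Hypothetical class name; assumes both descriptors are registered in broadcast(...).
class SplitBroadcastFunction
        extends KeyedBroadcastProcessFunction<String, String, Either<String, Integer>, Tuple2<String, String>> {

    private static final long serialVersionUID = 1L;

    // static final constants: available on every parallel task, no open() needed
    public static final MapStateDescriptor<String, String> STRING_STATE_DESCRIPTOR =
            new MapStateDescriptor<>("String_Stream", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static final MapStateDescriptor<String, Integer> INTEGER_STATE_DESCRIPTOR =
            new MapStateDescriptor<>("Integer_Stream", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    @Override
    public void processBroadcastElement(Either<String, Integer> value, Context ctx,
                                        Collector<Tuple2<String, String>> out) throws Exception {
        if (value.isLeft()) {
            // "one", "two", "three" go into the String state (key = the value itself, an assumption)
            BroadcastState<String, String> strings = ctx.getBroadcastState(STRING_STATE_DESCRIPTOR);
            strings.put(value.left(), value.left());
        } else {
            // 1, 2, 3 go into the Integer state, keyed by their string form
            BroadcastState<String, Integer> ints = ctx.getBroadcastState(INTEGER_STATE_DESCRIPTOR);
            ints.put(String.valueOf(value.right()), value.right());
        }
    }

    @Override
    public void processElement(String name, ReadOnlyContext ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        // The keyed side only gets a read-only view of broadcast state
        ReadOnlyBroadcastState<String, String> strings = ctx.getBroadcastState(STRING_STATE_DESCRIPTOR);
        for (Map.Entry<String, String> entry : strings.immutableEntries()) {
            out.collect(Tuple2.of(name, entry.getValue()));
        }
    }
}
```

Keying by the value keeps the state identical on every parallel instance, which broadcast state relies on. Note that Flink does not guarantee the arrival order between the broadcast side and the keyed side, so a name such as "john" may reach processElement() before "one", "two", "three" have been stored.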


0 answers