有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在springcloudstreambinderkafkastreams中使用函数方法检索/设置头:3.1.1

我在函数式编程中使用spring-cloud-stream-binder-kafka-streams:3.1.1。如何在processor函数中检索所有标题

Java代码

@SpringBootApplication
public class KafkaMessageApplication {
    public static void main(String args[]) {
        SpringApplication.run(KafkaMessageApplication.class, args);
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        // TODO investigate headers on the incoming message
        // For example, find partition key on which message was received and publish to same partition key on destination topic
        return input -> input;
    }
}

共 (1) 个答案

  1. # 1 楼答案

    为了访问这样的头文件,您需要在Kafka Streams中使用低级处理器/转换器API。您可以混合使用低级处理器API和DSL,同时仍将其用作Spring云流应用程序。有关更多详细信息,请参见this。基本上,您需要在消费者的情况下使用处理器,在函数的情况下使用转换器。处理器是终端API,不允许您继续。另一方面,在使用transformer时,您可以在检查标头后将其作为KStream继续。例如,这里有一个想法:

    input -> input
                        .transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
                            @Override
                            public Transformer<String, String, KeyValue<String, String>> get() {
                                return new Transformer<Object, String, KeyValue<Object, String>>() {
                                    ProcessorContext context;
                                    @Override
                                    public void init(ProcessorContext context) {
                                        this.context = context;
                                    }
    
                                    @Override
                                    public KeyValue<Object, String> transform(Object key, String value) {
    
    // Here you can access the headers using this.context.headers()
                                        return new KeyValue<>(key, value);
                                    }
    
                                    @Override
                                    public void close() {
    
                                    }
                                };
                            }
                        })
                        .map(...)
                        .groupBy(...)
                        ...
    

    查看transform方法中的注释。在那里,您可以访问每个传入记录的标题

    通过查看您的问题,我看到您正在尝试获取传入记录的分区id。为此,您可以直接调用上下文。分区()。我认为你不需要访问标题

    下面是一个SO线程,用于访问头文件