java如何在springcloudstreambinderkafkastreams中使用函数方法检索/设置头:3.1.1
我在函数式编程中使用spring-cloud-stream-binder-kafka-streams:3.1.1
。如何在processor函数中检索所有标题
Java代码
@SpringBootApplication
public class KafkaMessageApplication {
public static void main(String args[]) {
SpringApplication.run(KafkaMessageApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
// TODO investigate headers on the incoming message
// For example, find partition key on which message was received and publish to same partition key on destination topic
return input -> input;
}
}
# 1 楼答案
为了访问这样的头文件,您需要在Kafka Streams中使用低级处理器/转换器API。您可以混合使用低级处理器API和DSL,同时仍将其用作Spring云流应用程序。有关更多详细信息,请参见this。基本上,您需要在消费者的情况下使用处理器,在函数的情况下使用转换器。处理器是终端API,不允许您继续。另一方面,在使用transformer时,您可以在检查标头后将其作为
KStream
继续。例如,这里有一个想法:查看
transform
方法中的注释。在那里,您可以访问每个传入记录的标题通过查看您的问题,我看到您正在尝试获取传入记录的分区id。为此,您可以直接调用上下文。分区()。我认为你不需要访问标题
下面是一个SO线程,用于访问头文件