java Spring for Kafka 2.3使用KafkaMessageListenerContainer在运行时为特定侦听器设置偏移量
我必须实现一个功能,将某个主题/分区的侦听器(重新)设置为任何给定的偏移量。因此,如果将事件提交到偏移量5,并且管理员决定将偏移量重置为2,则应重新处理事件3、4和5
我们正在为卡夫卡2.3使用Spring,我试图遵循ConsumerSeekAware上的文档,这似乎正是我想要的
然而,问题是我们使用的主题也是在运行时创建的。我们使用KafkaMessageListenerContainer
到DefaultKafkaConsumerFactory
来实现这个目的,我不知道把registerSeekCallback或类似的东西放在哪里
有没有办法做到这一点?我无法理解使用@KafkaListener
注释的类如何映射到工厂中创建侦听器的方式
任何帮助都将不胜感激。即使这只是解释这些东西是如何一起工作的
这就是卡夫卡消息容器的基本创建方式:
public KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(String topicName,
ContainerPropertiesStrategy containerPropertiesStrategy) {
MessageListener<String, String> messageListener = getMessageListener(topicName);
ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerFactoryConfiguration());
KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = createKafkaMessageListenerContainer(topicName, messageListener, bootstrapServers, containerPropertiesStrategy, consumerFactory);
return kafkaMessageListenerContainer;
}
public MessageListener<String, String> getMessageListener(String topic) {
MessageListener<String, String> messageListener = new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> message) {
try {
consumerService.consume(topic, message.value());
} catch (IOException e) {
log.log(Level.WARNING, "Message couldn't be consumed", e);
}
}
};
return messageListener;
}
public static KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(
String topicName, MessageListener<String, String> messageListener, String bootstrapServers, ContainerPropertiesStrategy containerPropertiesStrategy,
ConsumerFactory<String, Object> consumerFactory) {
ContainerProperties containerProperties = containerPropertiesStrategy.getContainerPropertiesForTopic(topicName);
containerProperties.setMessageListener(messageListener);
KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(
consumerFactory, containerProperties);
kafkaMessageListenerContainer.setBeanName(topicName);
return kafkaMessageListenerContainer;
}
希望有帮助
# 1 楼答案
我认为可以像这样为spring kafka使用一些注释,尽管在运行时在注释中设置偏移量可能会很尴尬
或者,这里是我编写的一些代码,用于从给定的偏移量中使用,我模拟了消费者在消息中失败,这是使用KafkCasumer,而不是KafkCamessageListenerContainer
# 2 楼答案
关键的组成部分是
AbstractConsumerSeekAware
。希望这能让你开始在侦听器上调用
reseek()
会在消费者线程从poll()中醒来时(实际上在下一个之前)对其进行排队