java: Spring for Kafka 2.3: setting an offset at runtime for a specific listener with KafkaMessageListenerContainer

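I have to implement a feature that (re)sets the listener for a given topic/partition to any given offset. So if events have been committed up to offset 5 and an admin decides to reset the offset to 2, events 3, 4 and 5 should be reprocessed.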
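To make the arithmetic in that example concrete, here is a minimal plain-Java sketch. It assumes that "reset to offset 2" means offset 2 is the last event that still counts as processed, so the consumer has to seek to offset 3 (a Kafka seek position names the next record to be fetched). `OffsetReset` and its methods are hypothetical helpers for illustration, not part of Spring for Apache Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a Spring or Kafka class.
final class OffsetReset {

    private OffsetReset() {
    }

    // Assumption: lastProcessedOffset is the last record that still counts as handled,
    // so the seek position is the offset right after it.
    static long seekPosition(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // Offsets that are delivered again after the reset, up to and including
    // the last record that had been processed before the reset.
    static List<Long> offsetsToReprocess(long lastProcessedOffset, long lastRecordBeforeReset) {
        List<Long> offsets = new ArrayList<>();
        for (long o = seekPosition(lastProcessedOffset); o <= lastRecordBeforeReset; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(seekPosition(2));          // prints 3
        System.out.println(offsetsToReprocess(2, 5)); // prints [3, 4, 5]
    }
}
```

If the admin's value is instead meant as the first offset to read again, the `+ 1` would have to be dropped; the question does not say which convention applies.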

We are using Spring for Kafka 2.3, and I tried to follow the documentation on ConsumerSeekAware, which seems to be exactly what I am looking for.

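The problem, however, is that the topics we use are also created at runtime. We use a KafkaMessageListenerContainer with a DefaultKafkaConsumerFactory for this purpose, and I don't know where to put registerSeekCallback or anything like it.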

Is there a way to do this? I don't understand how a class annotated with @KafkaListener maps to the way listeners are created in the factory.

Any help would be much appreciated, even if it is only an explanation of how these pieces work together.

This is basically how the KafkaMessageListenerContainer is created:

public KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(String topicName,
        ContainerPropertiesStrategy containerPropertiesStrategy) {
    MessageListener<String, String> messageListener = getMessageListener(topicName);

    ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerFactoryConfiguration());

    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = createKafkaMessageListenerContainer(topicName, messageListener, bootstrapServers, containerPropertiesStrategy, consumerFactory);
    return kafkaMessageListenerContainer;
}

public MessageListener<String, String> getMessageListener(String topic) {
    MessageListener<String, String> messageListener = new MessageListener<String, String>() {

        @Override
        public void onMessage(ConsumerRecord<String, String> message) {
            try {
                consumerService.consume(topic, message.value());
            } catch (IOException e) {
                log.log(Level.WARNING, "Message couldn't be consumed", e);
            }
        }
    };
    return messageListener;
}

public static KafkaMessageListenerContainer<String, Object> createKafkaMessageListenerContainer(
        String topicName, MessageListener<String, String> messageListener, String bootstrapServers,
        ContainerPropertiesStrategy containerPropertiesStrategy, ConsumerFactory<String, Object> consumerFactory) {
    ContainerProperties containerProperties = containerPropertiesStrategy.getContainerPropertiesForTopic(topicName);
    containerProperties.setMessageListener(messageListener);

    KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(
            consumerFactory, containerProperties);
    kafkaMessageListenerContainer.setBeanName(topicName);
    return kafkaMessageListenerContainer;
}

Hope that helps.


2 answers

  1. # Answer 1

    I think you can use the spring-kafka annotations like this, although setting the offset in an annotation at runtime is likely to be awkward:

        @KafkaListener(topicPartitions =
        @TopicPartition(topic = "${kafka.consumer.topic}", partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "2")}),
                containerFactory = "filterKafkaListenerContainerFactory", id = "${kafka.consumer.groupId}")
        public void receive(ConsumedObject event) {
            log.info(String.format("Consumed message with correlationId: %s", event.getCorrelationId()));
            consumerHelper.start(event);
        }
    

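    Alternatively, here is some code I wrote to consume from a given offset. It simulates the consumer failing on a message. Note that it uses KafkaConsumer directly rather than KafkaMessageListenerContainer: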

        // Needs java.time.Duration, java.util.Scanner, org.apache.kafka.clients.consumer.*
        // and org.apache.kafka.common.TopicPartition.
        private static void consumeFromOffset(KafkaConsumer<String, Customer> consumer, boolean flag, String topic) {
            Scanner scanner = new Scanner(System.in);
            System.out.print("Enter offset: ");
            long offsetInput = scanner.nextLong(); // offset to resume from after the simulated failure
    
            while (true) {
                ConsumerRecords<String, Customer> records = consumer.poll(Duration.ofMillis(500));
    
                for (ConsumerRecord<String, Customer> record : records) {
                    Customer customer = record.value();
                    System.out.println(customer + " has offset ->" + record.offset());
                    if (record.offset() == 7 && flag) {
                        System.out.println("simulating consumer failing after offset 7..");
                        break; // leaves only the for loop; the rest of this batch is skipped
                    }
                }
                consumer.commitSync();
    
                if (flag) {
                    // consumer.seekToBeginning(Stream.of(new TopicPartition(topic, 0)).collect(Collectors.toList())); // consume from the beginning
                    consumer.seek(new TopicPartition(topic, 0), offsetInput); // consume again from the offset entered above
                    flag = false;
                }
            }
        }
    
  2. # Answer 2

    The key component is AbstractConsumerSeekAware. Hopefully this will get you started:

    @SpringBootApplication
    public class So59682801Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So59682801Application.class, args).close();
        }
    
    
        @Bean
        public ApplicationRunner runner(ListenerCreator creator,
                KafkaTemplate<String, String> template, GenericApplicationContext context) {
    
            return args -> {
                System.out.println("Hit enter to create a listener");
                System.in.read();
    
                ConcurrentMessageListenerContainer<String, String> container =
                        creator.createContainer("so59682801group", "so59682801");
    
                // register the container as a bean so that all the "...Aware" interfaces are satisfied
                context.registerBean("so59682801", ConcurrentMessageListenerContainer.class, () -> container);
                context.getBean("so59682801", ConcurrentMessageListenerContainer.class); // re-fetch to initialize
    
                container.start();
    
                // send some messages
                IntStream.range(0, 10).forEach(i -> template.send("so59682801", "test" + i));
    
                System.out.println("Hit enter to reseek");
                System.in.read();
                ((MyListener) container.getContainerProperties().getMessageListener())
                    .reseek(new TopicPartition("so59682801", 0), 5L);
    
                System.out.println("Hit enter to exit");
                System.in.read();
            };
        }
    
    }
    
    @Component
    class ListenerCreator {
    
        private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
    
        ListenerCreator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
            factory.getContainerProperties().setIdleEventInterval(5000L);
            this.factory = factory;
        }
    
        ConcurrentMessageListenerContainer<String, String> createContainer(String groupId, String... topics) {
            ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topics);
            container.getContainerProperties().setGroupId(groupId);
            container.getContainerProperties().setMessageListener(new MyListener());
            return container;
        }
    
    }
    
    class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String> {
    
        @Override
        public void onMessage(ConsumerRecord<String, String> data) {
            System.out.println(data);
        }
    
        public void reseek(TopicPartition partition, long offset) {
            getSeekCallbackFor(partition).seek(partition.topic(), partition.partition(), offset);
        }
    
    }
    

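    Calling reseek() on the listener queues the seek for the consumer thread; it is performed when that thread wakes from poll() (strictly speaking, just before the next poll).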
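
    The "queued until the next poll" behaviour can be pictured with a small plain-Java model. This is only a sketch of the idea, not Spring's implementation: `QueuedSeekModel`, `requestSeek` and `pollOnce` are hypothetical names, and the "partition" is just an in-memory list whose index plays the role of the offset. The point it illustrates is that a seek requested from any thread is merely recorded and only takes effect on the polling thread, before records are fetched again.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Toy model for illustration only; none of this is a Spring or Kafka type.
    final class QueuedSeekModel {

        private final List<String> log; // stands in for one partition: index == offset
        private final ConcurrentLinkedQueue<Long> pendingSeeks = new ConcurrentLinkedQueue<>();
        private long position;          // next offset to fetch

        QueuedSeekModel(List<String> log) {
            this.log = log;
        }

        // May be called from any thread: the request is only recorded here.
        void requestSeek(long offset) {
            pendingSeeks.add(offset);
        }

        // Meant to be called by the single polling thread:
        // apply queued seeks first, then fetch up to maxRecords records.
        List<String> pollOnce(int maxRecords) {
            Long seek;
            while ((seek = pendingSeeks.poll()) != null) {
                position = seek; // the last queued request wins
            }
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxRecords && position < log.size()) {
                batch.add(log.get((int) position));
                position++;
            }
            return batch;
        }

        public static void main(String[] args) {
            List<String> records = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                records.add("test" + i);
            }
            QueuedSeekModel model = new QueuedSeekModel(records);

            System.out.println(model.pollOnce(10)); // first poll delivers test0 .. test9
            model.requestSeek(5);                   // nothing is delivered yet
            System.out.println(model.pollOnce(10)); // prints [test5, test6, test7, test8, test9]
        }
    }
    ```

    In the real container the seek is applied to the Kafka consumer on its own thread, which matters because KafkaConsumer is not safe for use from multiple threads; the model only mirrors the ordering of "request, then apply before the next fetch".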