有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java卡夫卡启用。汽车commit false与commitSync()结合使用

我有一个场景enable.auto.commit被设置为false。对于每一个poll(),获得的记录被卸载到一个threadPoolExecutor。而commitSync()的发生是断章取义的。但是,我怀疑这是否是正确的处理方式,因为在提交消息时,我的线程池可能仍然处理很少的消息

while (true) {
 ConsumerRecords < String, NormalizedSyslogMessage > records = consumer.poll(100);
 Date startTime = Calendar.getInstance().getTime();
 for (ConsumerRecord < String, NormalizedSyslogMessage > record: records) {
  NormalizedSyslogMessage normalizedMessage = record.value();
  normalizedSyslogMessageList.add(normalizedMessage);
 }
 Date endTime = Calendar.getInstance().getTime();
 long durationInMilliSec = endTime.getTime() - startTime.getTime();
 // execute process thread on message size equal to 5000 or timeout > 4000
 if (normalizedSyslogMessageList.size() == 5000) {
  CorrelationProcessThread correlationProcessThread = applicationContext
   .getBean(CorrelationProcessThread.class);
  List < NormalizedSyslogMessage > clonedNormalizedSyslogMessages = deepCopy(normalizedSyslogMessageList);
  correlationProcessThread.setNormalizedMessage(clonedNormalizedSyslogMessages);
  taskExecutor.execute(correlationProcessThread);
  normalizedSyslogMessageList.clear();
 }
 consumer.commitSync();
}

共 (1) 个答案

  1. # 1 楼答案

    完全同意Lalit所说的。目前,我正经历着同样的情况,我的处理在不同的线程和消费者&;不同线程中的生产者。我使用了一个ConcurrentHashMap在生产者和消费者线程之间共享,它会更新特定偏移量是否已被处理

    ConcurrentHashMap<OffsetAndMetadata, Boolean>
    

    在使用者端,可以使用本地LinkedHashMap来维护从主题/分区消费记录的顺序,并在使用者线程本身中进行手动提交

    LinkedHashMap<OffsetAndMetadata, TopicPartition>
    

    如果处理线程正在维护任何消耗的记录顺序,则可以参考以下链接。 Transactions in Kafka

    在我的方法中需要提到的一点是,如果出现任何故障,数据都有可能被复制