java卡夫卡启用。汽车commit false与commitSync()结合使用
我有一个场景enable.auto.commit
被设置为false
。对于每一个poll()
,获得的记录被卸载到一个threadPoolExecutor
。而commitSync()
的发生是断章取义的。但是,我怀疑这是否是正确的处理方式,因为在提交消息时,我的线程池可能仍然处理很少的消息
while (true) {
ConsumerRecords < String, NormalizedSyslogMessage > records = consumer.poll(100);
Date startTime = Calendar.getInstance().getTime();
for (ConsumerRecord < String, NormalizedSyslogMessage > record: records) {
NormalizedSyslogMessage normalizedMessage = record.value();
normalizedSyslogMessageList.add(normalizedMessage);
}
Date endTime = Calendar.getInstance().getTime();
long durationInMilliSec = endTime.getTime() - startTime.getTime();
// execute process thread on message size equal to 5000 or timeout > 4000
if (normalizedSyslogMessageList.size() == 5000) {
CorrelationProcessThread correlationProcessThread = applicationContext
.getBean(CorrelationProcessThread.class);
List < NormalizedSyslogMessage > clonedNormalizedSyslogMessages = deepCopy(normalizedSyslogMessageList);
correlationProcessThread.setNormalizedMessage(clonedNormalizedSyslogMessages);
taskExecutor.execute(correlationProcessThread);
normalizedSyslogMessageList.clear();
}
consumer.commitSync();
}
# 1 楼答案
完全同意Lalit所说的。目前,我正经历着同样的情况,我的处理在不同的线程和消费者&;不同线程中的生产者。我使用了一个ConcurrentHashMap在生产者和消费者线程之间共享,它会更新特定偏移量是否已被处理
在使用者端,可以使用本地LinkedHashMap来维护从主题/分区消费记录的顺序,并在使用者线程本身中进行手动提交
如果处理线程正在维护任何消耗的记录顺序,则可以参考以下链接。 Transactions in Kafka
在我的方法中需要提到的一点是,如果出现任何故障,数据都有可能被复制