有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java无法使用ChainedKafkatTransaction同步Kafka和MQ事务

我们有一个spring引导应用程序,它使用来自IBM MQ的消息并进行一些转换,然后将结果发布到Kafka主题。我们使用https://spring.io/projects/spring-kafka来实现这一点。我知道卡夫卡不支持XA;但是,在文档中,我找到了一些关于使用ChainedKafkaTransactionManager链接多个事务管理器并同步事务的输入。同一文档还提供了一个示例,说明如何在从卡夫卡读取消息并将其存储在数据库中时同步卡夫卡和数据库

在我的se案例中,我遵循相同的示例,将JmsTransactionManagerKafkaTransactionManager链接在ChainedKafkaTransactionManager的保护伞下。bean定义如下:

@Bean({"mqListenerContainerFactory"})
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.connectionFactory());
    factory.setTransactionManager(this.jmsTransactionManager());
    return factory;
}

@Bean
public JmsTransactionManager jmsTransactionManager() {
    return new JmsTransactionManager(this.connectionFactory());
}

@Bean("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(
        JmsTransactionManager jmsTransactionManager, KafkaTransactionManager kafkaTransactionManager) {

    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jmsTransactionManager);
}

@Transactional(transactionManager = "chainedKafkaTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "${myApp.sourceQueue}", containerFactory = "mqListenerContainerFactory")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Processing the message here then publishing it to Kafka using KafkaTemplate
    kafkaTemplate.send(sourceTopic,transformedMessage);

    // Then throw an exception just to test the transaction behaviour
    throw new RuntimeException("Not good Pal!");
}

运行应用程序时,发生的情况是消息不断回滚到MQ队列中,但消息在Kafka主题中不断增长,这对我来说意味着kafkaTemplate交互不会回滚

如果我能很好地理解文档中的内容,那么情况就不应该是这样了。如果事务处于活动状态,则在该事务范围内执行的任何KafkaTemplate操作都将使用该事务的生产者

在我们的应用程序中。yaml我们通过设置spring.kafka.producer.transaction-id-prefix将Kafka生产者配置为使用事务

问题是我在这里遗漏了什么,我应该如何修复它。 提前感谢您的投入


共 (1) 个答案

  1. # 1 楼答案

    默认情况下,使用者可以看到未提交的记录;将isolation.level消费者属性设置为read_committed,以避免从回滚事务接收记录