有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

JavaSpring在不重新启动应用程序的情况下无法找到创建的表

我有一个相当具体的场景,其中我遇到了问题:

我有一个Camel路由,它试图从任意数量的数据源中获取数据,并将这些数据插入中央数据库。如果目标表在中央数据库中不存在,我将尝试从源数据库获取DDL。在中央数据库中创建缺少的表非常有效,但是当骆驼路线下次按计划运行时,它找不到该表,即使我可以在我的数据库工具中看到它。如果我简单地重新启动spring启动实例,它会找到表并工作

为了让您更好地了解它是如何工作的,下面是一段出错的路线:

from("direct:ensureTargetTableExists")
    .routeId("ensureTargetTableExists")
    .doTry()
        .toD(String.format("sql:select * from ${in.header.%s} where false?dataSource=%s&outputHeader=%s", QueryHandler.Properties.TARGET_TABLE, DataSourceHandler.Properties.TARGET_DATASOURCE_NAME, "tableExists"))
        .to("direct:handleResultSet")
    .doCatch(BadSqlGrammarException.class)
        .log("entered catch")
        .log(LoggingLevel.INFO, String.format("Could not find table ${in.header.%s} at %s, will attempt to create it by fetching DDL.", QueryHandler.Properties.TARGET_TABLE, DataSourceHandler.Properties.TARGET_DATASOURCE_NAME))
        .to("direct:fetchTableDdlFromSource")
    .endDoTry();

from("direct:fetchTableDdlFromSource")
    .routeId("fetchTableDdlFromSource")
    .streamCaching()
    .bean(DumpHandler.class, "generateDumpCommand")
    .toD(String.format("ssh:{{sync.dump.ssh.user}}@${in.header.%s}?password={{sync.dump.ssh.password}}", DataSourceHandler.Properties.FETCHED_FROM_HOST))
    .to("direct:applyDdlToTargetDatabase");

from("direct:applyDdlToTargetDatabase")
    .routeId("applyDdlToTargetDatabase")
    .transacted("targetTransactionPolicy")
    .toD(String.format("sql:?dataSource=%s&useMessageBodyForSql=true", DataSourceHandler.Properties.TARGET_DATASOURCE_NAME))
    .bean(FileUtil.class, "saveDdl")
    // Nothing else is called after this, since I want the transaction to be completed and commited
    .log(LoggingLevel.INFO, "Table will be populated next scheduled run.");

我让它运行两次迭代,结果如下:

entered catch
Could not find table article_flags at target, will attempt to create it by fetching DDL.
Saving DDL to config/sql/ddl/article_flags-20181206_162310.sql
Successfully created table article_flags
Table will be populated next scheduled run.
entered catch
Could not find table article_flags at target, will attempt to create it by fetching DDL.
Failed delivery for (MessageId: ID-overlord-1544109787810-0-12 on ExchangeId: ID-overlord-1544109787810-0-9). Exhausted after delivery attempt: 1 caught: org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar []; nested exception is org.postgresql.util.PSQLException: ERROR: relation "article_flags" already exists

我尝试使用事务管理器和事务策略来解决这个问题,我在应用程序启动时添加了事务管理器和事务策略,但没有效果。如您所见,我在上面的驼峰路由中使用了事务策略targetTransactionPolicy

public class ContextConfiguration implements BeanDefinitionRegistryPostProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(ContextConfiguration.class);

    private Map<String, BasicDataSource> datasources;

    public ContextConfiguration(Map<String, BasicDataSource> datasources) {
        this.datasources = datasources;
    }

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        for (Entry<String, BasicDataSource> source : datasources.entrySet()) {
            LOG.info("Adding datasource {} targeting {} to Spring registry", source.getKey(), source.getValue().getUrl());
            PlatformTransactionManager transactionManager = new DataSourceTransactionManager(source.getValue());
            SpringTransactionPolicy transactionPolicy = new SpringTransactionPolicy(transactionManager);
            registry.registerBeanDefinition(source.getKey() + "TransactionPolicy", BeanDefinitionBuilder.genericBeanDefinition(SpringTransactionPolicy.class, () -> transactionPolicy).getBeanDefinition());
            registry.registerBeanDefinition(source.getKey() + "TransactionManager", BeanDefinitionBuilder.genericBeanDefinition(PlatformTransactionManager.class, () -> transactionManager).getBeanDefinition());
            registry.registerBeanDefinition(source.getKey(), BeanDefinitionBuilder.genericBeanDefinition(BasicDataSource.class, () -> source.getValue()).getBeanDefinition());
        }
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { }

}

这几天我的听力一直在下降,我似乎无法克服这一点。如有任何帮助或见解,将不胜感激。我最好的猜测是它与事务有关,或者与我创建事务管理器、策略和数据源的方式有关

其他信息:

  • 弹簧靴2.1.1
  • 骆驼2.23.0
  • 底层数据源是Postgres 10.6

共 (2) 个答案

  1. # 1 楼答案

    由于以下错误,select查询失败:

    关系“article_flags”已存在

    可能是这个 thread会有帮助。我也不认为这与交易有关

  2. # 2 楼答案

    我想我以令人满意的方式解决了这个问题。这是我所怀疑的交易

    我向目标数据源添加了事务策略:

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        registerTargetTransactionPolicy(datasources.get("target"), registry);
        for (Entry<String, BasicDataSource> source : datasources.entrySet()) {
            LOG.info("Adding datasource {} targeting {} to Spring registry", source.getKey(), source.getValue().getUrl());
            registry.registerBeanDefinition(source.getKey(), BeanDefinitionBuilder.genericBeanDefinition(BasicDataSource.class, () -> source.getValue()).getBeanDefinition());
        }
    }
    
    private void registerTargetTransactionPolicy(BasicDataSource datasource, BeanDefinitionRegistry registry) {
        PlatformTransactionManager transactionManager = new DataSourceTransactionManager(datasource);
        SpringTransactionPolicy transactionPolicy = new SpringTransactionPolicy(transactionManager);
        transactionPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRES_NEW");
        registry.registerBeanDefinition(TARGET_REQUIRES_NEW_POLICY, BeanDefinitionBuilder.genericBeanDefinition(SpringTransactionPolicy.class, () -> transactionPolicy).getBeanDefinition());
    }
    

    然后,我使用.policy(ContextConfiguration.TARGET_REQUIRES_NEW_POLICY)在我的骆驼路线中使用它,它工作得完美无瑕