
Java producer flow control problem in a Camel route (persistent messages)

I'm having trouble finding the right set of ActiveMQ configuration options to ensure consistent message throughput in an Apache Camel route. The current setup uses the following technologies:

  • Camel (2.15.2)
  • ActiveMQ(5.12.1)
  • Tomcat(7.0.56)

    Below is the set of bean definitions used to configure ActiveMQ in Camel:

    <bean id="jmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:6616?jms.prefetchPolicy.queuePrefetch=100" />
        <property name="watchTopicAdvisories" value="false" />
        <property name="producerWindowSize" value="2300" />
    </bean>
    
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="20" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="idleTimeout" value="0"/>
    </bean>
    
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="transactionManager" ref="jmsTransactionManager"/>
        <property name="transacted" value="true"/>
    </bean>

    <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
            <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig" />
    </bean>
    

Below is the broker-specific configuration found in the activemq.xml file:

<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost" dataDirectory="./activemq/data/" advisorySupport="false">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
                    <policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="true" />
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="./activemq/kahadb/" />
        </persistenceAdapter>

        <systemUsage>
            <systemUsage sendFailIfNoSpaceAfterTimeout="3000000">
                <memoryUsage>
                    <memoryUsage limit="750 mb" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="2 gb" />
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="500 mb" />
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>

            <transportConnector name="openwire"
                uri="tcp://0.0.0.0:6616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="amqp"
                uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
        </transportConnectors>
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>
    </broker>
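To make the interplay between the storeUsage limit and the per-queue storeUsageHighWaterMark concrete, the byte thresholds at which flow control kicks in can be worked out from the values above (a back-of-the-envelope sketch; the 2 GB limit and the 50/95 marks are taken from the configuration):

```java
public class StoreThresholds {
    public static void main(String[] args) {
        // storeUsage limit from the broker config: 2 GB
        long storeLimitBytes = 2L * 1024 * 1024 * 1024;

        // storeUsageHighWaterMark is a percentage of that limit
        long procThreshold = storeLimitBytes * 50 / 100; // PICKAXE.L5.PROC.> queues
        long colThreshold  = storeLimitBytes * 95 / 100; // PICKAXE.L5.COL.>  queues

        // Producers on PROC queues are throttled at ~1 GiB of store usage,
        // producers on COL queues only near the full 2 GiB limit
        System.out.println(procThreshold);
        System.out.println(colThreshold);
    }
}
```

So with this policy the PROC producers hit flow control long before the COL producers do, which matches the symptom described below.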

I am running the Camel routes below. Queue A receives a high volume of messages (1000/s), so it starts to fill up fairly quickly because the eventual consumer of these messages cannot keep up. Once the volume reaches 50% of the persistent store, the flow-control rules kick in and prevent further messages from being placed on queue A. However, when I inspect the queue depths via JMX, neither queue A nor queue B changes, as if the consumers were blocked as well.

    from("activemq:queue:PICKAXE.L5.PROC.A")
        .to("activemq:queue:PICKAXE.L5.COL.B");

    from("activemq:queue:PICKAXE.L5.COL.B")
        .routeId(collectionRouteId)
        .autoStartup(!localFlag)
        .to(customEndpoint);
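For context, route definitions like these would normally live in a RouteBuilder. A minimal sketch of that surrounding class (the localFlag, collectionRouteId, and customEndpoint names come from the question; making them constructor-injected fields is an assumption):

```java
import org.apache.camel.Endpoint;
import org.apache.camel.builder.RouteBuilder;

public class PickaxeRoutes extends RouteBuilder {
    private final boolean localFlag;        // assumed: disables the collection route locally
    private final String collectionRouteId; // assumed: id used for JMX / lifecycle control
    private final Endpoint customEndpoint;  // assumed: the final consumer of the messages

    public PickaxeRoutes(boolean localFlag, String collectionRouteId, Endpoint customEndpoint) {
        this.localFlag = localFlag;
        this.collectionRouteId = collectionRouteId;
        this.customEndpoint = customEndpoint;
    }

    @Override
    public void configure() {
        // Bridge route: moves messages from the PROC queue to the COL queue
        from("activemq:queue:PICKAXE.L5.PROC.A")
            .to("activemq:queue:PICKAXE.L5.COL.B");

        // Collection route: delivers messages to the final consumer
        from("activemq:queue:PICKAXE.L5.COL.B")
            .routeId(collectionRouteId)
            .autoStartup(!localFlag)
            .to(customEndpoint);
    }
}
```

Running this sketch requires a CamelContext wired to the "activemq" component defined in the bean configuration above.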

For about a week now I've been trying various permutations of the JMS/ActiveMQ configuration without much luck, so I'd appreciate any ideas. The ideal behaviour I'm after is that the consumers in this flow keep removing messages from the persistent store, which would allow messages to keep flowing through.


1 Answer

  1. # Answer 1

    The problem was caused by an excessively large sendFailIfNoSpaceAfterTimeout, set to 3000000 in the configuration above. This caused the broker to wait that long before acknowledging that a send() command had failed because the persistent store was full.

    Replace that setting with the following:

    <systemUsage sendFailIfNoSpaceAfterTimeout="300">
    

    This ensures (because the messages are persistent and the queues are wired into Camel routes) that when a send() fails due to the persistent store being full, the operation is retried every 0.3 seconds.
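    The effect of a short sendFailIfNoSpaceAfterTimeout can be pictured with a bounded queue: a timed offer stands in for send(), failing fast when the "store" is full so the producer can retry once a consumer frees space. This is only an analogy for the blocking behaviour, not ActiveMQ's actual implementation:

    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class SendRetryDemo {
        public static void main(String[] args) throws InterruptedException {
            // A tiny "store": capacity 2 stands in for the persistent store limit
            BlockingQueue<String> store = new ArrayBlockingQueue<>(2);
            store.put("m1");
            store.put("m2"); // store is now full

            // With a short timeout the producer learns of the failure quickly...
            boolean sent = store.offer("m3", 300, TimeUnit.MILLISECONDS);
            System.out.println(sent); // false: the send failed fast, caller can retry

            // ...and a retry succeeds as soon as a consumer drains a message
            store.take();
            boolean retried = store.offer("m3", 300, TimeUnit.MILLISECONDS);
            System.out.println(retried); // true
        }
    }
    ```

    With the original 3000000 ms value, the first offer in this picture would instead block the producer for up to 50 minutes before reporting failure, which is why the whole route appeared frozen.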