驼峰路由中生产者流控制的java问题(持久消息)
我在寻找正确的activemq配置集以确保Apache驼峰路由中消息的一致吞吐量时遇到问题。 当前配置使用以下技术:
- 骆驼(2.15.2)
- ActiveMQ(5.12.1)
Tomcat(7.0.56)
下面是在Camel for ActiveMQ中使用的一组bean配置:
<bean id="jmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:6616?jms.prefetchPolicy.queuePrefetch=100" /> <property name="watchTopicAdvisories" value="false" /> <property name="producerWindowSize" value="2300" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="20" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="idleTimeout" value="0"/> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="transactionManager" ref="jmsTransactionManager"/> <property name="transacted" value="true"/>
——>;
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig" /> </bean>
下面是在activemq中找到的特定于代理的配置。xml文件:
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="./activemq/data/" advisorySupport="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="PICKAXE.L5.PROC.>" producerFlowControl="true" storeUsageHighWaterMark="50" />
<policyEntry queue="PICKAXE.L5.COL.>" producerFlowControl="true" storeUsageHighWaterMark="95" />
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="true" />
</managementContext>
<persistenceAdapter>
<kahaDB directory="./activemq/kahadb/" />
</persistenceAdapter>
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000000">
<memoryUsage>
<memoryUsage limit="750 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="2 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="500 mb" />
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire"
uri="tcp://0.0.0.0:6616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="amqp"
uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
我正在运行下面的骆驼路线。队列A接收大量消息(1000/s),因此它开始相当快地填满,因为这些消息的最终消费者无法跟上。随着消息量最终达到持久性存储生成器的50%,流控制规则将阻止进一步的消息放在队列A上。但是,当我通过JMX检查队列深度时,队列A和B都不会改变,就好像消费者也被阻塞一样
from(activemq:queue:PICKAXE.L5.PROC.A)
.to(activemq:queue:PICKAXE.L5.COL.B);
from(activemq:queue:PICKAXE.L5.COL.B)
.autoStartup(!localFlag)
.to(customEndpoint)
.routeId(collectionRouteId);
大约一个星期以来,我一直在尝试各种jms/activemq配置的排列方式,但运气不佳,所以我很感激有任何想法。我所追求的理想行为是,此流中消息的使用者继续从持久性存储中删除消息,这将允许消息继续彻底地流动
# 1 楼答案
该问题是由在上述配置中设置为300000的过大的sendFailIfNoSpaceAfterTimeout导致的。这导致代理在确认send()命令因持久存储已满而失败之前等待
将上述配置替换为以下配置:
这确保了(由于消息是持久的,队列被集成到驼峰路由中),当持久存储已满导致故障时,send()操作每0.3秒重试一次