rabbitmq rabbitmq java客户端并行消费
我想并行处理来自rabbitMq队列的消息。队列配置为autoAck=false。我正在使用对camel endpoints的camel rabbitMQ支持,它支持threadPoolSize参数,但这并没有达到预期的效果。即使threadpoolsize=20,消息仍会在队列外连续处理
通过代码调试,我可以看到threadpoolsize参数用于创建ExecutorService,该服务用于传递到rabbit connectionfactory,如here所述。这一切看起来都很好,直到你进入兔子ConsumerWorkService
。在这里,消息以最大16条消息的块进行处理。一个块中的每条消息都会被串行处理,如果还有更多的工作要做,执行器服务将与下一个块一起调用。下面是一段代码片段。从executor服务的使用中,我看不出如何并行处理消息。executorservice一次只能执行一项工作
我错过了什么
private final class WorkPoolRunnable implements Runnable {
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
# 1 楼答案
如果您有一个
Channel
实例,它将连续调用其注册的使用者,正如您通过检查ConsumerWorkService
正确发现的那样。有两种方法可以克服这个问题:你可以在this post中找到更多细节
# 2 楼答案
RabbitMQ的文档对此并不十分清楚,但是,即使
ConsumerWorkService
正在使用线程池,这个池似乎并没有以并行方式处理消息:(http://www.rabbitmq.com/api-guide.html)
本文档建议每个线程使用一个
Channel
,事实上,如果只创建所需并发级别的Channel
,消息将在链接到这些通道的使用者之间发送我用两个通道和消费者进行了测试:当队列中有两条消息时,每个消费者一次只选择一条消息。你提到的16条信息似乎没有干扰,这是一件好事
事实上,Spring AMQP还创建了多个并发处理消息的通道。这是通过以下方式实现的:
SimpleMessageListenerContainer.setConcurrentConsumers(...)
:http://docs.spring.io/spring-amqp/docs/1.3.6.RELEASE/api/CachingConnectionFactory.setChannelCacheSize(...)
:http://docs.spring.io/spring-amqp/docs/1.3.6.RELEASE/api/我还测试了它是否按预期工作