有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

rabbitmq rabbitmq java客户端并行消费

我想并行处理来自rabbitMq队列的消息。队列配置为autoAck=false。我正在使用对camel endpoints的camel rabbitMQ支持,它支持threadPoolSize参数,但这并没有达到预期的效果。即使threadpoolsize=20,消息仍会在队列外连续处理

通过代码调试,我可以看到threadpoolsize参数用于创建ExecutorService,该服务用于传递到rabbit connectionfactory,如here所述。这一切看起来都很好,直到你进入兔子ConsumerWorkService。在这里,消息以最大16条消息的块进行处理。一个块中的每条消息都会被串行处理,如果还有更多的工作要做,执行器服务将与下一个块一起调用。下面是一段代码片段。从executor服务的使用中,我看不出如何并行处理消息。executorservice一次只能执行一项工作

我错过了什么

private final class WorkPoolRunnable implements Runnable {

        public void run() {
            int size = MAX_RUNNABLE_BLOCK_SIZE;
            List<Runnable> block = new ArrayList<Runnable>(size);
            try {
                Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
                if (key == null) return; // nothing ready to run
                try {
                    for (Runnable runnable : block) {
                        runnable.run();
                    }
                } finally {
                    if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
                        ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
                    }
                }
            } catch (RuntimeException e) {
                Thread.currentThread().interrupt();
            }
        }

共 (2) 个答案

  1. # 1 楼答案

    如果您有一个Channel实例,它将连续调用其注册的使用者,正如您通过检查ConsumerWorkService正确发现的那样。有两种方法可以克服这个问题:

    1. 使用多个频道,而不是一个
    2. 使用单一渠道,但以特殊方式实现消费者。他们应该只从队列中选取传入消息,并将其作为任务放入内部线程池

    你可以在this post中找到更多细节

  2. # 2 楼答案

    RabbitMQ的文档对此并不十分清楚,但是,即使ConsumerWorkService正在使用线程池,这个池似乎并没有以并行方式处理消息:

    Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

    http://www.rabbitmq.com/api-guide.html

    本文档建议每个线程使用一个Channel,事实上,如果只创建所需并发级别的Channel,消息将在链接到这些通道的使用者之间发送

    我用两个通道和消费者进行了测试:当队列中有两条消息时,每个消费者一次只选择一条消息。你提到的16条信息似乎没有干扰,这是一件好事

    事实上,Spring AMQP还创建了多个并发处理消息的通道。这是通过以下方式实现的:

    我还测试了它是否按预期工作