有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在Spring AMQP中侦听现有队列?

我有一个远程RabbitMQ服务器,它有一些我想要监听的队列。我试过这个:

@RabbitListener(queues = "queueName")
public void receive(String message) {
    System.out.println(message);
}

但它试图创建一个新队列。结果是可预测的-访问被拒绝

o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queueName

我没有以任何其他方式声明任何队列

如何侦听远程服务器上的现有队列?另外,是否有方法检查此队列是否存在?我看到了这条线

@RabbitListener(queues = "#{autoDeleteQueue2.name}")

在教程中。{}是什么意思

日志和堆栈跟踪的开始:

2018-08-30 22:10:21.968  WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queueName
2018-08-30 22:10:21.991  WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[queueName]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:588) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:996) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

共 (3) 个答案

  1. # 1 楼答案

    下面是一个关于如何使用rabbitMq侦听队列的示例:

    @Component
    public class RabbitConsumer implements MessageListener {
    
        @RabbitListener(bindings =
        @QueueBinding(
                value = @Queue(value = "${queue.topic}", durable = "true"),
                exchange = @Exchange(value = "${queue.exchange}", type = ExchangeTypes.FANOUT, durable = "true")
        )
        )
        @Override
        public void onMessage(Message message) {
            // ...
        }
    }
    

    和配置(application.yaml):

    queue:
      topic: mytopic
      exchange: myexchange
    

    在rabbitmq中,使用者与交换相关联。它允许您定义必须如何使用消息(所有消费者是否都在收听所有消息?如果只有一个消费者阅读消息,这是否足够?…)

  2. # 2 楼答案

    下面是一个如何使用Spring集成侦听特定“队列”的示例:

    SpringIntegrationConfiguration。java

    @Configuration
    public class SpringIntegrationConfiguration {
    
    @Value("${rabbitmq.queueName}")
    private String queueName;
    
    @Bean
    public IntegrationFlow ampqInbound(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName))
                .handle(System.out::println)
                .get();
      }
    }
    

    应用程序配置。java

    @Configuration
    public class ApplicationConfiguration {
    
    @Value("${rabbitmq.topicExchangeName}")
    private String topicExchangeName;
    
    @Value("${rabbitmq.queueName}")
    private String queueName;
    
    @Value("${rabbitmq.routingKey}")
    private String routingKey;
    
    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }
    
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }
    
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
      }
    
    }
    

    应用程序。yml

    rabbitmq:
    topicExchangeName: spring-boot-exchange
    queueName: spring-boot
    routingKey: foo.bar.#
    
  3. # 3 楼答案

    即使您没有代理的配置权限,侦听器使用的queueDeclarePassive也是允许的(它检查队列的存在)

    o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName

    这只意味着队列不存在

    @RabbitListener(queues = "#{autoDeleteQueue2.name}")

    用于在运行时获取队列名称(当您有权创建队列时)

    例如

    @Bean
    public AnonymousQueue autoDeleteQueue2() {
        return new AnonymousQueue();
    }
    

    Spring将使用一个随机、唯一的名称将该队列添加到代理中。然后使用实际队列名称配置侦听器