有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java RabbitMQ创建交换,但不创建队列,并且没有订阅者接收消息

我正在使用Micronaut构建一些微服务。对服务的一个要求是,它们都必须生成事件,但任何服务都可以侦听其他服务的事件。我想构建它,以便您可以在本地插入RabbitMQ,但在任何环境中运行时,您都可以使用PaaS代理

为了实现这一点,我想为每个微服务配置一个exchange。例如,产品microservice将向名为product_events的exchange生成事件。此服务还碰巧侦听来自支付服务的事件:

broker:
  publish:
    exchange: product_events
  subscribe:
    exchanges:
      - payment_events

发布者将邮件发布到其自己的exchange:

public void sendToTopic(final String routingKey, final String data) throws BrokerPublishException {
    final String rabbitMqFormattedRoutingKey = routingKey.replaceAll("[^A-Za-z0-9 ]", ".");
    final ConnectionFactory factory = getConnectionFactory();
    try (final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel()) {
        LOG.info("Publishing message on exchange '{}' using routing key '{}' with data: {}", exchange, rabbitMqFormattedRoutingKey, data);
        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
        channel.basicPublish(exchange, rabbitMqFormattedRoutingKey, null, data.getBytes(StandardCharsets.UTF_8));
    } catch (final IOException | TimeoutException e) {
        throw new BrokerPublishException("Failed to publish message", e);
    }
}

调用服务控制器时,我可以看到消息已生成并添加到exchange:

Publishing message on exchange 'product_events' using routing key 'product.added with data: {"aggregateId":"product"}

然后,我希望消费者在应用程序启动时启动:

@Context // Eager start
@ExecuteOn(TaskExecutors.MESSAGE_CONSUMER) // Decouple message consuming to executor
public class SubscriberService {

    private final BrokerSubscriber subscriber;

    public SubscriberService(final BrokerSubscriber subscriber) {
        this.subscriber = subscriber;
        start();
    }

    private void start() {
        subscriber.receive();
    }
}

subscribe的RabbitMQ实现如下所示:

public void receive() throws BrokerSubscribeException {
    final ConnectionFactory factory = getConnectionFactory();
    try (final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel()){
        for (final String exchange : exchanges) {
            channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
            final String queueName = channel.queueDeclare(UUID.randomUUID().toString(), true, true, true, null).getQueue();
            channel.queueBind(queueName, exchange, "#");

            LOG.info("Waiting for messages on exchange '{}', with bound queue '{}'", exchange, queueName);

            final DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                final String message = new String(delivery.getBody(), "UTF-8");
                LOG.info("Received '{}' with message\n:{}", delivery.getEnvelope().getRoutingKey(), message);
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        }
    } catch (final IOException | TimeoutException e) {
        throw new BrokerSubscribeException("Failed when subscribing to messages", e);
    }
} 

subscribe在启动时生成以下日志消息:

Waiting for messages on exchange 'payment_events', with bound queue 'e60dc206-f757-49c7-8f3f-fee6b2f3d255'

一切正常启动,我可以看到在管理控制台中创建的交换: enter image description here

但是控制台中没有队列,我可以重用打印到控制台的guid,并通过管理控制台创建队列,而不会发生任何冲突或警告。我的服务应该在彼此的交换上侦听#,而不会收到任何消息

我可以通过管理控制台创建队列,并将其绑定到exchange并获取发布者放置的消息,但我无法让我的服务执行此操作

简言之:

  1. 应用程序启动,订户创建交换机并侦听#
  2. 产品服务获取添加新产品的操作
  3. 产品服务将有关已添加产品的消息发布到其自己的exchange
  4. 支付服务侦听产品服务交换,并应记录收到的消息
  5. 什么也没发生

我在同一Micronaut应用程序中创建到同一代理的两个连接是否存在任何问题

我真的不知道我做错了什么


共 (1) 个答案

  1. # 1 楼答案

    这相当尴尬

    订阅者不应使用try with资源,因为连接和通道将在语句结束时立即关闭。只是改变了一下:

    try (final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel()){
        // Do stuff
    }
    
    

    进入:

    try {
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // Do stuff
    }