java RabbitMQ创建交换,但不创建队列,并且没有订阅者接收消息
我正在使用Micronaut构建一些微服务。对服务的一个要求是,它们都必须生成事件,但任何服务都可以侦听其他服务的事件。我想构建它,以便您可以在本地插入RabbitMQ,但在任何环境中运行时,您都可以使用PaaS代理
为了实现这一点,我想为每个微服务配置一个exchange。例如,产品microservice将向名为product_events
的exchange生成事件。此服务还碰巧侦听来自支付服务的事件:
broker:
publish:
exchange: product_events
subscribe:
exchanges:
- payment_events
发布者将邮件发布到其自己的exchange:
public void sendToTopic(final String routingKey, final String data) throws BrokerPublishException {
final String rabbitMqFormattedRoutingKey = routingKey.replaceAll("[^A-Za-z0-9 ]", ".");
final ConnectionFactory factory = getConnectionFactory();
try (final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel()) {
LOG.info("Publishing message on exchange '{}' using routing key '{}' with data: {}", exchange, rabbitMqFormattedRoutingKey, data);
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
channel.basicPublish(exchange, rabbitMqFormattedRoutingKey, null, data.getBytes(StandardCharsets.UTF_8));
} catch (final IOException | TimeoutException e) {
throw new BrokerPublishException("Failed to publish message", e);
}
}
调用服务控制器时,我可以看到消息已生成并添加到exchange:
Publishing message on exchange 'product_events' using routing key 'product.added with data: {"aggregateId":"product"}
然后,我希望消费者在应用程序启动时启动:
@Context // Eager start
@ExecuteOn(TaskExecutors.MESSAGE_CONSUMER) // Decouple message consuming to executor
public class SubscriberService {
private final BrokerSubscriber subscriber;
public SubscriberService(final BrokerSubscriber subscriber) {
this.subscriber = subscriber;
start();
}
private void start() {
subscriber.receive();
}
}
subscribe的RabbitMQ实现如下所示:
public void receive() throws BrokerSubscribeException {
final ConnectionFactory factory = getConnectionFactory();
try (final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel()){
for (final String exchange : exchanges) {
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
final String queueName = channel.queueDeclare(UUID.randomUUID().toString(), true, true, true, null).getQueue();
channel.queueBind(queueName, exchange, "#");
LOG.info("Waiting for messages on exchange '{}', with bound queue '{}'", exchange, queueName);
final DeliverCallback deliverCallback = (consumerTag, delivery) -> {
final String message = new String(delivery.getBody(), "UTF-8");
LOG.info("Received '{}' with message\n:{}", delivery.getEnvelope().getRoutingKey(), message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
} catch (final IOException | TimeoutException e) {
throw new BrokerSubscribeException("Failed when subscribing to messages", e);
}
}
subscribe在启动时生成以下日志消息:
Waiting for messages on exchange 'payment_events', with bound queue 'e60dc206-f757-49c7-8f3f-fee6b2f3d255'
但是控制台中没有队列,我可以重用打印到控制台的guid,并通过管理控制台创建队列,而不会发生任何冲突或警告。我的服务应该在彼此的交换上侦听#
,而不会收到任何消息
我可以通过管理控制台创建队列,并将其绑定到exchange并获取发布者放置的消息,但我无法让我的服务执行此操作
简言之:
- 应用程序启动,订户创建交换机并侦听
#
- 产品服务获取添加新产品的操作
- 产品服务将有关已添加产品的消息发布到其自己的exchange
- 支付服务侦听产品服务交换,并应记录收到的消息
- 什么也没发生
我在同一Micronaut应用程序中创建到同一代理的两个连接是否存在任何问题
我真的不知道我做错了什么
# 1 楼答案
这相当尴尬
订阅者不应使用try with资源,因为连接和通道将在语句结束时立即关闭。只是改变了一下:
进入: