java DefaultMessageListenerContainer未从IBM MQ读取消息
我试图创建一些Spring引导代码,使用DefaultMessageListenerContainer来侦听来自IBM MQ的消息
我可以创建MQQueueConnectionFactory,并使用JmsTemplate发送和接收消息,但这是为了提高吞吐量,并且希望使用侦听器而不是轮询
我已经将大部分代码整合到一个组件中,所以我希望我拥有所有相关的内容
如果我安排receiveMessage方法,它将拾取排队的消息,这样我就知道sendMessage方法正在排队消息
@Component
class AllInOneTest {
private MessagingConfiguration.QueueConfig config;
private MQQueueConnectionFactory connectionFactory;
private JmsTemplate jmsTemplate;
private DefaultMessageListenerContainer listenerContainer;
private final Logger logger = LoggerFactory.getLogger(getClass());
public AllInOneTest(MessagingManager manager) throws JMSException {
String detailsName = "default";
config = manager.getMessagingDetails(detailsName).getConfig();
logger.debug("AllInOneTest Initializing Connection Factory: {}", detailsName);
connectionFactory = new MQQueueConnectionFactory();
connectionFactory.setHostName(config.getHost());
connectionFactory.setPort(config.getPort());
connectionFactory.setTransportType(config.getTransportType());
connectionFactory.setQueueManager(config.getQueueManager());
connectionFactory.setChannel(config.getChannel());
logger.debug("AllInOneTest Initializing Message Listener: {}", detailsName);
DefaultMessageListenerContainer defaultListener = new DefaultMessageListenerContainer();
defaultListener.setConnectionFactory(connectionFactory);
defaultListener.setExceptionListener((ee) -> {
logger.warn(String.format("AllInOneTest Message Listener Error: %s", detailsName), ee);
});
defaultListener.setDestinationResolver((session, name, pubSub) -> {
Destination ret = session.createQueue(name);
logger.debug("AllInOneTest Created Listener Destination: {}", ret);
return ret;
});
defaultListener.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
logger.info("AllInOneTest Listening For Message: {}", message);
}
});
// TODO Configure subscription.
// defaultListener.setSubscriptionDurable(true);
// defaultListener.setSubscriptionName("masher-service");
// TODO Configure concurrency.
// defaultListener.setConcurrency(config.getConcurrency());
// TODO Configure transaction.
// defaultListener.setSessionTransacted(config.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
listenerContainer = defaultListener;
logger.debug("AllInOneTest Initializing JMS Template: {}", detailsName);
jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(new SpringToJMSMessageConverter());
jmsTemplate.setReceiveTimeout(1000L);
jmsTemplate.setDefaultDestinationName(config.getOutputQueue());
jmsTemplate.setDestinationResolver((session, name, pubSub) -> {
Destination ret = session.createQueue(name);
logger.debug("AllInOneTest Created JMS Template Destination: {}", ret);
return ret;
});
listenerContainer.setDestinationName(config.getOutputQueue());
logger.debug("AllInOneTest Starting Message Listener: {} on {}", detailsName, config.getOutputQueue());
listenerContainer.start();
}
// @Scheduled(fixedRate = 500L)
public void receiveMessage() {
Object message = jmsTemplate.receiveAndConvert();
if (message != null) {
logger.info("AllInOneTest Received: {}", message);
}
}
@Scheduled(fixedRate = 1500L)
public void sendMessage() {
int count = counter.incrementAndGet();
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(String.format("JMS Masher Message %d %s %s", count,
new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()), UUID.randomUUID().toString())).build();
logger.info("AllInOneTest Sending: {} [{}]", message.getPayload(), message.getHeaders());
jmsTemplate.convertAndSend(config.getInputQueue(), message);
}
}
我正在呼叫DefaultMessageListenerContainer。开始()但我感觉这不是“开始”,我肯定错过了什么
为JmsTemplate而不是DefaultMessageListenerContainer调用DestinationResolver
我在控制台中没有看到任何异常
谢谢你的帮助, 韦斯
# 1 楼答案
当您以编程方式创建容器时,而不是让Spring将其作为
@Bean
进行管理,您必须调用afterPropertiesSet()
(在设置所有属性之后,在start()
之前)对于许多弹簧组件来说都是如此。一般来说,让Spring来管理它们会更好