有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java JMSTemplate检查主题是否存在并获取订户计数

我一直在寻找一些文档/示例,用于检查动态创建的主题是否存在,如果存在,如何获取该主题的订户数量

我使用以下代码向主题发送消息-

// Publish commandStr to the destination as a JMS text message.
jmsTemplate.send(destination, new MessageCreator() {
    @Override
    public Message createMessage(Session jmsSession) throws JMSException {
        // Start from an empty text message and fill in the command payload.
        TextMessage textMessage = jmsSession.createTextMessage();
        textMessage.setText(commandStr);
        return textMessage;
    }
});

这段代码似乎创建了主题并将消息发布到主题

  1. 在创建主题之前,我需要检查主题是否存在
  2. 检查主题是否有订户

提前谢谢

我找到了(1)问题的解决办法(希望这有帮助)-

// Connect to the broker on its OpenWire port and start the connection.
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.start();
// Ask the connection's DestinationSource for the topics it currently knows about.
Set<ActiveMQTopic> topics = connection.getDestinationSource().getTopics();

共 (1) 个答案

  1. # 1 楼答案

    要获取目的地名称,您的做法是正确的;要获取订户数量等统计信息,可以通过 JMX 来查询:

    import java.util.HashMap;
    import java.util.Map;
    
    import javax.management.MBeanServerConnection;
    import javax.management.MBeanServerInvocationHandler;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    
    import org.apache.activemq.broker.jmx.BrokerViewMBean;
    import org.apache.activemq.broker.jmx.TopicViewMBean;
    
    /**
     * Prints the consumer (subscriber) count of one ActiveMQ topic by querying the broker over JMX.
     *
     * <p>The program connects to the JMX endpoint at {@code localhost:1099} with the credentials
     * {@code admin}/{@code admin}, looks up the broker MBean named {@code localhost}, and walks the
     * broker's topic list. For every topic whose {@code destinationName} key equals
     * {@code "YOUR_TOPIC_NAME"} (a placeholder to be replaced) it prints the topic's consumer count.
     * If no topic matches, nothing is printed.
     */
    public class JMXGetDestinationInfos {

        /**
         * Entry point.
         *
         * @param args unused
         * @throws Exception if the JMX URL or object name is malformed, or the JMX connection fails
         */
        public static void main(String[] args) throws Exception {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
            Map<String, String[]> env = new HashMap<>();
            String[] creds = { "admin", "admin" };
            env.put(JMXConnector.CREDENTIALS, creds);

            // try-with-resources guarantees the JMX connection is closed, even when a lookup
            // below throws; the connector was previously left open.
            try (JMXConnector jmxc = JMXConnectorFactory.connect(url, env)) {
                MBeanServerConnection conn = jmxc.getMBeanServerConnection();

                ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");

                // Typed proxy for the broker MBean so its attributes can be read via plain getters.
                BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq,
                        BrokerViewMBean.class, true);
                for (ObjectName name : mbean.getTopics()) {
                    // Literal-first equals stays null-safe if the key property is absent.
                    if ("YOUR_TOPIC_NAME".equals(name.getKeyProperty("destinationName"))) {
                        TopicViewMBean topicMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name,
                                TopicViewMBean.class, true);
                        System.out.println(topicMbean.getConsumerCount());
                    }
                }
            }
        }
    }
    

    import java.util.Set;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.advisory.DestinationSource;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    
    /**
     * Prints the queues and topics reported by an ActiveMQ connection's {@link DestinationSource}.
     *
     * <p>The program connects to the broker at {@code tcp://localhost:61616}, starts the connection,
     * reads the queue and topic sets from the destination source and prints both to standard output.
     * Any failure while doing so is reported via a stack trace; the connection is always closed
     * afterwards on a best-effort basis.
     */
    public class AdvisorySupportGetAllDestinationsNames {

        /**
         * Entry point.
         *
         * @param args unused
         * @throws JMSException declared for callers; failures inside the body are caught and printed
         */
        public static void main(String[] args) throws JMSException {
            Connection conn = null;
            try {
                ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
                conn = cf.createConnection();
                conn.start();
                // getDestinationSource() is ActiveMQ-specific, hence the cast from the JMS interface.
                DestinationSource destinationSource = ((ActiveMQConnection) conn).getDestinationSource();
                // NOTE(review): presumably these sets are filled from advisory messages that arrive
                // asynchronously, so they may still be empty right after start() - confirm against
                // the ActiveMQ advisory documentation before relying on the output.
                Set<ActiveMQQueue> queues = destinationSource.getQueues();
                Set<ActiveMQTopic> topics = destinationSource.getTopics();
                System.out.println(queues);
                System.out.println(topics);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (Exception closeFailure) {
                        // Closing stays best-effort, but the failure is reported instead of
                        // being silently discarded.
                        System.err.println("Failed to close JMS connection: " + closeFailure);
                    }
                }
            }
        }
    }
    

    更新

    您可以使用 AdvisorySupport.getConsumerAdvisoryTopic() 获取该目的地对应的消费者通告(advisory)主题,并订阅它来接收消费者启动/停止的通告消息:

    Note that the consumer start/stop advisory messages also have a consumerCount header to indicate the number of active consumers on the destination when the advisory message was sent.