有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java 订阅 MQTT 主题

我有以下代码部分:

    package com.company;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import javax.security.auth.callback.Callback;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

/**
 * Entry point of a small GUI MQTT client: wires the Connect, Subscribe and
 * Publish buttons of {@code frame} to a shared {@link MqttClient}.
 *
 * <p>The class implements {@link MqttCallback}, but the visible code never
 * creates a {@code Main} instance and never calls
 * {@code mqttClient.setCallback(...)}, so none of the callback methods below
 * are registered on the client.
 *
 * <p>All state is held in static fields that are assigned from the button
 * listeners.
 */
public class Main implements MqttCallback {
    // Topic read from the UI by the Subscribe listener; the Publish listener reuses it.
    private static String sTopic;
    // QoS level read from the UI each time a message is published.
    private static int iQos;
    // Created by the Connect listener; remains null until Connect has been pressed.
    private static MqttClient mqttClient;
    // Username read from the UI; passed as the client ID when the client is created.
    private static String sUsername;
    // NOTE(review): Frame is not imported here — presumably a project class in
    // com.company exposing the buttons and input fields; confirm.
    private static Frame frame = new Frame();

    /**
     * Registers the three button listeners on {@code frame}.
     *
     * @param args command-line arguments (unused)
     * @throws MqttException declared by the signature; the MQTT calls made
     *                       inside the listeners catch this exception themselves
     */
    public static void main(String[] args) throws MqttException {

        // "Connect" button: build the broker URI from the UI fields and connect.
        frame.getConnect().addActionListener(new ActionListener() {
            @Override
            public void actionPerformed(ActionEvent e) {
                int iPort;
                String sIp = frame.getBrokerAddressValue();
                sUsername = frame.getUsernameValue();
                try {
                    String broker = "tcp://"; // URI scheme prefix; host and port are appended below
                    iPort = frame.getPortValue();
                    broker+=sIp+":"+iPort;
                    mqttClient = new MqttClient(broker, sUsername, new MemoryPersistence());  // arguments: server URI, client ID, persistence
                    // NOTE(review): connectOptions is configured but never passed to
                    // connect() below, so the clean-session setting is not applied here.
                    MqttConnectOptions connectOptions = new MqttConnectOptions();
                    connectOptions.setCleanSession(true);
                    System.out.println("Connecting to broker: "+broker);
                    // NOTE(review): no setCallback(...) call precedes this connect.
                    mqttClient.connect();

                    System.out.println("Connected");
                }catch (NumberFormatException exc){
                    // Presumably raised by frame.getPortValue() on non-numeric input — confirm.
                    System.out.println("Wrong port format");
                } catch (MqttException e1) {
                    e1.printStackTrace();
                }

            }
        });

        // "Subscribe" button: subscribe to the topic currently entered in the UI.
        frame.getSubscribe().addActionListener(new ActionListener() {
            @Override
            public void actionPerformed(ActionEvent e) {
                sTopic = frame.getTopicValue();
                try {
                    // mqttClient is still null here if Connect was never pressed.
                    mqttClient.subscribe(sTopic);
                } catch (MqttException e1) {
                    e1.printStackTrace();
                }
                // Printed after the try/catch, so it also appears when subscribe failed.
                System.out.println("Subscribed");
            }
        });

        // "Publish" button: publish the UI message to the last subscribed topic.
        frame.getPublish().addActionListener(new ActionListener() {
            @Override
            public void actionPerformed(ActionEvent e) {
                String sMessage = frame.getMessageValue();
                MqttMessage message = new MqttMessage(sMessage.getBytes());
                iQos = frame.getQosValue();
                message.setQos(iQos);
                try {
                    // sTopic is only assigned by the Subscribe listener, so it is
                    // null if Subscribe has not been pressed yet.
                    mqttClient.publish(sTopic,message);
                } catch (MqttException e1) {
                    e1.printStackTrace();
                }
                // Printed after the try/catch, so it also appears when publish failed.
                System.out.println("Message published");
            }
        });
    }

    /** Intentionally empty: a lost connection is not handled. */
    @Override
    public void connectionLost(Throwable throwable) {

    }

    /**
     * Puts the string form of the received message into the frame's text area.
     *
     * @param topic   topic the message arrived on (unused)
     * @param message the received message
     * @throws Exception declared by the callback interface
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception{
        // NOTE(review): if the text area is a Swing component, this call is likely
        // made off the event-dispatch thread — confirm.
        frame.getTextArea().setText(String.valueOf(message));
    }

    /** Intentionally empty: delivery confirmations are ignored. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

这是我实现的 MQTT 客户端的一部分。getConnect 返回“Connect”按钮,getSubscribe 返回“Subscribe”按钮,getPublish 返回“Publish”按钮,代码分别为它们注册了点击监听器。问题在于:订阅主题之后,订阅该主题的客户端收不到消息。哪里出了问题?


共 (1) 个答案

  1. # 1 楼答案

    如果我正确理解您的问题,您订阅了主题“sTopic”,但当有人将消息发布到主题“sTopic”时,您没有收到消息

    您确定 MQTT 客户端已经成功连接到代理(broker)了吗?在调用 subscribe 之前,请先确认客户端已连接:

                // Subscribe only while the client reports an active connection.
                if( mqttClient.isConnected()) {
                    mqttClient.subscribe(sTopic);
                 }
    

    这同样适用于发布

                   // Publish only while the client reports an active connection.
                   if( mqttClient.isConnected()) {
                       mqttClient.publish(sTopic,message);
                    }
    

    一旦这些调用结束,您应该在messageArrived回调方法中看到已发布的消息

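    您还需要为 mqttClient 设置回调对象(应在调用 subscribe 之前调用 setCallback);否则即使消息已送达客户端,messageArrived 也不会被调用。问题中的代码虽然让 Main 实现了 MqttCallback,但从未调用过 mqttClient.setCallback(...):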

                    // Anonymous MqttCallback whose three methods are empty placeholders.
                    // NOTE(review): the declaration of clientCallback is not shown in this
                    // fragment — presumably a field or local of type MqttCallback; confirm.
                    clientCallback = new MqttCallback() {
    
                        @Override
                        public void connectionLost(Throwable cause) {
                            // Placeholder: react to a dropped connection here.
    
                        }
    
                        @Override
                        public void messageArrived(String topic, MqttMessage message) throws Exception {
                            // Placeholder: handle the received message here.
    
                        }
    
                        @Override
                        public void deliveryComplete(IMqttDeliveryToken token) {
                            // Placeholder: react to a completed delivery here.
    
                        }
                    };
    
                    // Register the callback object on the client.
                    mqttClient.setCallback(clientCallback);