Java MQTT 客户端之 Paho
阅读原文时间:2023年07月13日阅读:1

Paho 自动重连后订阅的主题会清空,所以需要实现 MqttCallbackExtended 接口,在 connectComplete 方法添加订阅主题;而不是实现 MqttCallback 接口

一、添加引用

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

二、添加配置

mqtt:
  client:
    username: admin
    password: public
    serverURI: tcp://192.168.137.101:1883
    clientId: paho_${random.int[1000,9999]}
    keepAliveInterval: 120
    connectionTimeout: 30
  producer:
    defaultQos: 1
    defaultRetained: true
    defaultTopic: topic/test1
  consumer:
    consumerTopics: topic/test2,topic/test3

三、代码

@Configuration
public class MqttConfig {
    @Value("${mqtt.client.username}")
    private String username;
    @Value("${mqtt.client.password}")
    private String password;
    @Value("${mqtt.client.serverURI}")
    private String serverURI;
    @Value("${mqtt.client.clientId}")
    private String clientId;
    @Value("${mqtt.client.keepAliveInterval}")
    private int keepAliveInterval;
    @Value("${mqtt.client.connectionTimeout}")
    private int connectionTimeout;

    @Autowired
    private MyMqttCallback myMqttCallback;

    @Bean
    public MqttClient mqttClient() {
        try {
            MqttClientPersistence persistence = mqttClientPersistence();
            MqttClient client = new MqttClient(serverURI, clientId, persistence);

            myMqttCallback.setMqttClient(client);
            client.setCallback(myMqttCallback);

            client.connect(mqttConnectOptions());
//            client.subscribe(subTopic);

            return client;
        } catch (MqttException e) {
            System.out.println(e.getMessage());
            return null;
        }
    }

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(connectionTimeout);
        options.setKeepAliveInterval(keepAliveInterval);

        return options;
    }

    public MqttClientPersistence mqttClientPersistence() {
        return new MemoryPersistence();
    }
}


@Component
public class MyMqttCallback implements MqttCallbackExtended {

    @Value("${mqtt.consumer.consumerTopics}")
    private String[] consumerTopics;

    @Autowired
    private MqttService mqttService;

    private MqttClient mqttClient;

    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("连接断开");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        mqttService.message(topic, message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    @Override
    public void connectComplete(boolean b, String s) {
        try {
            mqttClient.subscribe(consumerTopics);
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }

    public void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }
}


@Component
public class MqttProducer {

    @Value("${mqtt.producer.defaultQos}")
    private int defaultProducerQos;
    @Value("${mqtt.producer.defaultRetained}")
    private boolean defaultRetained;
    @Value("${mqtt.producer.defaultTopic}")
    private String defaultTopic;

    @Autowired
    private MqttClient mqttClient;

    public void send(String payload) {
        this.send(defaultTopic, payload);
    }

    public void send(String topic, String payload) {
        this.send(topic, defaultProducerQos, payload);
    }

    public void send(String topic, int qos, String payload) {
        this.send(topic, qos, defaultRetained, payload);
    }

    public void send(String topic, int qos, boolean retained, String payload) {
        try {
            mqttClient.publish(topic, payload.getBytes(), qos, retained);
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }
}


@RestController
public class MqttController {

    @Autowired
    private MqttProducer mqttProducer;

    @RequestMapping("/send")
    public void send() {

        mqttProducer.send("test content");

    }
}

完整代码:GitHub

参考

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器