MQTT(EMQX) - Java 调用 MQTT Demo 代码
阅读原文时间:2023年07月11日阅读:1

POM

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

Service.java

package com.vipsoft.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.Scanner;

public class Service {

    public static void main(String[] args) throws Exception {
        String host = "tcp://172.16.3.88:1883";
        String topic = "VipSoft_MQTT";
        String clientId = "server_id"; // clientId不能重复这个是server的id
        //新建mqtt连接
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        //新建mqtt客户端
        MqttClient client = new MqttClient(host, clientId);
        client.connect(options);
        //新建mqtt消息
        MqttMessage message = new MqttMessage();

        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入要发送的内容:");
        while (true) {
            String MsgMessage= scanner.nextLine();
            message.setPayload(MsgMessage.getBytes());
            client.publish(topic, message);
        }

    }
}

Client.java

package com.vipsoft.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

public class Client {

    public static void main(String[] args) throws Exception {
        String host = "tcp://172.16.3.88:1883";
        String topic = "VipSoft_MQTT";
        String clientId = "client_id";
        // 1.设置mqtt连接属性
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        // 2.实例化mqtt客户端
        MqttClient client = new MqttClient(host, clientId);
        // 3.连接
        client.connect(options);
        //这里的setCallback需要新建一个Callback类并实现 MqttCallback 这个类
        client.setCallback(new PushCallback());
        while (true) {
            client.subscribe(topic, 2);
        }
    }
}

PushCallback.java

package com.vipsoft.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
 * 发布消息的回调类
 *
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
 * 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 *
 *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 *
 *  public void connectionLost(Throwable cause)在断开连接时调用。
 *
 *  public void deliveryComplete(MqttDeliveryToken token))
 *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 *  由 MqttClient.connect 激活此回调。
 *
 */
public class PushCallback implements MqttCallback {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后进行重连
        System.out.println("连接断开,可以做重连");
        logger.info("掉线时间:{}", new Date());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        // System.out.println(message);
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }
}