前端 与 RabbitMQ 实时消息推送实践
阅读原文时间:2023年07月12日阅读:1

web 页面的未读消息(小红点)怎么实现比较简单,之前说过可以用 rabbitmq 的 MQTT 协议做智能家居的指令推送,里边还提到过能用 MQTT 协议做 web 的消息推送,而未读消息(小红点)功能刚好应用到实时消息推送了。

MQTT 协议就不再赘述了,没接触过的同学翻翻前边的文章温习一下吧,今天还是主要以实践为主!

![](https://article.cdnof.com/2307/67c3233f-daf3-4803-ad88-ad66f1990f57.png)

web 端实时消息推送,常用的实现方式比较多,但万变不离其宗,底层基本上还是依赖于 websocketMQTT 协议也不例外。

RabbitMQ的基础搭建就不详细说了,自行百度一步一步搞问题不大,这里主要说一下两个比较重要的配置。

1、开启 mqtt 协议

默认情况下RabbitMQ是不开启MQTT 协议的,所以需要我们手动的开启相关的插件,而RabbitMQMQTT 协议分为两种。

第一种 rabbitmq_mqtt 提供与后端服务交互使用,对应端口1883

rabbitmq-plugins enable rabbitmq_mqtt

第二种 rabbitmq_web_mqtt 提供与前端交互使用,对应端口15675

rabbitmq-plugins enable rabbitmq_web_mqtt 

在 RabbitMQ 管理后台看到如下的显示,就表示MQTT 协议开启成功,到这中间件环境就搭建完毕了。

协议对应端口号

使用MQTT 协议默认的交换机 Exchange 为 amp.topic,而我们订阅的主题会在 Queues 注册一个客户端队列,路由 Routing key 就是我们设置的主题。

交换机信息

web 端实时消息推送一般都是单向的推送,前端接收服务端推送的消息显示即可,所以就只实现消息发送即可。

1、mqtt 客户端依赖包

引入 spring-integration-mqttorg.eclipse.paho.client.mqttv3 两个工具包实现

<!--mqtt依赖包--><dependency>&nbsp;&nbsp;&nbsp;&nbsp;<groupId>org.springframework.integration</groupId>&nbsp;&nbsp;&nbsp;&nbsp;<artifactId>spring-integration-mqtt</artifactId></dependency><dependency>&nbsp;&nbsp;&nbsp;&nbsp;<groupId>org.eclipse.paho</groupId>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<artifactId>org.eclipse.paho.client.mqttv3</artifactId>&nbsp;&nbsp;&nbsp;&nbsp;<version>1.2.0</version></dependency>

2、消息发送者

消息的发送比较简单,主要是应用到 @ServiceActivator 注解,需要注意messageHandler.setAsync属性,如果设置成 false,关闭异步模式发送消息时可能会阻塞。

@Configurationpublic&nbsp;class&nbsp;IotMqttProducerConfig&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;@Autowired&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;MqttConfig&nbsp;mqttConfig;&nbsp;&nbsp;&nbsp;&nbsp;@Bean&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;MqttPahoClientFactory&nbsp;mqttClientFactory()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;DefaultMqttPahoClientFactory&nbsp;factory&nbsp;=&nbsp;new&nbsp;DefaultMqttPahoClientFactory();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;factory.setServerURIs(mqttConfig.getServers());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;factory;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;@Bean&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;MessageChannel&nbsp;mqttOutboundChannel()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;new&nbsp;DirectChannel();&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;@Bean&nbsp;&nbsp;&nbsp;&nbsp;@ServiceActivator(inputChannel&nbsp;=&nbsp;"iotMqttInputChannel")&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;MessageHandler&nbsp;mqttOutbound()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;MqttPahoMessageHandler&nbsp;messageHandler&nbsp;=&nbsp;new&nbsp;MqttPahoMessageHandler(mqttConfig.getServerClientId(),&nbsp;mqttClientFactory());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;messageHandler.setAsync(false);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;messageHandler;&nbsp;&nbsp;&nbsp;&nbsp;}}

MQTT 对外提供发送消息的 API 时,需要使用 @MessagingGateway 注解,去提供一个消息网关代理,参数 defaultRequestChannel 指定发送消息绑定的channel

可以实现三种API接口,payload 为发送的消息,topic 发送消息的主题,qos 消息质量。

@MessagingGateway(defaultRequestChannel&nbsp;=&nbsp;"iotMqttInputChannel")public&nbsp;interface&nbsp;IotMqttGateway&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;向默认的&nbsp;topic&nbsp;发送消息&nbsp;&nbsp;&nbsp;&nbsp;void&nbsp;sendMessage2Mqtt(String&nbsp;payload);&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;向指定的&nbsp;topic&nbsp;发送消息&nbsp;&nbsp;&nbsp;&nbsp;void&nbsp;sendMessage2Mqtt(String&nbsp;payload,@Header(MqttHeaders.TOPIC)&nbsp;String&nbsp;topic);&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;向指定的&nbsp;topic&nbsp;发送消息,并指定服务质量参数&nbsp;&nbsp;&nbsp;&nbsp;void&nbsp;sendMessage2Mqtt(@Header(MqttHeaders.TOPIC)&nbsp;String&nbsp;topic,&nbsp;@Header(MqttHeaders.QOS)&nbsp;int&nbsp;qos,&nbsp;String&nbsp;payload);}

前端使用与服务端对应的工具 paho-mqtt mqttws31.js实现,实现方式与传统的 websocket 方式差不多,核心方法 client = new Paho.MQTT.Client 和 各种监听事件,代码比较简洁。

注意:要保证前后端 clientId的全局唯一性,我这里就简单用随机数解决了

<script&nbsp;type="text/javascript">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;mqtt协议rabbitmq服务&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;brokerIp&nbsp;=&nbsp;location.hostname;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;mqtt协议端口号&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;port&nbsp;=&nbsp;15675;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;接受推送消息的主题&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;topic&nbsp;=&nbsp;"push_message_topic";&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;mqtt连接&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;client&nbsp;=&nbsp;new&nbsp;Paho.MQTT.Client(brokerIp,&nbsp;port,&nbsp;"/ws",&nbsp;"clientId_"&nbsp;+&nbsp;parseInt(Math.random()&nbsp;*&nbsp;100,&nbsp;10));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;options&nbsp;=&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;timeout:&nbsp;3,&nbsp;//超时时间&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;keepAliveInterval:&nbsp;30,//心跳时间&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;onSuccess:&nbsp;function&nbsp;()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;console.log(("连接成功~"));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;client.subscribe(topic,&nbsp;{qos:&nbsp;1});&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;},&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;onFailure:&nbsp;function&nbsp;(message)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;console.log(("连接失败~"&nbsp;+&nbsp;message.errorMessage));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;};&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;考虑到https的情况&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(location.protocol&nbsp;==&nbsp;"https:")&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;options.useSSL&nbsp;=&nbsp;true;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;client.connect(options);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;console.log(("已经连接到"&nbsp;+&nbsp;brokerIp&nbsp;+&nbsp;":"&nbsp;+&nbsp;port));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;连接断开事件&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;client.onConnectionLost&nbsp;=&nbsp;function&nbsp;(responseObject)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;console.log("失去连接&nbsp;-&nbsp;"&nbsp;+&nbsp;responseObject.errorMessage);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;};&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;接收消息事件&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;client.onMessageArrived&nbsp;=&nbsp;function&nbsp;(message)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;console.log("接受主题:&nbsp;"&nbsp;+&nbsp;message.destinationName&nbsp;+&nbsp;"的消息:&nbsp;"&nbsp;+&nbsp;message.payloadString);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$("#arrivedDiv").append("<br/>"+message.payloadString);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;count&nbsp;=&nbsp;$("#count").text();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;count&nbsp;=&nbsp;Number(count)&nbsp;+&nbsp;1;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$("#count").text(count);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;};&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;推送给指定主题&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;function&nbsp;sendMessage()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;a&nbsp;=&nbsp;$("#message").val();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(client.isConnected())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;var&nbsp;message&nbsp;=&nbsp;new&nbsp;Paho.MQTT.Message(a);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;message.destinationName&nbsp;=&nbsp;topic;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;client.send(message);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;</script>

前后端的代码并不多,接下来我们测试一下,弄了个页面看看效果。

首先用 postman 模拟后端发送消息

http://127.0.0.1:8080/fun/sendMessage?message=我是程序员内点事&topic=push_message_topic

模拟发送消息

再看一下前端订阅消息的效果,看到消息被实时推送到了前端,这里只做了未读消息数量统计,一般还会做未读消息详情列表。

实时消息推送动图

未读消息是一个十分常见的功能,不管是 web端还是移动端系统都是必备的模块,MQTT 协议只是其中的一种实现方式,还是有必要掌握一种方法。具体用什么工具实现还是要看具体的业务场景和学习成本,像我用RabbitMQ 做还考虑到一些运维成本在里边。


本文完整代码地址:https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-mqtt-messagepush

鸣谢:

https://mp.weixin.qq.com/s?__biz=MzAxNTM4NzAyNg==&mid=2247487818&idx=1&sn=19393de4304e1ddd3179d0e45ec16cd7&scene=21#wechat_redirect