Springboot+Websocket+JWT实现的即时通讯模块
阅读原文时间:2022年02月16日阅读:1

目前做了一个接口:邀请用户成为某课程的管理员,于是我感觉有能在用户被邀请之后能有个立马通知他本人的机(类似微博、朋友圈被点赞后就有立马能收到通知一样),于是就闲来没事搞了一套。

  • Springboot
  • Websocket 协议
  • JWT
  • (非必要)RabbitMQ 消息中间件

推荐阅读:Websocket 协议简介

WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

为什么使用Websocket?

因为普通的http协议一个最大的问题就是:通信只能由客户端发起,服务器响应(半双工)****,而我们希望可以全双工通信。

因此一句话总结就是:建立websocket(以下简称为ws)连接是为了让服务器主动向前端发消息,而无需等待前端的发起请求调用接口。

业务逻辑

我们现在有:

  • 用户A
  • 用户B
  • Springboot服务器
  • 场景:用户A调用接口邀请用户B成为课程成员
  • 涉及数据库MySQL的数据表:
    • course_member_invitation,记录课程邀请记录,其形式如下(忽略时间等列):

id

course_id

account_id

admin_id

is_accepted

bind_message_id

邀请id

课程id

受邀用户id

邀请人id(因其本身为课程管理员)

受邀用户是否接受了邀请

绑定的消息id

  • course_message,记录消息记录,其形式如下(忽略时间等列):

id

type

account_id

source_id

is_read

is_ignored

消息id

消息类型

收信人用户id

发信人用户id

是否已读

收信人是否忽略

  • (图中没有体现)course_message_type,记录消息类型,其形式如下

id

name

description

消息类型id

消息类型名称

描述

  • 涉及RabbitMQ(因不是重点,所以此处暂不讨论,最后一章叙述)

业务步骤主要涉及两个方法addCourseMemberInvitationsendMessage和一个组件CourseMemberInvitationListener,分别做:

addCourseMemberInvitation:

  1. 用户A调用接口,邀请用户B成为某门课程的管理员
  2. Springboot服务器收到请求,将这一请求生成邀请记录、消息记录,写入下表:
    • course_member_invitation
    • course_message
  3. 写入DB后,调用sendMessage处理发送消息的业务。
  4. 将执行的结果返回给用户A

sendMessage

  1. 将消息记录放入RabbitMQ中对应的消息队列。

CourseMemberInvitationListener:

  1. 持续监听其绑定的消息队列
  2. 一旦消息队列中有新消息,就尝试通过ws连接发送消息。

在Springboot中配置Websocket

  • pom.xml文件

    org.springframework.boot spring-boot-starter-websocket

  • Websocket Server组件配置初步:com.xxxxx.course.webSocket.WebSocketServer

    /**

    • 进行前后端即时通信

    • https://blog.csdn.net/qq_33833327/article/details/105415393

    • session: https://www.codeleading.com/article/6950456772/

    • @author jojo
      / @ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求 @Component public class WebSocketServer { /*

      • 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
        */
        private static int onlineCount = 0;

      /**

      • concurrent 包的线程安全Set,用来存放每个客户端对应的 myWebSocket对象
      • 根据 用户id 来获取对应的 WebSocketServer 示例
        */
        private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>();

      /**

      • 与某个客户端的连接会话,需要通过它来给客户端发送数据
        */
        private Session session;

      /**

      • 用户id
        */
        private String accountId ="";

      /**

      • logger
        */
        private static Logger LOGGER = LoggerUtil.getLogger();

      /**

      • 连接建立成功调用的方法
        *

      • @param session

      • @param uid 用户id
        */
        @OnOpen
        public void onOpen(Session session, @PathParam("uid") String uid) {

        this.session = session;

        //设置超时,同httpSession
        session.setMaxIdleTimeout(3600000);

        this.accountId = uid;

        //存储websocket连接,存在内存中,若有同一个用户同时在线,也会存,不会覆盖原有记录
        webSocketMap.put(accountId, this);
        LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));

        addOnlineCount(); // 在线数 +1
        LOGGER.info("有新窗口开始监听:" + accountId + ",当前在线人数为" + getOnlineCount());

        try {
        sendMessage(JSON.toJSONString("连接成功"));
        } catch (IOException e) {
        e.printStackTrace();
        throw new ApiException("websocket IO异常!!!!");
        }

      }

      /**

      • 关闭连接
        */

      @OnClose
      public void onClose() {
      if (webSocketMap.get(this.accountId) != null) {
      webSocketMap.remove(this.accountId);
      subOnlineCount(); // 人数 -1
      LOGGER.info("有一连接关闭,当前在线人数为:" + getOnlineCount());
      }
      }

      /**

      • 收到客户端消息后调用的方法

      • 这段代码尚未有在使用,可以先不看,在哪天有需求时再改写启用

        • @param message 客户端发送过来的消息
      • @param session
        */
        @OnMessage
        public void onMessage(String message, Session session) {
        LOGGER.info("收到来自用户 [" + this.accountId + "] 的信息:" + message);

        if (!StringTools.isNullOrEmpty(message)) {
        try {
        // 解析发送的报文
        JSONObject jsonObject = JSON.parseObject(message);
        // 追加发送人(防窜改)
        jsonObject.put("fromUserId", this.accountId);
        String toUserId = jsonObject.getString("toUserId");
        // 传送给对应 toUserId 用户的 WebSocket
        if (!StringTools.isNullOrEmpty(toUserId) && webSocketMap.containsKey(toUserId)) {
        webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
        } else {
        // 否则不在这个服务器上,发送到 MySQL 或者 Redis
        LOGGER.info("请求的userId:" + toUserId + "不在该服务器上");
        }
        } catch (Exception e) {
        e.printStackTrace();
        }
        }

      }

      /**

      • @param session
      • @param error
        */
        @OnError
        public void onError(Session session, Throwable error) {
        LOGGER.error("用户错误:" + this.accountId + ",原因:" + error);
        }

      /**

      • 实现服务器主动推送
        *
      • @param message 消息字符串
      • @throws IOException
        */
        public void sendMessage(String message) throws IOException {
        //需要使用同步机制,否则多并发时会因阻塞而报错
        synchronized(this.session) {
        try {
        this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
        LOGGER.error("发送给用户 ["+this.accountId +"] 的消息出现错误",e.getMessage());
        throw e;
        }
        }
        }

      /**

      • 点对点发送

      • 指定用户id

      • @param message 消息字符串

      • @param userId 目标用户id

      • @throws IOException
        */
        public static void sendInfo(String message, String userId) throws Exception {

        Iterator entrys = webSocketMap.entrySet().iterator();
        while (entrys.hasNext()) {
        Map.Entry entry = (Map.Entry) entrys.next();
        if (entry.getKey().toString().equals(userId)) {
        webSocketMap.get(entry.getKey()).sendMessage(message);
        LOGGER.info("发送消息到用户id为 [" + userId + "] ,消息:" + message);
        return;
        }
        }
        //错误说明用户没有在线,不用记录log
        throw new Exception("用户没有在线");
        }

      private static synchronized int getOnlineCount() {
      return onlineCount;
      }

      private static synchronized void addOnlineCount() {
      WebSocketServer.onlineCount++;
      }

      private static synchronized void subOnlineCount() {
      WebSocketServer.onlineCount--;
      }
      }

几点说明:

  • onOpen方法:服务器与前端建立ws连接成功时自动调用。

  • sendInfo方法:是服务器通过用户id向指定用户发送消息的方法,其为静态公有方法,因此可供各service调用。调用的例子:

    // WebSocket 通知前端
    try {
    //调用WebsocketServer向目标用户推送消息
    WebSocketServer.sendInfo(JSON.toJSONString(courseMemberInvitation),courseMemberInvitation.getAccountId().toString());
    LOGGER.info("send to "+courseMemberInvitation.getAccountId().toString());
    }

  • @ServerEndpoint注解:

    @ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求

这么注解之后,前端只用发起 ws://xxx.xxx:xxxx/ws/{uid} 即可开启ws连接(或者wss协议,增加TLS), 比如前端js代码这么写:

<script>
    var socket;

    /* 启动ws连接 */
    function openSocket() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        }else{
            console.log("您的浏览器支持WebSocket");

            //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
            var socketUrl="http://xxx.xxx.xxx:xxxx/ws/"+$("#uid").val();
            socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //转换成ws协议
            console.log("正在连接:"+socketUrl);
            if(socket!=null){
                socket.close();
                socket=null;
            }
            socket = new WebSocket(socketUrl);

            /* websocket 基本方法 */

            //打开事件
            socket.onopen = function() {
                console.log(new Date()+"websocket已打开,正在连接...");
                //socket.send("这是来自客户端的消息" + location.href + new Date());
            };

            //获得消息事件
            socket.onmessage = function(msg) {
                console.log(msg.data);
                //发现消息进入    开始处理前端触发逻辑
            };

            //关闭事件
            socket.onclose = function() {
                console.log(new Date()+"websocket已关闭,连接失败...");
                //重新请求token
            };

            //发生了错误事件
            socket.onerror = function() {
                console.log("websocket连接发生发生了错误");
            }
        }

    }

    /* 发送消息 */
    function sendMessage() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        }else {
            console.log("您的浏览器支持WebSocket");
            console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
            socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
        }
    }
</script>

存在的问题

一切看起来很顺利,我只要放个用户id进去,就可以想跟谁通讯就跟谁通讯咯!

但设想一个场景, 我是小明,uid为250,我想找uid为520的小花聊天,理论上我只要发起ws://xxx.xxx:xxxx/ws/250请求与服务器连接,小花也发起ws://xxx.xxx:xxxx/ws/520与服务器建立ws连接,我们就能互发消息了吧!

这时候出现了uid为1的小黄,他竟然想挖墙脚!?他竟然学过js,自己发了ws://xxx.xxx:xxxx/ws/520跟服务器建立ws连接,而小花根本不想和我发消息,所以实际上是小黄冒充了小花,把小花NTR了(实际上人家并不在乎),跟我愉快地聊天?!

那怎么办啊?我怎么才能知道在跟我Websocket的究竟是美女小花还是黄毛小黄啊??!

这就引入了JWT!

可以看到后端会响应/ws/{token}的连接请求,前端可以发/ws/{token}的连接请求,一开始写的时候看网上的都是用/ws/{userId}来建立该id的用户与服务器的ws连接,但这样的话可能就很不安全,无法保证使用某个id建立的ws确实就是真实用户发起的连接。~~(小花被小黄NTR的悲惨故事)~~

所以在调研了很多公开的解决方案,看到了可以改用令牌(token)来建立ws连接,同时验证用户身份(事实上一些其他接口也可以用令牌(token)来保证接口安全性)。

//Websocket Server
@ServerEndpoint(value = "/ws/{token}",configurator = WebSocketConfig.class) //响应路径为 /ws/{token} 的连接请求
@Component
public class WebSocketServer {
    ...
}

js:

var socketUrl="http://xxx.xxx.xxx.xxx:xxxx/ws/"+$("#token").val();
socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //转换成ws协议
....
socket = new WebSocket(socketUrl);

业务逻辑

为什么用JWT

最初考虑的是用/ws/{userId}来建立ws连接,然后在后台拿session中的user来对比用户id,判断合法性。

结果发现ws的session和http的session是不同的,还不好拿,可能得想办法把http的session存到redis或者DB(也可以存在内存中,只是可能又要消耗内存资源),在建立ws连接之前去拿出来验证合法性。后面查到了还有JWT这种好东西。

JWT好在哪里?

推荐阅读:什么是 JWT -- JSON WEB TOKEN

我的总结:

  • token可以过期

  • 验证token可以不用存在redis或者DB或者内存,完全依赖算法即可

  • 只要从前端请求中拿到token,后端就可以根据封装好的算法验证这个token合不合法,有没有被篡改过(这点很重要),过期了没有

  • 可以将用户id、用户名等非敏感数据一同封装到token中,后端拿到token后可以解码拿到数据,只要这个token合法,这些发来的数据就是可信的(小黄就算自己发明了token也不作数),是没有被篡改的(小黄就算把我小花的token偷走把用户id改成自己的也没用,后台可以算出来被改过),可以建立ws连接,调用websocket server进行通讯。

    教程

    JWT教程

    整合到本项目中

pom.xml

<!-- JWT 相关     -->
<dependency>
    <groupId>com.auth0</groupId>
    <artifactId>java-jwt</artifactId>
    <version>3.12.1</version>
</dependency>

<!--   base64     -->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.12</version>
</dependency>

token的前两个部分是由base64编码的,所以需要codec进行解码。

实现一个JWT工具类

目前基本当作工具使用

com.xxxx.course.util.JWTUtil

/**
 * @author jojo
 * JWT 令牌工具类
 */
public class JWTUtil {

    /**
     * 默认本地密钥
     * @notice: 非常重要,请勿泄露
     */
    private static final String SECRET = "doyoulikevanyouxi?" //乱打的

    /**
     * 默认有效时间单位,为分钟
     */
    private static final int TIME_TYPE = Calendar.MINUTE;

    /**
     * 默认有效时间长度,同http Session时长,为60分钟
     */
    private static final int TIME_AMOUNT = 600;

    /**
     * 全自定生成令牌
     * @param payload payload部分
     * @param secret 本地密钥
     * @param timeType 时间类型:按Calender类中的常量传入:
     *         Calendar.YEAR;
     *         Calendar.MONTH;
     *         Calendar.HOUR;
     *         Calendar.MINUTE;
     *         Calendar.SECOND;等
     * @param expiredTime 过期时间,单位由 timeType 决定
     * @return 令牌
     */
    public static String generateToken(Map<String,String> payload,String secret,int timeType,int expiredTime){
        JWTCreator.Builder builder = JWT.create();

        //payload部分
        payload.forEach((k,v)->{
            builder.withClaim(k,v);
        });
        Calendar instance = Calendar.getInstance();
        instance.add(timeType,expiredTime);

        //设置超时时间
        builder.withExpiresAt(instance.getTime());

        //签名
        return builder.sign(Algorithm.HMAC256(secret)).toString();
    }

    /**
     * 生成token
     * @param payload payload部分
     * @return 令牌
     */
    public static String generateToken(Map<String,String> payload){
        return generateToken(payload,SECRET,TIME_TYPE,TIME_AMOUNT);
    }

    省略了重载方法....

    /**
     * 验证令牌合法性
     * @param token 令牌
     * @return
     */
    public static void verify(String token) {
        //如果有任何验证异常,此处都会抛出异常
        JWT.require(Algorithm.HMAC256(SECRET)).build().verify(token);
    }

    /**
     * 自定义密钥解析
     * @param token 令牌
     * @param secret 密钥
     * @return 结果
     */
    public static DecodedJWT parseToken(String token,String secret) {
        DecodedJWT decodedJWT = JWT.require(Algorithm.HMAC256(secret)).build().verify(token);
        return decodedJWT;
    }

    /**
     * 解析令牌
     * 当令牌不合法将抛出错误
     * @param token
     * @return
     */
    public static DecodedJWT parseToken(String token) {
        return parseToken(token,SECRET);
    }

    /**
     * 解析令牌获得payload,值为claims形式
     * @param token
     * @param secret
     * @return
     */
    public static Map<String,Claim> getPayloadClaims(String token,String secret){
        DecodedJWT decodedJWT = parseToken(token,secret);
        return decodedJWT.getClaims();
    }

    /**
     * 默认解析令牌获得payload,值为claims形式
     * @param token 令牌
     * @return
     */
    public static Map<String,Claim> getPayloadClaims(String token){
        return getPayloadClaims(token,SECRET);
    }

    /**
     * 解析令牌获得payload,值为String形式
     * @param token 令牌
     * @return
     */
    public static Map<String,String> getPayloadString(String token,String secret){
        Map<String, Claim> claims = getPayloadClaims(token,secret);
        Map<String,String> payload = new HashMap<>();
        claims.forEach((k,v)->{
            if("exp".equals(k)){
                payload.put(k,v.asDate().toString());
            }
            else {
                payload.put(k, v.asString());
            }
        });

        return payload;
    }
    /**
     * 默认解析令牌获得payload,值为String形式
     * @param token 令牌
     * @return
     */
    public static Map<String,String> getPayloadString(String token){
        return getPayloadString(token,SECRET);
    }

    /**
     * 通过用户实体生成令牌
     * @param user 用户实体
     * @return
     */
    public static String generateUserToken(Account user){
        return generateUserToken(user.getId());
    }

    /**
     * 通过用户id生成令牌
     * @param accountId 用户id
     * @return
     */
    public static String generateUserToken(Integer accountId){
        return generateUserToken(accountId.toString());
    }

        /**
         *  通过用户id生成令牌
         * @param accountId 用户id
         * @return
         */
    public static String generateUserToken(String accountId){
        Map<String,String> payload = new HashMap<>();
        payload.put("accountId",accountId);
        return generateToken(payload);
    }

    /**
     * 从令牌中解析出用户id
     * @param token 令牌
     * @return
     */
    public static String parseUserToken(String token){
        Map<String, String> payload = getPayloadString(token);
        return payload.get("accountId");
    }

}

调整登陆 service 中,登陆时返回一个token

com.xxxx.course.service.impl.AccountServiceImpl

public JSONObject login(){
    登陆成功...
    ...
    //生成并放入通信令牌token,令牌中带有用户id,用以鉴别身份
    String token = JWTUtil.generateUserToken(user);
    jsonObject.put("token",token);
    ...
    后续操作...
    return jsonObject;
}

WebSocket 连接握手时进行身份验证

之后前端只要携带token进行ws连接即可,写了个ws的配置类,继承了一个websocket连接的监听器ServerEndpointConfig.Configurator,进行token的验证。

com.XXXXX.course.config.webSocket.WebSocketConfig

/**
 * 开启 WebSocket 支持,进行前后端即时通讯
 * https://blog.csdn.net/qq_33833327/article/details/105415393
 * session配置:https://www.codeleading.com/article/6950456772/
 * @author jojo
 */
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator implements WebSocketConfigurer {

    /**
     * logger
     */
    private static final Logger LOGGER = LoggerUtil.getLogger();

    /**
     * 监听websocket连接,处理握手前行为
     * @param sec
     * @param request
     * @param response
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {

        String[] path = request.getRequestURI().getPath().split("/");
        String token = path[path.length-1];

        //todo 验证用户令牌是否有效
        try {
            JWTUtil.verify(token);
        } catch (Exception e) {
            LOGGER.info("拦截了非法连接",e.getMessage());
            return;
        }
        super.modifyHandshake(sec, request, response);
    }

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }

    ...
}

这样,每次服务器建立ws连接前,都要验证token的合法性,仅仅通过JWTUtil.verify(token);即可!当token不合法,就会抛出异常。

再配合重写websocket serveronOpen方法,应该就能进行身份可信的通信了!

/**
     * 连接建立成功调用的方法
     *
     * @param session
     * @param token 用户令牌
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("token") String token) {

        this.session = session;
        this.token = token;

        //设置超时,同httpSession
        session.setMaxIdleTimeout(3600000);

        //解析令牌,拿取用户信息
        Map<String, String> payload = JWTUtil.getPayloadString(token);
        String accountId = payload.get("accountId");
        this.accountId = accountId;

        //存储websocket连接,存在内存中,若有同一个用户同时在线,也会存,不会覆盖原有记录
        webSocketMap.put(accountId, this);
        LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));

        addOnlineCount(); // 在线数 +1
        LOGGER.info("有新窗口开始监听:" + accountId + ",当前在线人数为" + getOnlineCount());
        ...

教程

为什么我要用RabbitMQ

  • 正经理由:

    • 可以将写DB与发送消息两件事情异步处理,这样响应会更快。
    • 未来可以拓展为集群
  • 真正理由:

    • 人傻
    • 闲的

整合到本项目中

pom.xml

<!-- rabbitMQ 相关-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ 配置类

  • com.xxxx.course.config.rabbitMQ.RabbitMQConfig

    /**

    • @author jojo
      */
      @Configuration
      public class RabbitMQConfig {

      /**

      • 指定环境
        */
        @Value("${spring.profiles.active}")
        private String env;

      /**

      • logger
        */
        public static final Logger LOGGER = LoggerUtil.getLogger();

      /**

      • 交换机名
        */
        public String MEMBER_INVITATION_EXCHANGE = RabbitMQConst.MEMBER_INVITATION_EXCHANGE;

      /**

      • 交换机队列
        */
        public String MEMBER_INVITATION_QUEUE = RabbitMQConst.MEMBER_INVITATION_QUEUE;

      /**

      • 声明 课程成员邀请消息 交换机
      • @return
        */
        @Bean("memberInvitationDirectExchange")
        public Exchange memberInvitationDirectExchange(){
        //根据项目环境起名,比如开发环境会带dev字样
        String exchangeName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_EXCHANGE);
        return ExchangeBuilder.directExchange(exchangeName).durable(true).build();
        }

      /**

      • 声明 课程成员邀请消息 队列
      • @return
        */
        @Bean("memberInvitationQueue")
        public Queue memberInvitationQueue(){
        //同上
        String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
        return QueueBuilder.durable(queueName).build();
        }

      /**

      • 课程成员邀请消息的队列与交换机绑定
      • @param queue
      • @param exchange
      • @return
        */
        @Bean
        public Binding memberInvitationBinding(@Qualifier("memberInvitationQueue") Queue queue,@Qualifier("memberInvitationDirectExchange") Exchange exchange){
        String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
        return BindingBuilder.bind(queue).to(exchange).with(queueName).noargs();
        }

      /**

      • Springboot启动时, 验证队列名根据环境命名正确
        */
        @Bean
        public void verify(){
        Queue memberInvitationQueue = SpringUtil.getBean("memberInvitationQueue", Queue.class);
        Exchange memberInvitationDirectExchange = SpringUtil.getBean("memberInvitationDirectExchange", Exchange.class);

        LOGGER.info("消息队列 ["+memberInvitationQueue.getName()+"] 创建成功");
        LOGGER.info("消息交换器 ["+memberInvitationDirectExchange.getName()+"] 创建成功");

        //放入映射中存储
        RabbitMQConst.QUEUE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationQueue.getName());
        RabbitMQConst.EXCHANGE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationDirectExchange.getName());
        }

      /**

      • 自定义messageConverter使得消息中携带的pojo序列化成json格式
      • @return
        */
        @Bean
        public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
        }

    }

项目运行后,在 RabbitMQ服务器 中就会出现刚刚注册的队列与交换器(图是旧的,没有体现根据环境命名队列,但是其实做到了):

课程管理成员邀请接口

  • com.scholat.course.service.impl.CourseMemberInvitationServiceImpl

    • 首先在接口中注入操作rabbitMQ的Bean

    @Autowired
    RabbitTemplate rabbitTemplate;

  • 在课程管理员业务代码中加入向rabbitMQ发送消息的逻辑

    • CourseMemberInvitationServiceImpl

    @Override
    @Transactional(rollbackFor = Exception.class) //开启事务,以防万一
    public JSONObject addCourseMemberInvitation(Integer courseId, String username, String requestIp) {

    //检查课程是否存在
    courseService.hasCourse(courseId);
    
    //检查用户是否已加入课程平台
    accountService.hasAccount(username);
    
    /* 若存在则查看邀请记录是否已经存在 */
    
    //获取用户id
    Account account = accountService.getAccountByUsernameOrEmail(username);
    
    //检查用户名是否存在
    if(account==null){
        JSONObject result = new JSONObject();
        result.put(RESULT,FAILED);
        result.put(MSG,"用户不存在");
        return result;
    }
    
    Integer accountId = account.getId();
    
    //获得发出邀请人的id
    Account user = (Account) SecurityUtils.getSubject().getSession().getAttribute("user");
    Integer adminId = user.getId();
    
    //检查是否自己邀请自己,是则不再执行
    hasInvitedOneself(accountId,adminId);
    
    //检查是否已经邀请过,是则不再执行
    hasInvited(courseId,accountId,adminId);
    
    /* 若不存在则新建邀请记录 */
    
    CourseMemberInvitation courseMemberInvitation = new CourseMemberInvitation();
    courseMemberInvitation.setCourseId(courseId);
    courseMemberInvitation.setAccountId(accountId);
    courseMemberInvitation.setAdminId(adminId);
    courseMemberInvitation.setCreateTime(new Date());
    courseMemberInvitation.setCreateIp(requestIp);
    
    //新建消息
    CourseMessage courseMessage = courseMessageService.newMessage(MessageConst.MEMBER_INVITATION, accountId, adminId);
    
    //绑定邀请记录与消息记录
    courseMemberInvitation.setBindMessageId(courseMessage.getId());
    
    //插入数据库(这里用的是MybatisPlus)
    int insertResult = courseMemberInvitationDao.insert(courseMemberInvitation);
    
    //根据数据库插入返回值封装json
    JSONObject result = insertCourseMemberInvitationResult(insertResult, courseMemberInvitation);
    
    if(result.get(RESULT).equals(FAILED)){
        //若数据库操作没有成功,则直接返回json
        return result;
    }
    
    /* 发送消息 */
    courseMessageService.sendMessage(courseMessage);
    
    //根据插入情况返回json
    return result;

    }

  • courseMessageService中实现的sendMessage方法:

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(CourseMessage courseMessage) {
    //尝试发送
    //将消息放入rabbitMQ
    storeInRabbitMQ(courseMessage);
    }

    private void storeInRabbitMQ(CourseMessage courseMessage){
    //将消息放入rabbitMQ
    String exchangeName = (String) RabbitMQConst.EXCHANGE_MAP.get(courseMessage.getType());
    String routeKey = (String) RabbitMQConst.QUEUE_MAP.get(courseMessage.getType());
    try {
    //送到rabbitMQ队列中
    rabbitTemplate.convertAndSend(exchangeName,routeKey,courseMessage);
    }
    catch (Exception e){
    LOGGER.error("插入rabbitMQ失败",e);
    }
    }

  • com.xxxx.course.rabbitMQ.listener.CourseMemberInvitationListener

该类是用以监听_课程成员邀请_消息的,即是在rabbitMQ服务器建立的member_invitation队列。

/**
 * @author jojo
 */
@Component
public class CourseMemberInvitationListener {

    @Autowired
    MessageHandler messageHandler;

    /**
     * logger
     */
    public static final Logger LOGGER = LoggerUtil.getLogger();

    /**
     * spEL表达式
     * 一旦队列中有新消息,这个方法就会被触发
     */
    @RabbitListener(queues = "#{memberInvitationQueue.name}")
    public void listenCourseMemberInvitation(Message message){
        messageHandler.handleMessage(message);
    }

}
  • com.xxxx.course.rabbitMQ.MessageHandler, 该类是用来处理监听事件的:

    @Service
    public class MessageHandler {

    @Autowired
    MessageConverter messageConverter;
    
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    /**
     * logger
     */
    public static final Logger LOGGER = LoggerUtil.getLogger();
    
    /**
     * 队列消息处理业务
     * @param message
     */
    public void handleMessage(Message message){
        CourseMessage courseMessage = (CourseMessage) messageConverter.fromMessage(message);
    // WebSocket 通知前端
    try {
        //将消息发给指定用户
        WebSocketServer.sendInfo(JSON.toJSONString(courseMessage),courseMessage.getAccountId().toString());
    } catch (Exception e) {
        //消息存在数据库中了,待用户上线后再获取
        LOGGER.info("发送消息id为 ["+courseMessage.getId()+"] 的消息给-&gt;消息待收方id为 ["+courseMessage.getAccountId().toString()+"] 的用户,但其不在线上。");
    }
    }

    }

这样做应该就可以用RabbitMQ了。

本文的难点是ws的认证问题,虽然用超级好用的JWT解决了,但是随之而来的还有很多问题,比如:

  1. 注销登录等场景下 token 还有效

    与之类似的具体相关场景有:

  2. token 的续签问题

    token 有效期一般都建议设置的不太长,那么 token 过期后如何认证,如何实现动态刷新 token,避免用户经常需要重新登录?

这些问题还是有待解决是

本文就当记录一下自己的胡作非为吧

总之,至少小花再也不怕被小黄NTR了

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章