基于WebSocket的实时消息传递设计
阅读原文时间:2023年07月08日阅读:2

目录

概述

web管理系统中可以对业务数据执行新增和删除,现在需要当业务数据发生新增或删除操作后,尽可能实时的反应到WPF客户端上面。

web管理系统用VUE编写,后端服务为SpringBoot,WPF客户端基于.Netframework4.8编写。

整体架构

sequenceDiagram
title: 交互时序图

web前台->>+web后端服务:新增数据
Note over web前台,web后端服务:caremaId,labelInfo,……

web后端服务->>+WebSocketServer:创建websocker消息
Note over web后端服务,WebSocketServer:Must:cameraId=clientId

WPF客户端1-->>+WebSocketServer:创建监听
Note over WPF客户端1,WebSocketServer:clientId

WPF客户端2-->>+WebSocketServer:创建监听
Note over WPF客户端2,WebSocketServer:clientId

WebSocketServer->>WPF客户端1:分发websocker消息
Note over WebSocketServer,WPF客户端1:依据:cameraId=clientId

WebSocketServer->>WPF客户端2:分发websocker消息
Note over WebSocketServer,WPF客户端2:依据:cameraId=clientId

设计

  • 用户在浏览器界面执行新增业务数据操作,调用后端新增接口
  • WPF客户端在启动的时候初始化websocket客户端,并创建对server的监听
  • 后端新增接口先将数据落库,而后调用websocket服务端产生消息,消息在产生后立马被发送到了正在监听中的websocket-client
  • websocket-server和websocket-client是一对多的关系,如何保证业务数据被正确的分发?监听的时候给server端传递一个全局唯一的clientId,业务数据在产生的时候关联到一个BizId上面,只要保证clientId=BizId就可以了。
  • 删除流程和新增类似

WebSocketServer

概述

WebSocketServer端采用SpringBoot框架实现,通过在springboot-web项目中集成 org.springframework.boot:spring-boot-starter-websocket

实现websocket的能力。

新增pom

<!-- &nbsp;websocket &nbsp;-->
<dependency>
&nbsp; &nbsp; <groupId>org.springframework.boot</groupId>
&nbsp; &nbsp; <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

新增配置类

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.config.annotation.EnableWebSocket;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@EnableWebSocket
public class WebSocketConfig {
&nbsp; &nbsp; @Bean
&nbsp; &nbsp; public ServerEndpointExporter serverEndpointExporter() {
&nbsp; &nbsp; &nbsp; &nbsp; return new ServerEndpointExporter();
&nbsp; &nbsp; }
}

创建websocket端点

import com.alibaba.fastjson.JSON;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint("/ws/label/{clientId}")
@Component
public class LabelWebSocket {
&nbsp; &nbsp; /**
&nbsp; &nbsp; &nbsp;* session list
&nbsp; &nbsp; &nbsp;*/
&nbsp; &nbsp; private static ConcurrentHashMap<String, Session> sessionList = new ConcurrentHashMap<>();
&nbsp; &nbsp; /**
&nbsp; &nbsp; &nbsp;* 当前 clientId
&nbsp; &nbsp; &nbsp;*/
&nbsp; &nbsp; private String currentClientId = "";
&nbsp; &nbsp; @OnOpen
&nbsp; &nbsp; public void open(Session session, @PathParam("clientId") String clientId) throws IOException {
&nbsp; &nbsp; &nbsp; &nbsp; if (sessionList.containsKey(clientId)) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sessionList.remove(clientId);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; sessionList.put(clientId, session);
&nbsp; &nbsp; &nbsp; &nbsp; currentClientId = clientId;
&nbsp; &nbsp; &nbsp; &nbsp; this.sendMsg(session, "connectok");
&nbsp; &nbsp; }
&nbsp; &nbsp; @OnClose
&nbsp; &nbsp; public void close(Session session) throws IOException {
&nbsp; &nbsp; &nbsp; &nbsp; sessionList.remove(currentClientId);
&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("连接关闭,session=" + JSON.toJSONString(session.getId()));
&nbsp; &nbsp; }
&nbsp; &nbsp; @OnMessage
&nbsp; &nbsp; public void receiveMsg(Session session, String msg) throws IOException {
&nbsp; &nbsp; &nbsp; &nbsp; this.sendMsg(session, "接收到的消息为:" + msg);
// &nbsp; &nbsp; &nbsp; &nbsp;throw new RuntimeException("主动抛异常");
&nbsp; &nbsp; }
&nbsp; &nbsp; @OnError
&nbsp; &nbsp; public void error(Session session, Throwable e) throws IOException {
&nbsp; &nbsp; &nbsp; &nbsp; System.out.println("连接异常,session=" + JSON.toJSONString(session.getId()) + ";currentClientId=" + currentClientId);
&nbsp; &nbsp; &nbsp; &nbsp; this.sendMsg(session, "发生异常,e=" + e.getMessage());
&nbsp; &nbsp; &nbsp; &nbsp; e.printStackTrace();
&nbsp; &nbsp; }
&nbsp; &nbsp; /**
&nbsp; &nbsp; &nbsp;* @param clientId
&nbsp; &nbsp; &nbsp;* @param msg
&nbsp; &nbsp; &nbsp;*/
&nbsp; &nbsp; public boolean sendMsg(String clientId, String msg) throws IOException {
&nbsp; &nbsp; &nbsp; &nbsp; if (sessionList.containsKey(clientId)) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Session session = sessionList.get(clientId);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; this.sendMsg(session, msg);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return true;
&nbsp; &nbsp; &nbsp; &nbsp; } else {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return false;
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }
&nbsp; &nbsp; private void sendMsg(Session session, String msg) throws IOException {
&nbsp; &nbsp; &nbsp; &nbsp; session.getBasicRemote().sendText(msg);
&nbsp; &nbsp; }
}

WebSocketClient

概述

WebSocketClient端集成在WPF应用客户端中,通过前期调研,选中 WebSocketSharp 作为websocketclient工具,WebSocketSharp 是托管在Github的开源项目,MITLicense,目前4.9K的star。

安装WebSocketSharp

//nuget

Install-Package WebSocketSharp -Pre

初始化client

WebSocket ws = new WebSocket("ws://127.0.0.1:8083/ws/xx/clientId");

创建连接

private void InitWebSocket()
{
&nbsp; &nbsp; ws.OnOpen += (sender, e) =>
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp; Console.WriteLine("onOpen");
&nbsp; &nbsp; };
&nbsp; &nbsp; //允许ping
&nbsp; &nbsp; ws.EmitOnPing = true;
&nbsp; &nbsp; //接收到xiaoxi
&nbsp; &nbsp; ws.OnMessage += (sender, e) =>
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp; ReceiveMessage(sender, e);
&nbsp; &nbsp; };
&nbsp; &nbsp; ws.Connect();
    //发送消息
&nbsp; &nbsp; //ws.Send("BALUS")
&nbsp; &nbsp; ;
}
private void ReceiveMessage(object sender, MessageEventArgs e)
{
&nbsp; &nbsp; if (e.IsText)
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp; // Do something with e.Data.like jsonstring
&nbsp; &nbsp; &nbsp; &nbsp; Console.WriteLine(e.Data);
&nbsp; &nbsp; &nbsp; &nbsp; return;
&nbsp; &nbsp; }
&nbsp; &nbsp; if (e.IsBinary)
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp; // Do something with e.RawData. like &nbsp;byte[]
&nbsp; &nbsp; &nbsp; &nbsp; return;
&nbsp; &nbsp; }
&nbsp; &nbsp; if (e.IsPing)
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp; // Do something to notify that a ping has been received.
&nbsp; &nbsp; &nbsp; &nbsp; return;
&nbsp; &nbsp; }
}

新增接口

概述

目前WebSocketServer和web后端服务是在同一个SpringBoot的工程中,所以只要将WebSocketServer托管到SpringContainer中,web后端服务可以通过 DI 的方式直接访问 WebSocketEndPoint。

如果考虑程序的低耦合,可以在WebSocketServer和web后端服务之间架设一个MQ。

核心代码

&nbsp; &nbsp; @Autowired
&nbsp; &nbsp; private LabelWebSocket ws;
&nbsp; &nbsp; @GetMapping("/create")
&nbsp; &nbsp; public boolean createLabel() throws IOException {
&nbsp; &nbsp; &nbsp; &nbsp; String cameraId = "cml";
&nbsp; &nbsp; &nbsp; &nbsp; //todo
&nbsp; &nbsp; &nbsp; &nbsp; boolean result = ws.sendMsg(cameraId, "新增标签");
&nbsp; &nbsp; &nbsp; &nbsp; return result;
&nbsp; &nbsp; }

风险

当前在 WebSocketServer 中,已经连接的client信息是记录在当前进程的cache中,如果服务做横向扩容,cache信息无法在多实例进程中传递,将导致无法正确的处理业务数据,并可能会发生意想不到的异常和bug,此问题在并发越高的情况下造成的影响越大

web后端服务为基于java语言的springboot程序,这种类型程序的特点是内存消耗特别严重。WebSocketServer服务在本项目中仅用作消息中间件,连通web后端服务和WPF客户端。

首先WebSocketServer没有太多的计算能力的消耗,内存消耗会随着连接客户端数量的增长而增长,网络将是最大的开销,一方面需要转发来自web后端服务的业务数据,并和WPF客户端保持长连接;另一方面WebSocketServer和WPF客户端的交互可能会走公网,而其和web后端服务必然是在局域网环境。

综上,将web后端服务和WebSocketServer分开部署对于硬件资源成本和利用率来说是最好的选择。

未引入重试机制,当某一个环节失败之后,将导致异常情况发生。