本文将介绍如何实现一个基于websocket分布式聊天(IM)系统。
使用golang实现websocket通讯,单机可以支持百万连接,使用gin框架、nginx负载、可以水平部署、程序内部相互通讯、使用grpc通讯协议。
本文内容比较长,如果直接想clone项目体验直接进入项目体验 goWebSocket项目下载 ,文本从介绍webSocket是什么开始,然后开始介绍这个项目,以及在Nginx中配置域名做webSocket的转发,然后介绍如何搭建一个分布式系统。
本文将介绍如何实现一个基于websocket聊天(IM)分布式系统。
使用golang实现websocket通讯,单机支持百万连接,使用gin框架、nginx负载、可以水平部署、程序内部相互通讯、使用grpc通讯协议。
WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
HTTP和WebSocket在通讯过程的比较
HTTP和webSocket都支持配置证书,ws://
无证书 wss://
配置证书的协议标识
golang、java、php、node.js、python、nginx 都有不错的支持
Android可以使用java-webSocket对webSocket支持
iOS 4.2及更高版本具有WebSockets支持
目前大多数的请求都是使用HTTP,都是由客户端发起一个请求,有服务端处理,然后返回结果,不可以服务端主动向某一个客户端主动发送数据
客户端发起升级协议的请求,采用标准的HTTP报文格式,在报文中添加头部信息
Connection: Upgrade
表明连接需要升级
Upgrade: websocket
需要升级到 websocket协议
Sec-WebSocket-Version: 13
协议的版本为13
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
这个是base64 encode 的值,是浏览器随机生成的,与服务器响应的 Sec-WebSocket-Accept
对应
# Request Headers
Connection: Upgrade
Host: im.91vh.com
Origin: http://im.91vh.com
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
Sec-WebSocket-Version: 13
Upgrade: websocket
服务端接收到升级协议的请求,如果服务端支持升级协议会做如下响应
返回:
Status Code: 101 Switching Protocols
表示支持切换协议
# Response Headers
Connection: upgrade
Date: Fri, 09 Aug 2019 07:36:59 GMT
Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=
Server: nginx/1.12.1
Upgrade: websocket
websocket需要监听端口,所以需要在golang
成功的 main
函数中用协程的方式去启动程序
main.go 实现启动
go websocket.StartWebSocket()
init_acc.go 启动程序
// 启动程序
func StartWebSocket() {
http.HandleFunc("/acc", wsPage)
http.ListenAndServe(":8089", nil)
}
客户端是通过http请求发送到服务端,我们需要对http协议进行升级为websocket协议
对http请求协议进行升级 golang 库gorilla/websocket 已经做得很好了,我们直接使用就可以了
在实际使用的时候,建议每个连接使用两个协程处理客户端请求数据和向客户端发送数据,虽然开启协程会占用一些内存,但是读取分离,减少收发数据堵塞的可能
init_acc.go
func wsPage(w http.ResponseWriter, req *http.Request) {
// 升级协议
conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
fmt.Println("升级协议", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])return true
}}).Upgrade(w, req, nil)
if err != nil {
http.NotFound(w, req)return
}
fmt.Println("webSocket 建立连接:", conn.RemoteAddr().String())
currentTime := uint64(time.Now().Unix())
client := NewClient(conn.RemoteAddr().String(), conn, currentTime)
go client.read()
go client.write()
// 用户连接事件
clientManager.Register <- client
}
当前程序有多少用户连接,还需要对用户广播的需要,这里我们就需要一个管理者(clientManager),处理这些事件:
记录全部的连接、登录用户的可以通过 appId+uuid 查到用户连接
使用map存储,就涉及到多协程并发读写的问题,所以需要加读写锁
定义四个channel ,分别处理客户端建立连接、用户登录、断开连接、全员广播事件
// 连接管理
type ClientManager struct {
Clients map[Client]bool // 全部的连接
ClientsLock sync.RWMutex // 读写锁
Users map[string]Client // 登录的用户 // appId+uuid
UserLock sync.RWMutex // 读写锁
Register chan *Client // 连接连接处理
Login chan *login // 用户登录处理
Unregister chan *Client // 断开连接处理程序
Broadcast chan []byte // 广播 向全部成员发送数据
}
// 初始化
func NewClientManager() (clientManager ClientManager) {
clientManager = &ClientManager{
Clients: make(map[Client]bool),
Users: make(map[string]*Client),
Register: make(chan *Client, 1000),
Login: make(chan *login, 1000),
Unregister: make(chan *Client, 1000),
Broadcast: make(chan []byte, 1000),
}
return
}
防止发生程序崩溃,所以需要捕获异常
为了显示异常崩溃位置这里使用string(debug.Stack())
打印调用堆栈信息
如果写入数据失败了,可能连接有问题,就关闭连接
client.go
// 向客户端写数据
func (c *Client) write() {
defer func() {
if r := recover(); r != nil {
fmt.Println("write stop", string(debug.Stack()), r)
}
}()
defer func() {
clientManager.Unregister <- c
c.Socket.Close()
fmt.Println("Client发送数据 defer", c)
}()
for {
select {
case message, ok := <-c.Send:
if !ok {
// 发送数据错误 关闭连接
fmt.Println("Client发送数据 关闭连接", c.Addr, "ok", ok) return
}
c.Socket.WriteMessage(websocket.TextMessage, message)
}
}
}
循环读取客户端发送的数据并处理
如果读取数据失败了,关闭channel
client.go
// 读取客户端数据
func (c *Client) read() {
defer func() {
if r := recover(); r != nil {
fmt.Println("write stop", string(debug.Stack()), r)
}
}()
defer func() {
fmt.Println("读取客户端数据 关闭send", c)
close(c.Send)
}()
for {
_, message, err := c.Socket.ReadMessage()
if err != nil {
fmt.Println("读取客户端数据 错误", c.Addr, err) return
}
// 处理程序
fmt.Println("读取客户端数据 处理:", string(message))
ProcessData(c, message)
}
}
约定发送和接收请求数据格式,为了js处理方便,采用了json
的数据格式发送和接收数据(人类可以阅读的格式在工作开发中使用是比较方便的)
登录发送数据示例:
{"seq":"1565336219141-266129","cmd":"login","data":{"userId":"马远","appId":101}}
登录响应数据示例:
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
websocket是双向的数据通讯,可以连续发送,如果发送的数据需要服务端回复,就需要一个seq来确定服务端的响应是回复哪一次的请求数据
cmd 是用来确定动作,websocket没有类似于http的url,所以规定 cmd 是什么动作
目前的动作有:login/heartbeat 用来发送登录请求和连接保活(长时间没有数据发送的长连接容易被浏览器、移动中间商、nginx、服务端程序断开)
为什么需要AppId,UserId是表示用户的唯一字段,设计的时候为了做成通用性,设计AppId用来表示用户在哪个平台登录的(web、app、ios等),方便后续扩展
request_model.go 约定的请求数据格式
/ 请求数据 **/
// 通用请求数据格式
type Request struct {
Seq string json:"seq"
// 消息的唯一Id
Cmd string json:"cmd"
// 请求命令字
Data interface{} json:"data,omitempty"
// 数据 json
}
// 登录请求数据
type Login struct {
ServiceToken string json:"serviceToken"
// 验证用户是否登录
AppId uint32 json:"appId,omitempty"
UserId string json:"userId,omitempty"
}
// 心跳请求数据
type HeartBeat struct {
UserId string json:"userId,omitempty"
}
response_model.go
/ 响应数据 **/
type Head struct {
Seq string json:"seq"
// 消息的Id
Cmd string json:"cmd"
// 消息的cmd 动作
Response *Response json:"response"
// 消息体
}
type Response struct {
Code uint32 json:"code"
CodeMsg string json:"codeMsg"
Data interface{} json:"data"
// 数据 json
}
使用路由的方式处理由客户端发送过来的请求数据
以后添加请求类型以后就可以用类是用http相类似的方式(router-controller)去处理
acc_routers.go
// Websocket 路由
func WebsocketInit() {
websocket.Register("login", websocket.LoginController)
websocket.Register("heartbeat", websocket.HeartbeatController)
}
client_manager.go
// 定时清理超时连接
func ClearTimeoutConnections() {
currentTime := uint64(time.Now().Unix())
for client := range clientManager.Clients {
if client.IsHeartbeatTimeout(currentTime) {
fmt.Println("心跳时间超时 关闭连接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)
client.Socket.Close()
}
}
}
write()
Goroutine写入数据失败,关闭c.Socket.Close()
连接,会关闭read()
Goroutine read()
Goroutine读取数据失败,关闭close(c.Send)
连接,会关闭write()
GoroutineClientManager
删除连接js 建立连接,并处理连接成功、收到数据、断开连接的事件处理
ws = new WebSocket("ws://127.0.0.1:8089/acc");
ws.onopen = function(evt) {
console.log("Connection open …");
};
ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
data_array = JSON.parse(evt.data);
console.log( data_array);
};
ws.onclose = function(evt) {
console.log("Connection closed.");
};
需要注意:连接建立成功以后才可以发送数据
建立连接以后由客户端向服务器发送数据示例
登录:
ws.send('{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}');
心跳:
ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}');
ping 查看服务是否正常:
ws.send('{"seq":"2325","cmd":"ping","data":{}}');
关闭连接:
ws.close();
本项目是基于webSocket实现的分布式IM系统
客户端随机分配用户名,所有人进入一个聊天室,实现群聊的功能
单台机器(24核128G内存)支持百万客户端连接
支持水平部署,部署的机器之间可以相互通讯
项目架构图
本项目只需要使用 redis 和 golang
本项目使用govendor管理依赖,克隆本项目就可以直接使用
github.com/gin-gonic/gin@v1.4.0
github.com/go-redis/redis
github.com/gorilla/websocket
github.com/spf13/viper
google.golang.org/grpc
github.com/golang/protobuf
克隆项目
git clone git@github.com:link1st/gowebsocket.git
修改项目配置
cd gowebsocket
cd config
mv app.yaml.example app.yaml
vim app.yaml
cd ..
配置文件说明
app:
logFile: log/gin.log # 日志文件位置
httpPort: 8080 # http端口
webSocketPort: 8089 # webSocket端口
rpcPort: 9001 # 分布式部署程序内部通讯端口
httpUrl: 127.0.0.1:8080
webSocketUrl: 127.0.0.1:8089
redis:
addr: "localhost:6379"
password: ""
DB: 0
poolSize: 30
minIdleConns: 30
启动项目
go run main.go
进入IM聊天地址
http://127.0.0.1:8080/home/index
到这里,就可以体验到基于webSocket的IM系统
使用域名 im.91vh.com 为示例,参考配置
一级目录im.91vh.com/acc 是给webSocket使用,是用nginx stream转发功能(nginx 1.3.31 开始支持,使用Tengine配置也是相同的),转发到golang 8089 端口处理
其它目录是给HTTP使用,转发到golang 8080 端口处理
upstream go-im
{
server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
upstream go-acc
{
server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
server {
listen 80 ;
server_name im.91vh.com;
index index.html index.htm ;
location /acc {
proxy_set_header Host $host;
proxy_pass http://go-acc;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_set_header Connection "";
proxy_redirect off;
proxy_intercept_errors on;
client_max_body_size 10m;
}
location /
{
proxy_set_header Host $host;
proxy_pass http://go-im;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_redirect off;
proxy_intercept_errors on;
client_max_body_size 30m;
}
access_log /link/log/nginx/access/im.log;
error_log /link/log/nginx/access/im.error.log;
}
运行nginx测试命令,查看配置文件是否正确
/link/server/tengine/sbin/nginx -t
如果出现错误
nginx: [emerg] unknown "connection_upgrade" variable
configuration file /link/server/tengine/conf/nginx.conf test failed
处理方法
在nginx.com添加
http{
fastcgi_temp_file_write_size 128k;
….. # 需要添加的内容
#support websocket
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
…..
gzip on;
}
原因:Nginx代理webSocket的时候就会遇到Nginx的设计问题 End-to-end and Hop-by-hop Headers
设置文件打开句柄数
ulimit -n 1000000
设置sockets连接参数
vim /etc/sysctl.conf
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0
待压测,如果大家有压测的结果欢迎补充
后续会出专门的教程,从申请机器、写压测用例、内核优化、得出压测数据
关于压测请移步 go-stress-testing,从申请机器开始,优化内核,部署项目压测,解释压测的原理
在线用户数
cup
内存
I/O
net.out
1W
10W
100W
参考本项目源码
为了方便演示,IM系统和webSocket(acc)系统合并在一个系统中
IM系统接口:
获取全部在线的用户,查询单前服务的全部用户+集群中服务的全部用户
发送消息,这里采用的是http接口发送(微信网页版发送消息也是http接口),这里考虑主要是两点:
1.服务分离,让acc系统尽量的简单一点,不掺杂其它业务逻辑
2.发送消息是走http接口,不使用webSocket连接,才用收和发送数据分离的方式,可以加快收发数据的效率
用水平部署两个项目(gowebsocket和gowebsocket1)演示分部署
项目之间如何相互通讯:项目启动以后将项目Ip、rpcPort注册到redis中,让其它项目可以发现,需要通讯的时候使用gRpc进行通讯
gowebsocket
app:
logFile: log/gin.log
httpPort: 8080
webSocketPort: 8089
rpcPort: 9001
httpUrl: im.91vh.com
webSocketUrl: im.91vh.com
go run main.go
gowebsocket1
cp -rf gowebsocket gowebsocket1
app:
logFile: log/gin.log
httpPort: 8081
webSocketPort: 8090
rpcPort: 9002
httpUrl: im.91vh.com
webSocketUrl: im.91vh.com
go run main.go
Nginx配置
在之前Nginx配置项中添加第二台机器的Ip和端口
upstream go-im
{
server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
server 127.0.0.1:8081 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
upstream go-acc
{
server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
server 127.0.0.1:8090 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
查看请求是否落在两个项目上
实验两个用户分别连接不同的项目(gowebsocket和gowebsocket1)是否也可以相互发送消息
本项目只是演示了这个项目如何分布式部署,以及分布式部署以后模块如何进行相互通讯
完全解决系统没有单点的故障,还需 Nginx集群、redis cluster等
IM实现细节:
go-stress-testing 单台机器100w连接压测实战
github 搜:link1st 查看项目 gowebsocket
手机扫一扫
移动阅读更方便
你可能感兴趣的文章