HotGo--WebSocket,消息处理,心跳,在线用户
=============================
本篇文章记录了关于 HotGo 中使用 WebSocket 的使用,WebSocket 的创建,事件的监听,心跳,在线用户,涉及到前端和后端的代码,整体上都是 go 和 js 的代码,vue3 鲜有涉及。
后端连接准备
后端需要先注册路由,使用 gin 的中间件做登录验证,然后等待前端来访问,现在就从路由看起
注册路由
// internal/router/websocket.go
group.GET("/", websocket.WsPage)
注册接收事件
// internal/router/websocket.go
websocket.RegisterMsg(websocket.EventHandlers{
"ping": common.Site.Ping, // 心跳
"join": common.Site.Join, // 加入组
"quit": common.Site.Quit, // 退出组
"admin/monitor/trends": admin.Monitor.Trends, // 后台监控,动态数据
"admin/monitor/runInfo": admin.Monitor.RunInfo, // 后台监控,运行信息
})
因为 WebSocket 也有很多类事件,比如心跳事件,监控数据事件等,需要一个字段来区分,这里时用 map 来注册不同的事件类型,通过 event 这个 key 来区分,比如 { event: ping }, 代表的时候心跳事件。
// internal/websocket/init.go
// WsPage ws入口
func WsPage(r *ghttp.Request) {
upGrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := upGrader.Upgrade(r.Response.Writer, r.Request, nil)
if err != nil {
return
}
currentTime := uint64(gtime.Now().Unix())
client := NewClient(r, conn, currentTime)
go client.read()
go client.write()
// 用户连接事件
clientManager.Register <- client
}
接收到 WebSocket 请求以后,通过 WsPage 函数将协议升级为 WebSocket 协议,并创建一个 client 对象,然后通过两个 goroutine 协程 read 和 write 来处理这个 client 的 socket 消息。
// internal/websocket/client.go
// read 函数
for {
_, message, err := c.Socket.ReadMessage()
if err != nil {
return
}
// 处理消息
handlerMsg(c, message)
}
read 函数中有一个 for 循环,一直在读消息,读到之后就转到 handlerMsg 中处理来自客户端的消息。
// internal/websocket/client.go
// write 函数
for {
select {
case <-c.closeSignal:
g.Log().Infof(mctx, "websocket client quit, user:%+v", c.User)
return
case message, ok := <-c.Send:
if !ok {
// 发送数据错误 关闭连接
g.Log().Warningf(mctx, "client write message, user:%+v", c.User)
return
}
_ = c.Socket.WriteJSON(message)
}
}
write 函数中,通过 for select + channel 的组合来建立了一个消息管道,等待要发送的消息,如果要发送消息,就往这个管道中发送就可以了,这个管道就是 c.Send, 是一个 chan 的指针类型。
前端创建 WebSocket 实例,并连接
前端创建 WebSocket 实例,连接的地址后面带上 token,做认证用,token 是登录时获取的 jwt,里面有部分用户身份信息。连接之后,收发数据使用的是 WebSocket 协议。无论是 HTTP 还是 WebSocket,都是基于 tcp 来传输数据的,这一点大家要清楚。
注意在建立 WebSocket 连接之前,客户端会先发送一个 HTTP 请求到服务器,请求升级到 WebSocket 协议。这个请求会包含一些特殊的头部字段,例如 Upgrade 和 Sec-WebSocket-Key。服务器收到请求后,会进行验证并返回响应。如果验证成功,服务器会返回一个 101 状态码,并包含一些用于建立 WebSocket 连接的参数,例如 Sec-WebSocket-Accept。客户端收到响应后,就完成了 WebSocket 协议的握手升级。
// src\utils\websocket\index.ts
socket = new WebSocket(`${useUserStore.config?.wsAddr}?authorization=${useUserStore.token}`);
init();
在上面的 init 的函数中,注册了 socket 的几个关键回调函数,来处理业务逻辑,onopen、onmessage、onclose、onerror。
心跳包
作者在 index.ts 中创建了一个 heartCheck 对象,用于管理心跳,每隔 5s 发送一个心跳,心跳的内容只有一个简单的json内容:{“event”:”ping”},同样,返回的结构也很简单 {“event”:”ping”,”code”:0,”timestamp”:1715670733}。这个过程中还有一些细节的控制,比如发出心跳后,5s 内服务端没有返回,就关闭 WebSocket,关闭之后,就会尝试重连等。
const heartCheck = {
timeout: 5000,
// ...
start: function () {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
this.timeoutObj = setTimeout(function () {
socket.send(
JSON.stringify({
event: SocketEnum.EventPing,
})
);
self.serverTimeoutObj = setTimeout(function () {
console.log('[WebSocket] 关闭服务');
socket.close();
}, self.timeout);
}, this.timeout);
},
};
服务端在接收到心跳 ping 以后,也做出了简单的回应,注意这个事件是在 controller 层开始接收处理的,通过 SendSuccess 调用到 websocket 模块
// internal/controller/websocket/handler/common/site.go
func (c *cSite) Ping(client *websocket.Client, req *websocket.WRequest) {
websocket.SendSuccess(client, req.Event)
}
// SendSuccess 发送成功消息
func SendSuccess(client *Client, event string, data ...interface{}) {
d := interface{}(nil)
if len(data) > 0 {
d = data[0]
}
client.SendMsg(&WResponse{
Event: event,
Data: d,
Code: gcode.CodeOK.Code(),
Timestamp: gtime.Now().Unix(),
})
before(client)
}
func before(client *Client) {
client.Heartbeat(uint64(gtime.Now().Unix()))
}
在 Heartbeat 函数中更新了,当前 client 的心跳时间,以便下次计算超时,下线等功能。
// internal/router/websocket.go
// 启动websocket监听
websocket.Start()
// internal/websocket/init.go
// Start 启动
func Start() {
go clientManager.start()
go clientManager.ping()
g.Log().Debug(mctx, "start websocket..")
}
// internal/websocket/client_manager.go
// 定时任务,清理超时连接
_, _ = gcron.Add(mctx, "*/30 * * * * *", func(ctx context.Context) {
manager.clearTimeoutConnections()
})
我们可以看到,这里也是通过一个 goroutine 启动了一个异步任务,在 client_manager.go 中,每隔 30s 清理一次已经超时的 client,将超时的连接关闭。
在线用户
在系统监控-在线用户,可以看到在线的用户列表,那么这是怎么实现的呢,在线用户的数据其实都保存在 ClientManager 中,在请求时,直接返回即可,具体代码:
// internal/websocket/init.go
// Start 启动
func Start() {
go clientManager.start()
go clientManager.ping()
g.Log().Debug(mctx, "start websocket..")
}
// internal/websocket/client_manager.go
// 管道处理程序
func (manager *ClientManager) start() {
for {
select {
case conn := <-manager.Register:
// 建立连接事件
manager.EventRegister(conn)
case login := <-manager.Login:
// 用户登录
manager.EventLogin(login)
case conn := <-manager.Unregister:
// 断开连接事件
manager.EventUnregister(conn)
case message := <-manager.Broadcast:
// 全部客户端广播事件
clients := manager.GetClients()
for conn := range clients {
conn.SendMsg(message)
}
// 其他广播事件...
}
在这个 start 函数中,会注册监听很多事件,比如 Register,Unregister,就会影响到 client 的增加与减少,这些用户数据就是通过 ClientManager 中的 Clients 获取的。
// internal/websocket/client_manager.go
// GetClients 获取所有客户端
func (manager *ClientManager) GetClients() (clients map[*Client]bool) {
clients = make(map[*Client]bool)
manager.ClientsRange(func(client *Client, value bool) (result bool) {
clients[client] = value
return true
})
return
}
更多关于 clients 的操作都可以在 client_manager.go 中找到。
另外,除了这些,WebSocket 还可以向用户发送消息,就像上面代码中提到的广播事件。
原文链接: https://juejin.cn/post/7368692841297477686
文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17436.html