赞
踩
- type Message struct {
- MsgId int64 `json:"msg_id,omitempty" form:"msg_id" bson:"msg_id"` // 消息ID
- OwnerId string `json:"owner_id,omitempty" form:"owner_id" bson:"owner_id"` // 己方,发出者
- OtherId string `json:"other_id,omitempty" form:"other_id" bson:"other_id"` // 对方,接收者
- MsgType int `json:"msg_type,omitempty" form:"msg_type" bson:"msg_type"` // 消息类型
- Media int `json:"media,omitempty" form:"media" bson:"media"` // 消息按什么样式展示
- Content string `json:"content,omitempty" form:"content" bson:"content"` // 消息内容
- Pic string `json:"pic,omitempty" form:"pic" bson:"pic"` // 预览图片
- Url string `json:"url,omitempty" form:"url" bson:"url"` // 图片地址
- Desc string `json:"desc,omitempty" form:"desc" bson:"desc"` // 描述内容
- CreateTime int64 `json:"create_time" form:"create_time" bson:"create_time"` // 消息时间
- }
其中消息类型,以及消息的展示形式如下
- const (
- // type 消息类型
- MSG_TYPE_HEART = 1 // 心跳消息
- MSG_TYPE_SINGLE = 2 // 单聊消息
- MSG_TYPE_ROOM = 3 // 群聊消息
- MSG_TYPE_ACK = 4 // 应答消息
- Msg_TYPE_OTHER = 4 // 其他业务消息
-
- // media 消息展示样式
- MSG_MEDIA_TEXT = 1 // 文本
- MSG_MEDIA_IMAGE = 2 // 图片
- MSG_MEDIA_FILE = 3 // 文件
- )
- type Client struct {
- Uid string // 用户id
- Manager *Manager // 对应管理者
- Conn *websocket.Conn // websocket 连接
- Send chan []byte // 待发送出去的消息
- }
-
- // ReadPump 等待用户发来消息,并将消息转交到Manager,由Manager负责发给对方或存储为离线消息
- func (c *Client) ReadPump() {
- defer func() {
- c.Manager.Unregister <- c
- c.Conn.Close()
- }()
- c.Conn.SetReadLimit(maxMessageSize)
- c.Conn.SetReadDeadline(time.Now().Add(pongWait))
- c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
- for {
- _, message, err := c.Conn.ReadMessage()
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- log.Printf("error: %v", err)
- }
- break
- }
- message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
- c.Manager.MsgBuff <- message
- }
- }
-
- // WritePump 待发送消息通道收到消息,直接将消息发送给客户端
- func (c *Client) WritePump() {
- // 定时向客户端发送ping消息,属于一种心跳保活机制
- // 活跃用户基数庞大的IM中,一般由客户端发起,减小服务端压力
- ticker := time.NewTicker(pingPeriod)
-
- defer func() {
- ticker.Stop()
- c.Manager.Unregister <- c
- c.Conn.Close()
- }()
- for {
- select {
- case message, ok := <-c.Send:
- c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
- if !ok {
- c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
- return
- }
-
- w, err := c.Conn.NextWriter(websocket.TextMessage)
- if err != nil {
- return
- }
- w.Write(message)
-
- // Add queued chat messages to the current websocket message
- n := len(c.Send)
- for i := 0; i < n; i++ {
- w.Write(newline)
- w.Write(<-c.Send)
- }
-
- if err := w.Close(); err != nil {
- return
- }
- case <-ticker.C:
- c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
- if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
- return
- }
- }
- }
- }

- // 管理员,负责管理client
- type Manager struct {
- Clients map[string]*Client // 客户端列表
- Register chan *Client // 注册器,当有新连接时,注册到客户端列表
- Unregister chan *Client // 注销器,发现有连接断开时,将其从客户端列表中移除
- MsgBuff chan []byte // 入站消息队列
- }
-
- // Run 启动管理
- func (m *Manager) Run() {
- for {
- select {
- case client := <-m.Register: // 注册
- m.Clients[client.Uid] = client
- case client := <-m.Unregister: // 注销
- if _, ok := m.Clients[client.Uid]; ok {
- delete(m.Clients, client.Uid)
- close(client.Send)
- }
- case msg := <-m.MsgBuff:
- var message model.Message
- if err := json.Unmarshal(msg, &message); err != nil {
- fmt.Println("消息解析失败:", string(msg))
- break
- }
-
- // 对方在线,或者对方离线
- if client, ok := m.Clients[message.OtherId]; ok {
- client.Send <- msg
- } else {
- fmt.Println("other down")
- }
- }
- }
- }

用户上线后,就会与服务端连接,产生一个Client,并给到Manager统一管理;
- func WsHandler(c *gin.Context, m *service.Manager) {
- uid := c.Query("uid")
- conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
- if err != nil {
- http.NotFound(c.Writer, c.Request)
- return
- }
-
- client := &service.Client{
- Uid: uid,
- Manager: m,
- Conn: conn,
- Send: make(chan []byte, 256),
- }
-
- fmt.Println("有用户上线:", uid)
-
- // 将client注册到Manager
- client.Manager.Register <- client
-
- // new goroutines.
- go client.WritePump()
- go client.ReadPump()
- }

用户发送消息,Client收到客户端发来的消息,将消息转交给Manager(c.Manager.MsgBuff <- message),再由Manager找到对方Client,将消息发出去
- // 对方在线,或者对方离线
- if client, ok := m.Clients[message.OtherId]; ok {
- client.Send <- msg
- } else {
- fmt.Println("other down")
- }
Message、Client、Manager之间的关系这里就讲清楚了,对片段代码不是很理解,这个项目结束会将项目代码放到码云,请关注后续。
IM即时通讯-核心结构体设计(四)<本文>
IM即时通讯-消息id(五)
IM即时通讯-会话列表和会话信箱(六)
IM即时通讯-1.0版成果展示与后续扩展(七)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。