当前位置:   article > 正文

百万级并发 - IM项目实战_im实战

im实战

百万级并发 - IM项目实战

需求分析:

项目目的:

项目背景:IM对性能和体验敏感度非常高 。 大厂必备

你将获得什么:

熟悉开发流程 ,熟练相关技术栈 gin+GORM+swagger + logrus auth 等中间件,三高性能

核心功能:

​ 发送和接受消息,文字 表情 图片 音频 ,访客,点对点,群聊 ,广播,快捷回复,撤回,心跳检测…

技术栈:

​ 前端 后端 (webSocket ,channel/goroutine ,gin ,temlate,gorm ,sql,nosql,mq…)

系统架构:

​ 四层:前端,接入层,逻辑层,持久层

消息发送流程:

​ A > 登录> 鉴权>(游客) > 消息类型 >(群/广播) > B

环境搭建:

​ go version go1.17.8 windows/amd64

​ set GO111MODULE=on

​ go mod init go_exam

​ go mod tidy

系统架构

image-1666343867758

核心流程:

image-1666343894583

image-1666343928806

image-1666343946893

效果展示:

image-1666344179175

image-1666344192229

image-1666344202937

image-1666344215099

image-1666344227541

image-1666344237243

image-1666344247334

image-1666344268342

image-1666344280635

image-1666344295006

部分项目代码展示

image-1666344380115

image-1666344404391

image-1666344418358

image-1666344432655

功能实现

完成用户模块基本的

加入修改电话号码和邮箱 并校验

先引入

  1. get github.com/asaskevich/govalidator
  2. 结构体字段后面 加检验规则
  3. 最后service govalidator.ValidatorStrut(user)

1.router包 app.go

  1. r.GET("/user/getUserList", service.GetUserList)
  2. r.GET("/user/createUser", service.CreateUser)
  3. r.GET("/user/deleteUser", service.DeleteUser)
  4. r.POST("/user/updateUser", service.UpdateUser)

2.service 包 userservice.go

  1. // GetUserList
  2. // @Summary 所有用户
  3. // @Tags 用户模块
  4. // @Success 200 {string} json{"code","message"}
  5. // @Router /user/getUserList [get]
  6. func GetUserList(c *gin.Context) {
  7. data := make([]*models.UserBasic, 10)
  8. data = models.GetUserList()
  9. c.JSON(200, gin.H{
  10. "message": data,
  11. })
  12. }
  13. // CreateUser
  14. // @Summary 新增用户
  15. // @Tags 用户模块
  16. // @param name query string false "用户名"
  17. // @param password query string false "密码"
  18. // @param repassword query string false "确认密码"
  19. // @Success 200 {string} json{"code","message"}
  20. // @Router /user/createUser [get]
  21. func CreateUser(c *gin.Context) {
  22. user := models.UserBasic{}
  23. user.Name = c.Query("name")
  24. password := c.Query("password")
  25. repassword := c.Query("repassword")
  26. if password != repassword {
  27. c.JSON(-1, gin.H{
  28. "message": "两次密码不一致!",
  29. })
  30. return
  31. }
  32. user.PassWord = password
  33. models.CreateUser(user)
  34. c.JSON(200, gin.H{
  35. "message": "新增用户成功!",
  36. })
  37. }
  38. // DeleteUser
  39. // @Summary 删除用户
  40. // @Tags 用户模块
  41. // @param id query string false "id"
  42. // @Success 200 {string} json{"code","message"}
  43. // @Router /user/deleteUser [get]
  44. func DeleteUser(c *gin.Context) {
  45. user := models.UserBasic{}
  46. id, _ := strconv.Atoi(c.Query("id"))
  47. user.ID = uint(id)
  48. models.DeleteUser(user)
  49. c.JSON(200, gin.H{
  50. "message": "删除用户成功!",
  51. })
  52. }
  53. // UpdateUser
  54. // @Summary 修改用户
  55. // @Tags 用户模块
  56. // @param id formData string false "id"
  57. // @param name formData string false "name"
  58. // @param password formData string false "password"
  59. // @param phone formData string false "phone"
  60. // @param email formData string false "email"
  61. // @Success 200 {string} json{"code","message"}
  62. // @Router /user/updateUser [post]
  63. func UpdateUser(c *gin.Context) {
  64. user := models.UserBasic{}
  65. id, _ := strconv.Atoi(c.PostForm("id"))
  66. user.ID = uint(id)
  67. user.Name = c.PostForm("name")
  68. user.PassWord = c.PostForm("password")
  69. user.Phone = c.PostForm("phone")
  70. user.Email = c.PostForm("email")
  71. fmt.Println("update :", user)
  72. _, err := govalidator.ValidateStruct(user)
  73. if err != nil {
  74. fmt.Println(err)
  75. c.JSON(200, gin.H{
  76. "message": "修改参数不匹配!",
  77. })
  78. } else {
  79. models.UpdateUser(user)
  80. c.JSON(200, gin.H{
  81. "message": "修改用户成功!",
  82. })
  83. }
  84. }

3.modesl包 user_basic.go

  1. Phone string `valid:"matches(^1[3-9]{1}\\d{9}$)"`
  2. Email string `valid:"email"`

4,然后测试 

重复注册校验:

  1. func FindUserByName(name string) UserBasic {
  2. user := UserBasic{}
  3. utils.DB.Where("name = ?", name).First(&user)
  4. return user
  5. }
  6. func FindUserByPhone(phone string) *gorm.DB {
  7. user := UserBasic{}
  8. return utils.DB.Where("Phone = ?", phone).First(&user)
  9. }
  10. func FindUserByEmail(email string) *gorm.DB {
  11. user := UserBasic{}
  12. return utils.DB.Where("email = ?", email).First(&user)
  13. }
  14. 再到service层 加入判断
  15. data := models.FindUserByName(user.Name)
  16. if data.Name != "" {
  17. c.JSON(-1, gin.H{
  18. "message": "用户名已注册!",
  19. })
  20. return
  21. }

注册 加密操作

  1. package utils
  2. import (
  3. "crypto/md5"
  4. "encoding/hex"
  5. "fmt"
  6. "strings"
  7. )
  8. //小写
  9. func Md5Encode(data string) string {
  10. h := md5.New()
  11. h.Write([]byte(data))
  12. tempStr := h.Sum(nil)
  13. return hex.EncodeToString(tempStr)
  14. }
  15. //大写
  16. func MD5Encode(data string) string {
  17. return strings.ToUpper(Md5Encode(data))
  18. }
  19. //加密
  20. func MakePassword(plainpwd, salt string) string {
  21. return Md5Encode(plainpwd + salt)
  22. }
  23. //解密
  24. func ValidPassword(plainpwd, salt string, password string) bool {
  25. md := Md5Encode(plainpwd + salt)
  26. fmt.Println(md + " " + password)
  27. return md == password
  28. }
  29. service层 判断之后加入
  30. //user.PassWord = password
  31. user.PassWord = utils.MakePassword(password, salt)
  32. user.Salt = salt //表更新了字段 db.AutoMigrate(&models.UserBasic{})
  33. fmt.Println(user.PassWord)
  34. models.CreateUser(user)

登录解密 :

  1. //dao层
  2. func FindUserByNameAndPwd(name string, password string) UserBasic {
  3. user := UserBasic{}
  4. utils.DB.Where("name = ? and pass_word=?", name, password).First(&user)
  5. return user
  6. }
  7. // GetUserList
  8. // @Summary 所有用户
  9. // @Tags 用户模块
  10. // @param name query string false "用户名"
  11. // @param password query string false "密码"
  12. // @Success 200 {string} json{"code","message"}
  13. // @Router /user/findUserByNameAndPwd [get]
  14. func FindUserByNameAndPwd(c *gin.Context) {
  15. data := models.UserBasic{}
  16. name := c.Query("name")
  17. password := c.Query("password")
  18. user := models.FindUserByName(name)
  19. if user.Name == "" {
  20. c.JSON(200, gin.H{
  21. "message": "该用户不存在",
  22. })
  23. return
  24. }
  25. flag := utils.ValidPassword(password, user.Salt, user.PassWord)
  26. if !flag {
  27. c.JSON(200, gin.H{
  28. "message": "密码不正确",
  29. })
  30. return
  31. }
  32. pwd := utils.MakePassword(password, user.Salt)
  33. data = models.FindUserByNameAndPwd(name, pwd)
  34. c.JSON(200, gin.H{
  35. "message": data,
  36. })
  37. }
  38. router层 :
  39. r.POST("/user/findUserByNameAndPwd", service.FindUserByNameAndPwd)

token的加入对返回的结构调整 。

修改登录的方法:

  1. func FindUserByNameAndPwd(name string, password string) UserBasic {
  2. user := UserBasic{}
  3. utils.DB.Where("name = ? and pass_word=?", name, password).First(&user)
  4. //token加密
  5. str := fmt.Sprintf("%d", time.Now().Unix())
  6. temp := utils.MD5Encode(str)
  7. utils.DB.Model(&user).Where("id = ?", user.ID).Update("identity", temp)
  8. return user
  9. }
  10. // 返回的结果:
  11. c.JSON(200, gin.H{
  12. "code": 0, // 0成功 -1失败
  13. "message": "修改用户成功!",
  14. "data": user,
  15. })

加入Redis

go get github.com/go-redis/redis

配置redis

  1. redis:
  2. addr: "192.168.137.131:6379"
  3. password: ""
  4. DB: 0
  5. poolSize: 30
  6. minIdleConn: 30

然后main方法中

utils.InitRedis()

最后再 utils

  1. func InitRedis() {
  2. Red = redis.NewClient(&redis.Options{
  3. Addr: viper.GetString("redis.addr"),
  4. Password: viper.GetString("redis.password"),
  5. DB: viper.GetInt("redis.DB"),
  6. PoolSize: viper.GetInt("redis.poolSize"),
  7. MinIdleConns: viper.GetInt("redis.minIdleConn"),
  8. })
  9. pong, err := Red.Ping().Result()
  10. if err != nil {
  11. fmt.Println("init redis 。。。。", err)
  12. } else {
  13. fmt.Println(" Redis inited 。。。。", pong)
  14. }
  15. }

测试看是否正常

通过WebSocket通信

  1. go get github.com/gorilla/websocket
  2. go get github.com/go-redis/redis/v8
  3. package utils
  4. import (
  5. "context"
  6. "fmt"
  7. "log"
  8. "os"
  9. "time"
  10. "github.com/go-redis/redis/v8"
  11. "github.com/spf13/viper"
  12. "gorm.io/driver/mysql"
  13. "gorm.io/gorm"
  14. "gorm.io/gorm/logger"
  15. )
  16. var (
  17. DB *gorm.DB
  18. Red *redis.Client
  19. )
  20. func InitConfig() {
  21. viper.SetConfigName("app")
  22. viper.AddConfigPath("config")
  23. err := viper.ReadInConfig()
  24. if err != nil {
  25. fmt.Println(err)
  26. }
  27. fmt.Println("config app inited 。。。。")
  28. }
  29. func InitMySQL() {
  30. //自定义日志模板 打印SQL语句
  31. newLogger := logger.New(
  32. log.New(os.Stdout, "\r\n", log.LstdFlags),
  33. logger.Config{
  34. SlowThreshold: time.Second, //慢SQL阈值
  35. LogLevel: logger.Info, //级别
  36. Colorful: true, //彩色
  37. },
  38. )
  39. DB, _ = gorm.Open(mysql.Open(viper.GetString("mysql.dns")),
  40. &gorm.Config{Logger: newLogger})
  41. fmt.Println(" MySQL inited 。。。。")
  42. //user := models.UserBasic{}
  43. //DB.Find(&user)
  44. //fmt.Println(user)
  45. }
  46. func InitRedis() {
  47. Red = redis.NewClient(&redis.Options{
  48. Addr: viper.GetString("redis.addr"),
  49. Password: viper.GetString("redis.password"),
  50. DB: viper.GetInt("redis.DB"),
  51. PoolSize: viper.GetInt("redis.poolSize"),
  52. MinIdleConns: viper.GetInt("redis.minIdleConn"),
  53. })
  54. }
  55. const (
  56. PublishKey = "websocket"
  57. )
  58. //Publish 发布消息到Redis
  59. func Publish(ctx context.Context, channel string, msg string) error {
  60. var err error
  61. fmt.Println("Publish 。。。。", msg)
  62. err = Red.Publish(ctx, channel, msg).Err()
  63. if err != nil {
  64. fmt.Println(err)
  65. }
  66. return err
  67. }
  68. //Subscribe 订阅Redis消息
  69. func Subscribe(ctx context.Context, channel string) (string, error) {
  70. sub := Red.Subscribe(ctx, channel)
  71. fmt.Println("Subscribe 。。。。", ctx)
  72. msg, err := sub.ReceiveMessage(ctx)
  73. if err != nil {
  74. fmt.Println(err)
  75. return "", err
  76. }
  77. fmt.Println("Subscribe 。。。。", msg.Payload)
  78. return msg.Payload, err
  79. }
  80. userservice.go中加入
  81. //防止跨域站点伪造请求
  82. var upGrader = websocket.Upgrader{
  83. CheckOrigin: func(r *http.Request) bool {
  84. return true
  85. },
  86. }
  87. func SendMsg(c *gin.Context) {
  88. ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  89. if err != nil {
  90. fmt.Println(err)
  91. return
  92. }
  93. defer func(ws *websocket.Conn) {
  94. err = ws.Close()
  95. if err != nil {
  96. fmt.Println(err)
  97. }
  98. }(ws)
  99. MsgHandler(c, ws)
  100. }
  101. func MsgHandler(c *gin.Context, ws *websocket.Conn) {
  102. for {
  103. msg, err := utils.Subscribe(c, utils.PublishKey)
  104. if err != nil {
  105. fmt.Println(" MsgHandler 发送失败", err)
  106. }
  107. tm := time.Now().Format("2006-01-02 15:04:05")
  108. m := fmt.Sprintf("[ws][%s]:%s", tm, msg)
  109. err = ws.WriteMessage(1, []byte(m))
  110. if err != nil {
  111. log.Fatalln(err)
  112. }
  113. }
  114. }
  115. router层 app.go
  116. //发送消息
  117. r.GET("/user/sendMsg", service.SendMsg)

测试: http://www.jsons.cn/websocket/

ws://localhost:8081/user/sendMsg

设计 关系表 ,群信息表 ,消息表

  1. package models
  2. import "gorm.io/gorm"
  3. //消息
  4. type Message struct {
  5. gorm.Model
  6. FormId uint //发送者
  7. TargetId uint //接受者
  8. Type string //消息类型 群聊 私聊 广播
  9. Media int //消息类型 文字 图片 音频
  10. Content string //消息内容
  11. Pic string
  12. Url string
  13. Desc string
  14. Amount int //其他数字统计
  15. }
  16. func (table *Message) TableName() string {
  17. return "message"
  18. }
  19. package models
  20. import "gorm.io/gorm"
  21. //群信息
  22. type GroupBasic struct {
  23. gorm.Model
  24. Name string
  25. OwnerId uint
  26. Icon string
  27. Type int
  28. Desc string
  29. }
  30. func (table *GroupBasic) TableName() string {
  31. return "group_basic"
  32. }
  33. package models
  34. import "gorm.io/gorm"
  35. //人员关系
  36. type Contact struct {
  37. gorm.Model
  38. OwnerId uint //谁的关系信息
  39. TargetId uint //对应的谁
  40. Type int //对应的类型 0 1 3
  41. Desc string
  42. }
  43. func (table *Contact) TableName() string {
  44. return "contact"
  45. }

发送消息 接受消息

​ 需要 :发送者ID ,接受者ID ,消息类型,发送的内容,发送类型

​ 校验token ,关系 ,

  1. package models
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net"
  6. "net/http"
  7. "strconv"
  8. "sync"
  9. "github.com/gorilla/websocket"
  10. "gopkg.in/fatih/set.v0"
  11. "gorm.io/gorm"
  12. )
  13. //消息
  14. type Message struct {
  15. gorm.Model
  16. FormId int64 //发送者
  17. TargetId int64 //接受者
  18. Type int //发送类型 群聊 私聊 广播
  19. Media int //消息类型 文字 图片 音频
  20. Content string //消息内容
  21. Pic string
  22. Url string
  23. Desc string
  24. Amount int //其他数字统计
  25. }
  26. func (table *Message) TableName() string {
  27. return "message"
  28. }
  29. type Node struct {
  30. Conn *websocket.Conn
  31. DataQueue chan []byte
  32. GroupSets set.Interface
  33. }
  34. //映射关系
  35. var clientMap map[int64]*Node = make(map[int64]*Node, 0)
  36. //读写锁
  37. var rwLocker sync.RWMutex
  38. // 需要 :发送者ID ,接受者ID ,消息类型,发送的内容,发送类型
  39. func Chat(writer http.ResponseWriter, request *http.Request) {
  40. //1. 获取参数 并 检验 token 等合法性
  41. //token := query.Get("token")
  42. query := request.URL.Query()
  43. Id := query.Get("userId")
  44. userId, _ := strconv.ParseInt(Id, 10, 64)
  45. //msgType := query.Get("type")
  46. //targetId := query.Get("targetId")
  47. // context := query.Get("context")
  48. isvalida := true //checkToke() 待.........
  49. conn, err := (&websocket.Upgrader{
  50. //token 校验
  51. CheckOrigin: func(r *http.Request) bool {
  52. return isvalida
  53. },
  54. }).Upgrade(writer, request, nil)
  55. if err != nil {
  56. fmt.Println(err)
  57. return
  58. }
  59. //2.获取conn
  60. node := &Node{
  61. Conn: conn,
  62. DataQueue: make(chan []byte, 50),
  63. GroupSets: set.New(set.ThreadSafe),
  64. }
  65. //3. 用户关系
  66. //4. userid 跟 node绑定 并加锁
  67. rwLocker.Lock()
  68. clientMap[userId] = node
  69. rwLocker.Unlock()
  70. //5.完成发送逻辑
  71. go sendProc(node)
  72. //6.完成接受逻辑
  73. go recvProc(node)
  74. sendMsg(userId, []byte("欢迎进入聊天系统"))
  75. }
  76. func sendProc(node *Node) {
  77. for {
  78. select {
  79. case data := <-node.DataQueue:
  80. err := node.Conn.WriteMessage(websocket.TextMessage, data)
  81. if err != nil {
  82. fmt.Println(err)
  83. return
  84. }
  85. }
  86. }
  87. }
  88. func recvProc(node *Node) {
  89. for {
  90. _, data, err := node.Conn.ReadMessage()
  91. if err != nil {
  92. fmt.Println(err)
  93. return
  94. }
  95. broadMsg(data)
  96. fmt.Println("[ws] <<<<< ", data)
  97. }
  98. }
  99. var udpsendChan chan []byte = make(chan []byte, 1024)
  100. func broadMsg(data []byte) {
  101. udpsendChan <- data
  102. }
  103. func init() {
  104. go udpSendProc()
  105. go udpRecvProc()
  106. }
  107. //完成udp数据发送协程
  108. func udpSendProc() {
  109. con, err := net.DialUDP("udp", nil, &net.UDPAddr{
  110. IP: net.IPv4(192, 168, 0, 255),
  111. Port: 3000,
  112. })
  113. defer con.Close()
  114. if err != nil {
  115. fmt.Println(err)
  116. }
  117. for {
  118. select {
  119. case data := <-udpsendChan:
  120. _, err := con.Write(data)
  121. if err != nil {
  122. fmt.Println(err)
  123. return
  124. }
  125. }
  126. }
  127. }
  128. //完成udp数据接收协程
  129. func udpRecvProc() {
  130. con, err := net.ListenUDP("udp", &net.UDPAddr{
  131. IP: net.IPv4zero,
  132. Port: 3000,
  133. })
  134. if err != nil {
  135. fmt.Println(err)
  136. }
  137. defer con.Close()
  138. for {
  139. var buf [512]byte
  140. n, err := con.Read(buf[0:])
  141. if err != nil {
  142. fmt.Println(err)
  143. return
  144. }
  145. dispatch(buf[0:n])
  146. }
  147. }
  148. //后端调度逻辑处理
  149. func dispatch(data []byte) {
  150. msg := Message{}
  151. err := json.Unmarshal(data, &msg)
  152. if err != nil {
  153. fmt.Println(err)
  154. return
  155. }
  156. switch msg.Type {
  157. case 1: //私信
  158. sendMsg(msg.TargetId, data)
  159. // case 2: //群发
  160. // sendGroupMsg()
  161. // case 3://广播
  162. // sendAllMsg()
  163. //case 4:
  164. //
  165. }
  166. }
  167. func sendMsg(userId int64, msg []byte) {
  168. rwLocker.RLock()
  169. node, ok := clientMap[userId]
  170. rwLocker.RUnlock()
  171. if ok {
  172. node.DataQueue <- msg
  173. }
  174. }

集成html 登录和注册

  1. //app.go 加入
  2. //首页
  3. r.GET("/", service.GetIndex)
  4. r.GET("/index", service.GetIndex)
  5. r.GET("/toRegister", service.ToRegister)
  6. // index.go
  7. package service
  8. import (
  9. "text/template"
  10. "github.com/gin-gonic/gin"
  11. )
  12. // GetIndex
  13. // @Tags 首页
  14. // @Success 200 {string} welcome
  15. // @Router /index [get]
  16. func GetIndex(c *gin.Context) {
  17. ind, err := template.ParseFiles("index.html", "views/chat/head.html")
  18. if err != nil {
  19. panic(err)
  20. }
  21. ind.Execute(c.Writer, "index")
  22. // c.JSON(200, gin.H{
  23. // "message": "welcome !! ",
  24. // })
  25. }
  26. func ToRegister(c *gin.Context) {
  27. ind, err := template.ParseFiles("views/user/register.html")
  28. if err != nil {
  29. panic(err)
  30. }
  31. ind.Execute(c.Writer, "register")
  32. // c.JSON(200, gin.H{
  33. // "message": "welcome !! ",
  34. // })
  35. }

然后页面 :

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <!--js include-->
  5. {
  6. {template "/chat/head.shtml"}}
  7. </head>
  8. <body>
  9. <header class="mui-bar mui-bar-nav">
  10. <h1 class="mui-title">登录</h1>
  11. </header>
  12. {
  13. {.}}
  14. <div class
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/858773
推荐阅读
相关标签
  

闽ICP备14008679号