当前位置:   article > 正文

golang语言开发sse(server-send event)_golang sse client

golang sse client

 服务端发布消息,推送给在线用户

1、先定义通知管道配置和消息写入管道接口

  1. package sse
  2. // 消息管道
  3. type SseChannel struct {
  4. ChannelId string // 消息管道id
  5. ChannelChan chan string // 消息发送的chan
  6. }
  7. // 存储用户与该用户的消息通道,(key->value userId->消息管道列表)
  8. // 管道列表之所以是多个,是因为一个用户可能打开多个页面,可以建议多个管道,
  9. // 当关闭其中一个页面时,只闭关当前页面关联消息管道,未闭关页面关联的消息管道不受影响
  10. var Clients = make(map[string][]*SseChannel)
  1. package sseServ
  2. import (
  3. "bootpkg/cmd/api/model/vo"
  4. "bootpkg/common/sse"
  5. "encoding/json"
  6. )
  7. // 发送消息接口
  8. func Broadcast(userId string, data vo.SseNotifyVo) {
  9. if len(sse.Clients[userId]) == 0 {
  10. return
  11. }
  12. dataBytes, _ := json.Marshal(data)
  13. // 遍历用户所有打开的消息管道,发送消息
  14. for _, client := range sse.Clients[userId] {
  15. client.ChannelChan <- string(dataBytes)
  16. }
  17. }

2、定义服务端接口(在main方法中调用即可)

  1. // router定义
  2. func sseRouterConfig() {
  3. router := gin.Default()
  4. router.Use(configCors())
  5. router.GET("/api/sse/notify/subscribe/:channel_id", connect)
  6. router.POST("/api/sse/notify/send", sendMsg)
  7. err := router.Run(":9669")
  8. if err != nil {
  9. fmt.Println(err)
  10. }
  11. }
  12. // 配置跨域
  13. func configCors() gin.HandlerFunc {
  14. return func(c *gin.Context) {
  15. method := c.Request.Method
  16. c.Header("Access-Control-Allow-Origin", "*")
  17. c.Header("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE, UPDATE")
  18. c.Header("Access-Control-Allow-Headers", "*")
  19. c.Header("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers, Cache-Control, Content-Language, Content-Type")
  20. c.Header("Access-Control-Allow-Credentials", "true")
  21. //放行所有OPTIONS方法
  22. if method == "OPTIONS" {
  23. c.AbortWithStatus(http.StatusNoContent)
  24. }
  25. // 处理请求
  26. c.Next()
  27. }
  28. }
  29. // 前端初始化时连接该接口
  30. func connect(c *gin.Context) {
  31. // 建立连接,记录哪个用户建立了连接,并配置该用户接受消息的通道id(uuid即可)
  32. userId := c.DefaultQuery("user_id", "")
  33. channelId := c.Param("channel_id")
  34. c.Header("Content-Type", "text/event-stream")
  35. c.Header("Cache-Control", "no-cache")
  36. c.Header("Connection", "keep-alive")
  37. println("Client connected user_id=" + userId + ";channel_id=" + channelId)
  38. eventSseChannel := &sse.SseChannel{
  39. ChannelId: channelId,
  40. ChannelChan: make(chan string),
  41. }
  42. // 如果用户消息管道尚未初始化,则初始化用户消息管道
  43. if sse.Clients[userId] == nil {
  44. sse.Clients[userId] = []*sse.SseChannel{}
  45. }
  46. // 将新建立的消息管道添加到用户管道列表中
  47. sse.Clients[userId] = append(sse.Clients[userId], eventSseChannel) // Add the client to the clients map
  48. defer func() {
  49. for _, v := range sse.Clients[userId] {
  50. if v != eventSseChannel {
  51. sse.Clients[userId] = append(sse.Clients[userId], v)
  52. }
  53. }
  54. close(eventSseChannel.ChannelChan)
  55. }()
  56. // Listen for client close and remove the client from the list
  57. notify := c.Writer.CloseNotify()
  58. go func() {
  59. <-notify
  60. fmt.Println("Client disconnected, user_id=" + userId + ";channel_id=" + channelId)
  61. r := make([]*sse.SseChannel)
  62. for _, v := range sse.Clients[userId] {
  63. if v.ChannelId != channelId {
  64. r = append(r, v)
  65. }else {
  66. // 关闭管道
  67. close(v.ChannelChan)
  68. }
  69. }
  70. sse.Clients[userId] = r
  71. }()
  72. // Continuously send data to the client
  73. for {
  74. // 从用户管道读取消息
  75. data := <-eventSseChannel.ChannelChan
  76. println("Sending data to user_id=" + userId + ";channel_id=" + channelId + "; data=" + data)
  77. // 将消息输出到客户端
  78. fmt.Fprintf(c.Writer, "data: %s\n", data)
  79. c.Writer.Flush()
  80. }
  81. }
  82. // 发送消息接口,
  83. func sendMsg(c *gin.Context) {
  84. // 指定用户发送消息
  85. userId := c.DefaultQuery("user_id", "")
  86. if len(sse.Clients[userId]) == 0 {
  87. response.FailErrJSON(c, response.ERROR_PARAMETER, "用户未连接。")
  88. return
  89. }
  90. var jsonp vo.SseNotifyVo
  91. err := c.ShouldBind(&jsonp)
  92. if err != nil {
  93. response.FailErrJSON(c, response.ERROR_PARAMETER, err.Error())
  94. return
  95. }
  96. // 消息写入消息管道
  97. sseServ.Broadcast(userId, jsonp)
  98. response.SuccessJSON(c, "Success")
  99. }

测试:

建立连接,打开1~3个terminal,建立消息管道
curl 'http://localhost:9669/api/sse/notify/subscribe/87761706050125845?user_id=111'
curl 'http://localhost:9669/api/sse/notify/subscribe/87761706050125847?user_id=111'
curl 'http://localhost:9669/api/sse/notify/subscribe/87761706050125848?user_id=111'

消息发送测试curl
curl --location 'http://localhost:9669/api/sse/notify/send?user_id=111' \
--header 'Accept: application/json, text/plain, */*' \
--header 'Cache-Control: no-cache' \
--header 'ClientType: h5' \
--header 'Connection: keep-alive' \
--header 'GP-TM: 1718083388' \
--header 'Pragma: no-cache' \
--header 'Sec-Fetch-Dest: empty' \
--header 'Sec-Fetch-Mode: cors' \
--header 'Sec-Fetch-Site: same-origin' \
--header 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36' \
--header 'language: zh' \
--header 'merchantCode: XYYL' \
--header 'sec-ch-ua: "Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"' \
--header 'sec-ch-ua-mobile: ?0' \
--header 'Content-Type: application/json' \
--data '{
    "id": "123",
    "title": "title123",
    "content": "content123",
    "create_time": "2024-06-12 09:26:34"
}'

可以看到两个terminal都会有输出
关闭一个terminal,并没有将用户的所有管道给关闭,仅关闭terminal关联的chennal。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号