赞
踩
服务端发布消息,推送给在线用户
1、先定义通知管道配置和消息写入管道接口
- package sse
-
-
- // 消息管道
- type SseChannel struct {
- ChannelId string // 消息管道id
- ChannelChan chan string // 消息发送的chan
- }
-
- // 存储用户与该用户的消息通道,(key->value userId->消息管道列表)
- // 管道列表之所以是多个,是因为一个用户可能打开多个页面,可以建议多个管道,
- // 当关闭其中一个页面时,只闭关当前页面关联消息管道,未闭关页面关联的消息管道不受影响
- var Clients = make(map[string][]*SseChannel)
- package sseServ
-
- import (
- "bootpkg/cmd/api/model/vo"
- "bootpkg/common/sse"
- "encoding/json"
- )
-
- // 发送消息接口
- func Broadcast(userId string, data vo.SseNotifyVo) {
- if len(sse.Clients[userId]) == 0 {
- return
- }
-
- dataBytes, _ := json.Marshal(data)
- // 遍历用户所有打开的消息管道,发送消息
- for _, client := range sse.Clients[userId] {
- client.ChannelChan <- string(dataBytes)
- }
- }
2、定义服务端接口(在main方法中调用即可)
- // router定义
- func sseRouterConfig() {
- router := gin.Default()
- router.Use(configCors())
- router.GET("/api/sse/notify/subscribe/:channel_id", connect)
- router.POST("/api/sse/notify/send", sendMsg)
- err := router.Run(":9669")
- if err != nil {
- fmt.Println(err)
- }
- }
-
- // 配置跨域
- func configCors() gin.HandlerFunc {
- return func(c *gin.Context) {
- method := c.Request.Method
- c.Header("Access-Control-Allow-Origin", "*")
- c.Header("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE, UPDATE")
- c.Header("Access-Control-Allow-Headers", "*")
- c.Header("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers, Cache-Control, Content-Language, Content-Type")
- c.Header("Access-Control-Allow-Credentials", "true")
- //放行所有OPTIONS方法
- if method == "OPTIONS" {
- c.AbortWithStatus(http.StatusNoContent)
- }
- // 处理请求
- c.Next()
- }
- }
-
-
- // 前端初始化时连接该接口
- func connect(c *gin.Context) {
- // 建立连接,记录哪个用户建立了连接,并配置该用户接受消息的通道id(uuid即可)
- userId := c.DefaultQuery("user_id", "")
- channelId := c.Param("channel_id")
-
- c.Header("Content-Type", "text/event-stream")
- c.Header("Cache-Control", "no-cache")
- c.Header("Connection", "keep-alive")
-
- println("Client connected user_id=" + userId + ";channel_id=" + channelId)
- eventSseChannel := &sse.SseChannel{
- ChannelId: channelId,
- ChannelChan: make(chan string),
- }
-
- // 如果用户消息管道尚未初始化,则初始化用户消息管道
- if sse.Clients[userId] == nil {
- sse.Clients[userId] = []*sse.SseChannel{}
- }
-
- // 将新建立的消息管道添加到用户管道列表中
- sse.Clients[userId] = append(sse.Clients[userId], eventSseChannel) // Add the client to the clients map
- defer func() {
- for _, v := range sse.Clients[userId] {
- if v != eventSseChannel {
- sse.Clients[userId] = append(sse.Clients[userId], v)
- }
- }
- close(eventSseChannel.ChannelChan)
- }()
-
- // Listen for client close and remove the client from the list
- notify := c.Writer.CloseNotify()
- go func() {
- <-notify
- fmt.Println("Client disconnected, user_id=" + userId + ";channel_id=" + channelId)
-
- r := make([]*sse.SseChannel)
- for _, v := range sse.Clients[userId] {
- if v.ChannelId != channelId {
- r = append(r, v)
- }else {
- // 关闭管道
- close(v.ChannelChan)
- }
- }
- sse.Clients[userId] = r
- }()
-
- // Continuously send data to the client
- for {
- // 从用户管道读取消息
- data := <-eventSseChannel.ChannelChan
- println("Sending data to user_id=" + userId + ";channel_id=" + channelId + "; data=" + data)
- // 将消息输出到客户端
- fmt.Fprintf(c.Writer, "data: %s\n", data)
- c.Writer.Flush()
- }
- }
-
- // 发送消息接口,
- func sendMsg(c *gin.Context) {
- // 指定用户发送消息
- userId := c.DefaultQuery("user_id", "")
-
- if len(sse.Clients[userId]) == 0 {
- response.FailErrJSON(c, response.ERROR_PARAMETER, "用户未连接。")
- return
- }
- var jsonp vo.SseNotifyVo
- err := c.ShouldBind(&jsonp)
- if err != nil {
- response.FailErrJSON(c, response.ERROR_PARAMETER, err.Error())
- return
- }
- // 消息写入消息管道
- sseServ.Broadcast(userId, jsonp)
- response.SuccessJSON(c, "Success")
- }
测试:
建立连接,打开1~3个terminal,建立消息管道
curl 'http://localhost:9669/api/sse/notify/subscribe/87761706050125845?user_id=111'
curl 'http://localhost:9669/api/sse/notify/subscribe/87761706050125847?user_id=111'
curl 'http://localhost:9669/api/sse/notify/subscribe/87761706050125848?user_id=111'
消息发送测试curl
curl --location 'http://localhost:9669/api/sse/notify/send?user_id=111' \
--header 'Accept: application/json, text/plain, */*' \
--header 'Cache-Control: no-cache' \
--header 'ClientType: h5' \
--header 'Connection: keep-alive' \
--header 'GP-TM: 1718083388' \
--header 'Pragma: no-cache' \
--header 'Sec-Fetch-Dest: empty' \
--header 'Sec-Fetch-Mode: cors' \
--header 'Sec-Fetch-Site: same-origin' \
--header 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36' \
--header 'language: zh' \
--header 'merchantCode: XYYL' \
--header 'sec-ch-ua: "Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"' \
--header 'sec-ch-ua-mobile: ?0' \
--header 'Content-Type: application/json' \
--data '{
"id": "123",
"title": "title123",
"content": "content123",
"create_time": "2024-06-12 09:26:34"
}'
可以看到两个terminal都会有输出
关闭一个terminal,并没有将用户的所有管道给关闭,仅关闭terminal关联的chennal。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。