当前位置:   article > 正文

SpringBoot中使用Netty开发WebSocket服务-netty-websocket-spring-boot-starter开源项目使用与改造多线程群发消息

netty-websocket-spring-boot-starter

场景

SpringBoot+Vue整合WebSocket实现前后端消息推送:

SpringBoot+Vue整合WebSocket实现前后端消息推送_霸道流氓气质的博客-CSDN博客

SpringCloud(若依微服务版为例)集成WebSocket实现前后端的消息推送:

SpringCloud(若依微服务版为例)集成WebSocket实现前后端的消息推送_springcloud +websocket 后端到前端推送_霸道流氓气质的博客-CSDN博客

若依前后端分离版手把手教你本地搭建环境并运行项目:

若依前后端分离版手把手教你本地搭建环境并运行项目_前后端分离项目本地运行_霸道流氓气质的博客-CSDN博客

在上面的基础上,使用websocket仍有不足,比如可能出现如下问题

Nginx代理websocket配置(解决websocket异常断开连接tcp连接不断问题):

Nginx代理websocket配置(解决websocket异常断开连接tcp连接不断问题)_nginx配置websocker连接_霸道流氓气质的博客-CSDN博客

另外如果业务场景中需要高频定时任务通过websocket给多个客户端发动消息,

则短时间内需要使用多线程/自定义线程池实现群发消息功能。

关于自定义线程池相关可参考如下

Java中创建线程的方式以及线程池创建的方式、推荐使用ThreadPoolExecutor以及示例:

Java中创建线程的方式以及线程池创建的方式、推荐使用ThreadPoolExecutor以及示例_霸道流氓气质的博客-CSDN博客

Java中线程的常用操作-后台线程、自定义线程工厂ThreadFactpry、join加入一个线程、线程异常捕获:

Java中线程的常用操作-后台线程、自定义线程工厂ThreadFactpry、join加入一个线程、线程异常捕获_霸道流氓气质的博客-CSDN博客

注:

博客:
霸道流氓气质的博客_CSDN博客-C#,架构之路,SpringBoot领域博主

实现

1、netty-websocket-spring-boot-starter与若依集成websocket

本项目帮助你在spring-boot中使用Netty来开发WebSocket服务器,并像spring-websocket的注解开发一样简单。

仓库地址:

netty-websocket-spring-boot-starter: 轻量级、高性能的WebSocket框架

https://github.com/YeautyYE/netty-websocket-spring-boot-starter/blob/master/README_zh.md

除了此开源项目之外,这里还基于若依开源项目框架中基于websocket的插件集成

插件集成 | RuoYi

2、项目中添加依赖

  1.         <dependency>
  2.             <groupId>org.yeauty</groupId>
  3.             <artifactId>netty-websocket-spring-boot-starter</artifactId>
  4.             <version>0.12.0</version>
  5.         </dependency>

然后集合以上两个开源框架的示例代码新建如下类

3、其中WebSocketServer类为端点类,实现代码

  1. package com.badao.demo.websocket;
  2. import io.netty.handler.codec.http.HttpHeaders;
  3. import io.netty.handler.timeout.IdleStateEvent;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.yeauty.annotation.*;
  6. import org.yeauty.pojo.Session;
  7. import java.io.IOException;
  8. import java.util.concurrent.Semaphore;
  9. @ServerEndpoint(path = "/websocket/{userName}", port = "${ws.port}", readerIdleTimeSeconds = "${ws.readerIdleTimeSeconds}", writerIdleTimeSeconds = "${ws.writerIdleTimeSeconds}", allIdleTimeSeconds = "${ws.allIdleTimeSeconds}")
  10. @Slf4j
  11. public class WebSocketServer {
  12.     /**
  13.      * 默认最多允许同时在线人数 200
  14.      */
  15.     public static int socketMaxOnlineCount = 200;
  16.     private static final Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
  17.     @OnOpen
  18.     public void onOpen(Session session, HttpHeaders headers, @PathVariable String userName) {
  19.         boolean semaphoreFlag = false;
  20.         // 尝试获取信号量
  21.         semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
  22.         if (!semaphoreFlag) {
  23.             // 未获取到信号量
  24.             log.error("\n 当前人数 - {} , 限制人数:{} ", WebSocketUsers.getUsers().size(), socketMaxOnlineCount);
  25.             WebSocketUsers.sendMessageToUserByText(session, "当前人数:" + WebSocketUsers.getUsers().size() + " 限制人数:" + socketMaxOnlineCount);
  26.             session.close();
  27.         } else {
  28.             // 添加用户
  29.             WebSocketUsers.sendMessageToUserByText(session, "连接成功");
  30.             WebSocketUsers.put(userName, session);
  31.             log.warn("\n 用户:{} 连接后 , 当前人数 : {}", userName, WebSocketUsers.getUsers().size());
  32.         }
  33.     }
  34.     @OnClose
  35.     public void onClose(Session session, @PathVariable String userName) throws IOException {
  36.         // 移除用户
  37.         WebSocketUsers.remove(userName);
  38.         log.warn("\n 用户:{} 关闭后 , 当前人数 : {}", userName, WebSocketUsers.getUsers().size());
  39.         // 获取到信号量则需释放
  40.         SemaphoreUtils.release(socketSemaphore);
  41.     }
  42.     @OnError
  43.     public void onError(Session session, @PathVariable String userName, Throwable exception) {
  44.         if (session.isOpen()) {
  45.             // 关闭连接
  46.             session.close();
  47.         }
  48.         log.warn("\n 连接异常 - {}", userName);
  49.         log.warn("\n 异常信息 - {}", exception);
  50.         // 移出用户
  51.         WebSocketUsers.remove(userName);
  52.         // 获取到信号量则需释放
  53.         SemaphoreUtils.release(socketSemaphore);
  54.     }
  55.     @OnMessage
  56.     public void onMessage(Session session, String message) {
  57.         WebSocketUsers.sendMessageToUserByText(session, message);
  58.     }
  59.     @OnBinary
  60.     public void onBinary(Session session, byte[] bytes) {
  61.         for (byte b : bytes) {
  62.             System.out.println(b);
  63.         }
  64.         session.sendBinary(bytes);
  65.     }
  66.     @OnEvent
  67.     public void onEvent(@PathVariable String userName, Session session, Object evt) {
  68.         if (evt instanceof IdleStateEvent) {
  69.             IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
  70.             switch (idleStateEvent.state()) {
  71.                 case READER_IDLE:
  72.                     log.error("user-{} Read timeout!", userName);
  73.                     session.close();
  74.                     break;
  75.                 case WRITER_IDLE:
  76.                     log.error("user-{} Write timeout!", userName);
  77.                     session.close();
  78.                     break;
  79.                 case ALL_IDLE:
  80.                     log.error("user-{} All timeout!", userName);
  81.                     session.close();
  82.                     break;
  83.                 default:
  84.                     break;
  85.             }
  86.         }
  87.     }
  88. }

注意这里的注解@ServerEndpoint是org.yeauty路径下的,别引用错了包。

在端点类上加上@ServerEndpoint注解,并在相应的方法上加上

@BeforeHandshake、@OnOpen、@OnClose、@OnError、@OnMessage、@OnBinary、@OnEvent注解。

注解说明:

@ServerEndpoint

当ServerEndpointExporter类通过Spring配置进行声明并被使用,它将会去扫描带有@ServerEndpoint注解的类

被注解的类将被注册成为一个WebSocket端点 所有的配置项都在这个注解的属性中 ( 如:@ServerEndpoint("/ws") )

@BeforeHandshake

当有新的连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...

@OnOpen

当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...

@OnClose

当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session

@OnError

当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable

@OnMessage

当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String

@OnBinary

当接收到二进制消息时,对该方法进行回调 注入参数的类型:Session、byte[]

@OnEvent

当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object

配置说明

所有的配置项都在这个注解的属性中。

属性默认值说明
path"/"WebSocket的path,也可以用value来设置
host"0.0.0.0"WebSocket的host,"0.0.0.0"即是所有本地地址
port80WebSocket绑定端口号。如果为0,则使用随机端口(端口获取可见 多端点服务)
bossLoopGroupThreads0bossEventLoopGroup的线程数
workerLoopGroupThreads0workerEventLoopGroup的线程数
useCompressionHandlerfalse是否添加WebSocketServerCompressionHandler到pipeline
optionConnectTimeoutMillis30000与Netty的ChannelOption.CONNECT_TIMEOUT_MILLIS一致
optionSoBacklog128与Netty的ChannelOption.SO_BACKLOG一致
childOptionWriteSpinCount16与Netty的ChannelOption.WRITE_SPIN_COUNT一致
childOptionWriteBufferHighWaterMark64*1024与Netty的ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK一致,但实际上是使用ChannelOption.WRITE_BUFFER_WATER_MARK
childOptionWriteBufferLowWaterMark32*1024与Netty的ChannelOption.WRITE_BUFFER_LOW_WATER_MARK一致,但实际上是使用 ChannelOption.WRITE_BUFFER_WATER_MARK
childOptionSoRcvbuf-1(即未设置)与Netty的ChannelOption.SO_RCVBUF一致
childOptionSoSndbuf-1(即未设置)与Netty的ChannelOption.SO_SNDBUF一致
childOptionTcpNodelaytrue与Netty的ChannelOption.TCP_NODELAY一致
childOptionSoKeepalivefalse与Netty的ChannelOption.SO_KEEPALIVE一致
childOptionSoLinger-1与Netty的ChannelOption.SO_LINGER一致
childOptionAllowHalfClosurefalse与Netty的ChannelOption.ALLOW_HALF_CLOSURE一致
readerIdleTimeSeconds0IdleStateHandler中的readerIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
writerIdleTimeSeconds0IdleStateHandler中的writerIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
allIdleTimeSeconds0IdleStateHandler中的allIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
maxFramePayloadLength65536最大允许帧载荷长度
useEventExecutorGrouptrue是否使用另一个线程池来执行耗时的同步业务逻辑
eventExecutorGroupThreads16eventExecutorGroup的线程数
sslKeyPassword""(即未设置)与spring-boot的server.ssl.key-password一致
sslKeyStore""(即未设置)与spring-boot的server.ssl.key-store一致
sslKeyStorePassword""(即未设置)与spring-boot的server.ssl.key-store-password一致
sslKeyStoreType""(即未设置)与spring-boot的server.ssl.key-store-type一致
sslTrustStore""(即未设置)与spring-boot的server.ssl.trust-store一致
sslTrustStorePassword""(即未设置)与spring-boot的server.ssl.trust-store-password一致
sslTrustStoreType""(即未设置)与spring-boot的server.ssl.trust-store-type一致
corsOrigins{}(即未设置)与spring-boot的@CrossOrigin#origins一致
corsAllowCredentials""(即未设置)与spring-boot的@CrossOrigin#allowCredentials一致

且这里的配置可以在yml中进行配置。

需要在application.yml中添加如下配置项

  1. ws:
  2.   # websocket 端口
  3.   port: 8071
  4.   # websocket 读超时
  5.   readerIdleTimeSeconds: 10
  6.   # websocket 写超时
  7.   writerIdleTimeSeconds: 10
  8.   # websocket 所有信道超时
  9.   allIdleTimeSeconds: 15

4、注意上面的信号量相关处理封装的工具类

  1. package com.badao.demo.websocket;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import java.util.concurrent.Semaphore;
  5. /**
  6.  * 信号量相关处理
  7.  */
  8. public class SemaphoreUtils {
  9.     /**
  10.      * SemaphoreUtils 日志控制器
  11.      */
  12.     private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);
  13.     /**
  14.      * 获取信号量
  15.      *
  16.      * @param semaphore
  17.      * @return
  18.      */
  19.     public static boolean tryAcquire(Semaphore semaphore) {
  20.         boolean flag = false;
  21.         try {
  22.             flag = semaphore.tryAcquire(1);
  23.         } catch (Exception e) {
  24.             LOGGER.error("获取信号量异常", e);
  25.         }
  26.         return flag;
  27.     }
  28.     /**
  29.      * 释放信号量
  30.      *
  31.      * @param semaphore
  32.      */
  33.     public static void release(Semaphore semaphore) {
  34.         try {
  35.             semaphore.release();
  36.         } catch (Exception e) {
  37.             LOGGER.error("释放信号量异常", e);
  38.         }
  39.     }
  40. }

5、存储用户信息的WebSocketUsers类实现

  1. package com.badao.demo.websocket;
  2. import com.badao.demo.config.MyThreadFactory;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.yeauty.pojo.Session;
  5. import java.util.Collection;
  6. import java.util.Map;
  7. import java.util.Set;
  8. import java.util.concurrent.*;
  9. @Slf4j
  10. public class WebSocketUsers {
  11.     private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(10,WebSocketServer.socketMaxOnlineCount,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),new MyThreadFactory("websocket-"));
  12.     /**
  13.      * 用户集
  14.      */
  15.     private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();
  16.     /**
  17.      * 存储用户
  18.      *
  19.      * @param key     唯一键
  20.      * @param session 用户信息
  21.      */
  22.     public static void put(String key, Session session) {
  23.         USERS.put(key, session);
  24.     }
  25.     /**
  26.      * 移除用户
  27.      *
  28.      * @param session 用户信息
  29.      * @return 移除结果
  30.      */
  31.     public static boolean remove(Session session) {
  32.         String key = null;
  33.         boolean flag = USERS.containsValue(session);
  34.         if (flag) {
  35.             Set<Map.Entry<String, Session>> entries = USERS.entrySet();
  36.             for (Map.Entry<String, Session> entry : entries) {
  37.                 Session value = entry.getValue();
  38.                 if (value.equals(session)) {
  39.                     key = entry.getKey();
  40.                     break;
  41.                 }
  42.             }
  43.         } else {
  44.             return true;
  45.         }
  46.         return remove(key);
  47.     }
  48.     /**
  49.      * 移出用户
  50.      *
  51.      * @param key
  52.      */
  53.     public static boolean remove(String key) {
  54.         Session remove = USERS.remove(key);
  55.         if (remove != null) {
  56.             boolean containsValue = USERS.containsValue(remove);
  57.             log.warn("\n 移出结果 - {}", containsValue ? "失败" : "成功");
  58.             return containsValue;
  59.         } else {
  60.             return true;
  61.         }
  62.     }
  63.     /**
  64.      * 获取在线用户列表
  65.      *
  66.      * @return 返回用户集合
  67.      */
  68.     public static Map<String, Session> getUsers() {
  69.         return USERS;
  70.     }
  71.     /**
  72.      * 群发消息文本消息
  73.      *
  74.      * @param message 消息内容
  75.      */
  76.     public static void sendMessageToUsersByText(String message) {
  77.         Collection<Session> values = USERS.values();
  78.         for (Session value : values) {
  79.             pool.submit(() -> {
  80.                 synchronized (value) {
  81.                     value.sendText(message);
  82.                 }
  83.             });
  84.         }
  85.     }
  86.     /**
  87.      * 发送文本消息
  88.      *
  89.      * @param session 自己的用户名
  90.      * @param message 消息内容
  91.      */
  92.     public static void sendMessageToUserByText(Session session, String message) {
  93.         if (session != null) {
  94.             session.sendText(message);
  95.         } else {
  96.             log.info("\n[你已离线]");
  97.         }
  98.     }
  99. }

这里用的线程池相关概念参考上面博客,附自定义线程工厂MyThreadFactory实现

  1. package com.badao.demo.config;
  2. import java.util.concurrent.ThreadFactory;
  3. import java.util.concurrent.atomic.AtomicInteger;
  4. public class MyThreadFactory implements ThreadFactory {
  5.     private static final AtomicInteger poolNumber = new AtomicInteger(1);
  6.     private final ThreadGroup group;
  7.     private final AtomicInteger threadNumber = new AtomicInteger(1);
  8.     private final String namePrefix;
  9.     public MyThreadFactory(String threadName) {
  10.         SecurityManager s = System.getSecurityManager();
  11.         group = (s !=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
  12.         if(threadName == null || threadName.isEmpty()){
  13.             threadName = "pool";
  14.         }
  15.         namePrefix = threadName + poolNumber.getAndIncrement()+"-thread-";
  16.     }
  17.     @Override
  18.     public Thread newThread(Runnable r) {
  19.         Thread t = new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
  20.         if(t.isDaemon()){
  21.             t.setDaemon(false);
  22.         }
  23.         if(t.getPriority()!= Thread.NORM_PRIORITY){
  24.             t.setPriority(Thread.NORM_PRIORITY);
  25.         }
  26.         return t;
  27.     }
  28. }

这里定义了线程名前缀。

6、又新建了一个Controller目的是为了获取当前所有的用户,因为前面限制了只能允许最多200用户

  1. package com.badao.demo.websocket;
  2. import org.springframework.web.bind.annotation.GetMapping;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. @RestController
  6. @RequestMapping("/websocket")
  7. public class WebsocketController  {
  8.     @GetMapping("/total")
  9.     public String websocketTotal(){
  10.         return WebSocketUsers.getUsers().keySet().toString();
  11.     }
  12. }

7、启动项目并使用websocket客户端工具进行测试

源码下载:

https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/88912442

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/404228
推荐阅读
相关标签
  

闽ICP备14008679号