赞
踩
SpringBoot+Vue整合WebSocket实现前后端消息推送:
SpringBoot+Vue整合WebSocket实现前后端消息推送_霸道流氓气质的博客-CSDN博客
SpringCloud(若依微服务版为例)集成WebSocket实现前后端的消息推送:
SpringCloud(若依微服务版为例)集成WebSocket实现前后端的消息推送_springcloud +websocket 后端到前端推送_霸道流氓气质的博客-CSDN博客
若依前后端分离版手把手教你本地搭建环境并运行项目:
若依前后端分离版手把手教你本地搭建环境并运行项目_前后端分离项目本地运行_霸道流氓气质的博客-CSDN博客
在上面的基础上,使用websocket仍有不足,比如可能出现如下问题
Nginx代理websocket配置(解决websocket异常断开连接tcp连接不断问题):
Nginx代理websocket配置(解决websocket异常断开连接tcp连接不断问题)_nginx配置websocker连接_霸道流氓气质的博客-CSDN博客
另外如果业务场景中需要高频定时任务通过websocket给多个客户端发动消息,
则短时间内需要使用多线程/自定义线程池实现群发消息功能。
关于自定义线程池相关可参考如下
Java中创建线程的方式以及线程池创建的方式、推荐使用ThreadPoolExecutor以及示例:
Java中创建线程的方式以及线程池创建的方式、推荐使用ThreadPoolExecutor以及示例_霸道流氓气质的博客-CSDN博客
Java中线程的常用操作-后台线程、自定义线程工厂ThreadFactpry、join加入一个线程、线程异常捕获:
Java中线程的常用操作-后台线程、自定义线程工厂ThreadFactpry、join加入一个线程、线程异常捕获_霸道流氓气质的博客-CSDN博客
注:
博客:
霸道流氓气质的博客_CSDN博客-C#,架构之路,SpringBoot领域博主
1、netty-websocket-spring-boot-starter与若依集成websocket
本项目帮助你在spring-boot中使用Netty来开发WebSocket服务器,并像spring-websocket的注解开发一样简单。
仓库地址:
netty-websocket-spring-boot-starter: 轻量级、高性能的WebSocket框架
https://github.com/YeautyYE/netty-websocket-spring-boot-starter/blob/master/README_zh.md
除了此开源项目之外,这里还基于若依开源项目框架中基于websocket的插件集成
2、项目中添加依赖
- <dependency>
- <groupId>org.yeauty</groupId>
- <artifactId>netty-websocket-spring-boot-starter</artifactId>
- <version>0.12.0</version>
- </dependency>
然后集合以上两个开源框架的示例代码新建如下类
3、其中WebSocketServer类为端点类,实现代码
- package com.badao.demo.websocket;
-
- import io.netty.handler.codec.http.HttpHeaders;
- import io.netty.handler.timeout.IdleStateEvent;
- import lombok.extern.slf4j.Slf4j;
- import org.yeauty.annotation.*;
- import org.yeauty.pojo.Session;
-
- import java.io.IOException;
- import java.util.concurrent.Semaphore;
-
- @ServerEndpoint(path = "/websocket/{userName}", port = "${ws.port}", readerIdleTimeSeconds = "${ws.readerIdleTimeSeconds}", writerIdleTimeSeconds = "${ws.writerIdleTimeSeconds}", allIdleTimeSeconds = "${ws.allIdleTimeSeconds}")
- @Slf4j
- public class WebSocketServer {
-
- /**
- * 默认最多允许同时在线人数 200
- */
- public static int socketMaxOnlineCount = 200;
-
- private static final Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
-
- @OnOpen
- public void onOpen(Session session, HttpHeaders headers, @PathVariable String userName) {
- boolean semaphoreFlag = false;
- // 尝试获取信号量
- semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
- if (!semaphoreFlag) {
- // 未获取到信号量
- log.error("\n 当前人数 - {} , 限制人数:{} ", WebSocketUsers.getUsers().size(), socketMaxOnlineCount);
- WebSocketUsers.sendMessageToUserByText(session, "当前人数:" + WebSocketUsers.getUsers().size() + " 限制人数:" + socketMaxOnlineCount);
- session.close();
- } else {
- // 添加用户
- WebSocketUsers.sendMessageToUserByText(session, "连接成功");
- WebSocketUsers.put(userName, session);
- log.warn("\n 用户:{} 连接后 , 当前人数 : {}", userName, WebSocketUsers.getUsers().size());
- }
- }
-
- @OnClose
- public void onClose(Session session, @PathVariable String userName) throws IOException {
- // 移除用户
- WebSocketUsers.remove(userName);
- log.warn("\n 用户:{} 关闭后 , 当前人数 : {}", userName, WebSocketUsers.getUsers().size());
- // 获取到信号量则需释放
- SemaphoreUtils.release(socketSemaphore);
- }
-
- @OnError
- public void onError(Session session, @PathVariable String userName, Throwable exception) {
- if (session.isOpen()) {
- // 关闭连接
- session.close();
- }
- log.warn("\n 连接异常 - {}", userName);
- log.warn("\n 异常信息 - {}", exception);
- // 移出用户
- WebSocketUsers.remove(userName);
- // 获取到信号量则需释放
- SemaphoreUtils.release(socketSemaphore);
- }
-
- @OnMessage
- public void onMessage(Session session, String message) {
- WebSocketUsers.sendMessageToUserByText(session, message);
- }
-
- @OnBinary
- public void onBinary(Session session, byte[] bytes) {
- for (byte b : bytes) {
- System.out.println(b);
- }
- session.sendBinary(bytes);
- }
-
- @OnEvent
- public void onEvent(@PathVariable String userName, Session session, Object evt) {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
- switch (idleStateEvent.state()) {
- case READER_IDLE:
- log.error("user-{} Read timeout!", userName);
- session.close();
- break;
- case WRITER_IDLE:
- log.error("user-{} Write timeout!", userName);
- session.close();
- break;
- case ALL_IDLE:
- log.error("user-{} All timeout!", userName);
- session.close();
- break;
- default:
- break;
- }
- }
- }
- }
注意这里的注解@ServerEndpoint是org.yeauty路径下的,别引用错了包。
在端点类上加上@ServerEndpoint注解,并在相应的方法上加上
@BeforeHandshake、@OnOpen、@OnClose、@OnError、@OnMessage、@OnBinary、@OnEvent注解。
注解说明:
@ServerEndpoint
当ServerEndpointExporter类通过Spring配置进行声明并被使用,它将会去扫描带有@ServerEndpoint注解的类
被注解的类将被注册成为一个WebSocket端点 所有的配置项都在这个注解的属性中 ( 如:@ServerEndpoint("/ws") )
@BeforeHandshake
当有新的连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...
@OnOpen
当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...
@OnClose
当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session
@OnError
当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable
@OnMessage
当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String
@OnBinary
当接收到二进制消息时,对该方法进行回调 注入参数的类型:Session、byte[]
@OnEvent
当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object
配置说明
所有的配置项都在这个注解的属性中。
属性 | 默认值 | 说明 |
---|---|---|
path | "/" | WebSocket的path,也可以用value 来设置 |
host | "0.0.0.0" | WebSocket的host,"0.0.0.0" 即是所有本地地址 |
port | 80 | WebSocket绑定端口号。如果为0,则使用随机端口(端口获取可见 多端点服务) |
bossLoopGroupThreads | 0 | bossEventLoopGroup的线程数 |
workerLoopGroupThreads | 0 | workerEventLoopGroup的线程数 |
useCompressionHandler | false | 是否添加WebSocketServerCompressionHandler到pipeline |
optionConnectTimeoutMillis | 30000 | 与Netty的ChannelOption.CONNECT_TIMEOUT_MILLIS 一致 |
optionSoBacklog | 128 | 与Netty的ChannelOption.SO_BACKLOG 一致 |
childOptionWriteSpinCount | 16 | 与Netty的ChannelOption.WRITE_SPIN_COUNT 一致 |
childOptionWriteBufferHighWaterMark | 64*1024 | 与Netty的ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK 一致,但实际上是使用ChannelOption.WRITE_BUFFER_WATER_MARK |
childOptionWriteBufferLowWaterMark | 32*1024 | 与Netty的ChannelOption.WRITE_BUFFER_LOW_WATER_MARK 一致,但实际上是使用 ChannelOption.WRITE_BUFFER_WATER_MARK |
childOptionSoRcvbuf | -1(即未设置) | 与Netty的ChannelOption.SO_RCVBUF 一致 |
childOptionSoSndbuf | -1(即未设置) | 与Netty的ChannelOption.SO_SNDBUF 一致 |
childOptionTcpNodelay | true | 与Netty的ChannelOption.TCP_NODELAY 一致 |
childOptionSoKeepalive | false | 与Netty的ChannelOption.SO_KEEPALIVE 一致 |
childOptionSoLinger | -1 | 与Netty的ChannelOption.SO_LINGER 一致 |
childOptionAllowHalfClosure | false | 与Netty的ChannelOption.ALLOW_HALF_CLOSURE 一致 |
readerIdleTimeSeconds | 0 | 与IdleStateHandler 中的readerIdleTimeSeconds 一致,并且当它不为0时,将在pipeline 中添加IdleStateHandler |
writerIdleTimeSeconds | 0 | 与IdleStateHandler 中的writerIdleTimeSeconds 一致,并且当它不为0时,将在pipeline 中添加IdleStateHandler |
allIdleTimeSeconds | 0 | 与IdleStateHandler 中的allIdleTimeSeconds 一致,并且当它不为0时,将在pipeline 中添加IdleStateHandler |
maxFramePayloadLength | 65536 | 最大允许帧载荷长度 |
useEventExecutorGroup | true | 是否使用另一个线程池来执行耗时的同步业务逻辑 |
eventExecutorGroupThreads | 16 | eventExecutorGroup的线程数 |
sslKeyPassword | ""(即未设置) | 与spring-boot的server.ssl.key-password 一致 |
sslKeyStore | ""(即未设置) | 与spring-boot的server.ssl.key-store 一致 |
sslKeyStorePassword | ""(即未设置) | 与spring-boot的server.ssl.key-store-password 一致 |
sslKeyStoreType | ""(即未设置) | 与spring-boot的server.ssl.key-store-type 一致 |
sslTrustStore | ""(即未设置) | 与spring-boot的server.ssl.trust-store 一致 |
sslTrustStorePassword | ""(即未设置) | 与spring-boot的server.ssl.trust-store-password 一致 |
sslTrustStoreType | ""(即未设置) | 与spring-boot的server.ssl.trust-store-type 一致 |
corsOrigins | {}(即未设置) | 与spring-boot的@CrossOrigin#origins 一致 |
corsAllowCredentials | ""(即未设置) | 与spring-boot的@CrossOrigin#allowCredentials 一致 |
且这里的配置可以在yml中进行配置。
需要在application.yml中添加如下配置项
- ws:
- # websocket 端口
- port: 8071
- # websocket 读超时
- readerIdleTimeSeconds: 10
- # websocket 写超时
- writerIdleTimeSeconds: 10
- # websocket 所有信道超时
- allIdleTimeSeconds: 15
4、注意上面的信号量相关处理封装的工具类
- package com.badao.demo.websocket;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.concurrent.Semaphore;
-
- /**
- * 信号量相关处理
- */
- public class SemaphoreUtils {
- /**
- * SemaphoreUtils 日志控制器
- */
- private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);
-
- /**
- * 获取信号量
- *
- * @param semaphore
- * @return
- */
- public static boolean tryAcquire(Semaphore semaphore) {
- boolean flag = false;
-
- try {
- flag = semaphore.tryAcquire(1);
- } catch (Exception e) {
- LOGGER.error("获取信号量异常", e);
- }
-
- return flag;
- }
-
- /**
- * 释放信号量
- *
- * @param semaphore
- */
- public static void release(Semaphore semaphore) {
-
- try {
- semaphore.release();
- } catch (Exception e) {
- LOGGER.error("释放信号量异常", e);
- }
- }
- }
5、存储用户信息的WebSocketUsers类实现
- package com.badao.demo.websocket;
-
- import com.badao.demo.config.MyThreadFactory;
- import lombok.extern.slf4j.Slf4j;
- import org.yeauty.pojo.Session;
-
- import java.util.Collection;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.*;
-
-
- @Slf4j
- public class WebSocketUsers {
-
- private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(10,WebSocketServer.socketMaxOnlineCount,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),new MyThreadFactory("websocket-"));
-
-
- /**
- * 用户集
- */
- private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();
-
- /**
- * 存储用户
- *
- * @param key 唯一键
- * @param session 用户信息
- */
- public static void put(String key, Session session) {
- USERS.put(key, session);
- }
-
- /**
- * 移除用户
- *
- * @param session 用户信息
- * @return 移除结果
- */
- public static boolean remove(Session session) {
- String key = null;
- boolean flag = USERS.containsValue(session);
- if (flag) {
- Set<Map.Entry<String, Session>> entries = USERS.entrySet();
- for (Map.Entry<String, Session> entry : entries) {
- Session value = entry.getValue();
- if (value.equals(session)) {
- key = entry.getKey();
- break;
- }
- }
- } else {
- return true;
- }
- return remove(key);
- }
-
- /**
- * 移出用户
- *
- * @param key 键
- */
- public static boolean remove(String key) {
- Session remove = USERS.remove(key);
- if (remove != null) {
- boolean containsValue = USERS.containsValue(remove);
- log.warn("\n 移出结果 - {}", containsValue ? "失败" : "成功");
- return containsValue;
- } else {
- return true;
- }
- }
-
- /**
- * 获取在线用户列表
- *
- * @return 返回用户集合
- */
- public static Map<String, Session> getUsers() {
- return USERS;
- }
-
- /**
- * 群发消息文本消息
- *
- * @param message 消息内容
- */
- public static void sendMessageToUsersByText(String message) {
- Collection<Session> values = USERS.values();
- for (Session value : values) {
- pool.submit(() -> {
- synchronized (value) {
- value.sendText(message);
- }
- });
- }
- }
-
- /**
- * 发送文本消息
- *
- * @param session 自己的用户名
- * @param message 消息内容
- */
- public static void sendMessageToUserByText(Session session, String message) {
- if (session != null) {
- session.sendText(message);
- } else {
- log.info("\n[你已离线]");
- }
- }
- }
这里用的线程池相关概念参考上面博客,附自定义线程工厂MyThreadFactory实现
- package com.badao.demo.config;
-
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.atomic.AtomicInteger;
-
- public class MyThreadFactory implements ThreadFactory {
-
- private static final AtomicInteger poolNumber = new AtomicInteger(1);
- private final ThreadGroup group;
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final String namePrefix;
-
- public MyThreadFactory(String threadName) {
- SecurityManager s = System.getSecurityManager();
- group = (s !=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
- if(threadName == null || threadName.isEmpty()){
- threadName = "pool";
- }
- namePrefix = threadName + poolNumber.getAndIncrement()+"-thread-";
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
- if(t.isDaemon()){
- t.setDaemon(false);
- }
- if(t.getPriority()!= Thread.NORM_PRIORITY){
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
- }
这里定义了线程名前缀。
6、又新建了一个Controller目的是为了获取当前所有的用户,因为前面限制了只能允许最多200用户
- package com.badao.demo.websocket;
-
-
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/websocket")
- public class WebsocketController {
-
- @GetMapping("/total")
- public String websocketTotal(){
- return WebSocketUsers.getUsers().keySet().toString();
- }
-
- }
7、启动项目并使用websocket客户端工具进行测试
源码下载:
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/88912442
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。