赞
踩
用户AB对话效果
用户上下线通知
历史消息记录可以滚动
客户端发送心跳
创建一个SpringBoot项目:spring-boot-websocket-demo1
项目的pom.xml
文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the spring-boot-websocket-demo1 project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherits plugin and dependency management from the Spring Boot parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.11</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.study.websocket</groupId>
    <artifactId>spring-boot-websocket-demo1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-websocket-demo1</name>
    <description>spring-boot-websocket-demo1</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- spring-boot-starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- netty-all (the Netty dependency) -->
        <!-- NOTE(review): the explicit version below presumably overrides whatever Netty version the
             Spring Boot parent manages; confirm that pinning 4.1.28.Final is intended. -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.28.Final</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
项目的application.yml
# Logging configuration
logging:
  level:
    com.study.websocket: debug
# Custom WebSocket configuration (read by WebSocketServer through @Value)
websocket:
  netty:
    # Port the Netty server listens on
    port: 19999
    # Endpoint path of the WebSocket service
    path: /websocket
server:
  port: 9999
spring-boot-websocket-demo1\webapp
路径下,这里不再详细描述前端。启动类使用 @ComponentScan
注解扫描组件
package com.study.websocket.demo1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Application entry point.
 *
 * <p>Component scanning is rooted at {@code com.study.websocket}, one level above this class's own package.
 */
@SpringBootApplication
@ComponentScan("com.study.websocket")
public class SpringBootWebsocketDemo1Application {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringBootWebsocketDemo1Application.class, args);
    }

}
WebSocketServer
WebSocketServer
代码
package com.study.websocket.demo1.server;

import com.study.websocket.demo1.handler.WebSocketServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Objects;

/**
 * Spring-managed component that runs a Netty server for WebSocket traffic.
 *
 * <p>The server is started on its own thread from {@link #init()} and its event loop groups are shut
 * down from {@link #destroy()}.
 *
 * @author 启年
 * @date 2023-05-12 19:56
 */
@Slf4j
@Component
public class WebSocketServer {

    /**
     * Port the Netty server binds to (property {@code websocket.netty.port}, default 19999).
     */
    @Value("${websocket.netty.port:19999}")
    public int port;

    /**
     * WebSocket endpoint path handed to the channel initializer
     * (property {@code websocket.netty.path}, default {@code /websocket}).
     */
    @Value("${websocket.netty.path:/websocket}")
    public String webSocketPath;

    // Both groups are assigned on the server thread and read by destroy() on another thread,
    // hence volatile so the shutdown hook sees the assigned instances.
    private volatile EventLoopGroup bossGroup;

    private volatile EventLoopGroup workerGroup;

    /**
     * Binds the server and blocks the calling thread until the server channel is closed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for bind or close
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        Channel channel = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WebSocketServerInitializer(webSocketPath))
                .bind(port)
                .sync()
                .channel();

        log.debug("服务端启动成功,端口号:{}", port);

        // Blocks here for the lifetime of the server.
        channel.closeFuture().sync();
    }

    /**
     * Releases the Netty event loop groups before the container destroys this bean.
     *
     * <p>Each group is shut down independently; a group that was never created is skipped.
     */
    @PreDestroy
    public void destroy() {
        if (Objects.nonNull(bossGroup)) {
            bossGroup.shutdownGracefully();
        }
        if (Objects.nonNull(workerGroup)) {
            // Previously this branch shut down bossGroup a second time and leaked the worker threads.
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server right after the container has created this bean.
     *
     * <p>{@link #start()} blocks until the server channel closes, so it runs on a dedicated thread
     * to avoid blocking application startup.
     */
    @PostConstruct
    public void init() {
        new Thread(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the thread's owner can observe the interruption.
                Thread.currentThread().interrupt();
                log.warn("WebSocket 服务线程被中断", e);
            }
        }).start();
    }

}
WebSocketServerInitializer
WebSocketServerInitializer
代码
package com.study.websocket.demo1.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Builds the handler pipeline for every accepted client channel.
 *
 * @author 启年
 * @date 2023-05-12 22:55
 */
@Slf4j
@AllArgsConstructor
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {

    /**
     * Endpoint path of the WebSocket service.
     */
    public String webSocketPath;

    /**
     * Installs, in order: idle detection, HTTP codec, HTTP message aggregation (64 KiB limit),
     * WebSocket compression, the WebSocket protocol handler bound to {@link #webSocketPath},
     * and the application's text-frame handler.
     *
     * @param ch the newly accepted client channel
     */
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        log.debug("服务的接口地址:{}", webSocketPath);
        ch.pipeline()
                // custom handler: heartbeat / read-idle detection
                .addLast(new WebSocketIdleStateHandler())
                // HTTP request decoding and response encoding
                .addLast(new HttpServerCodec())
                // aggregates HTTP message parts into one full message
                .addLast(new HttpObjectAggregator(65536))
                // WebSocket compression extension
                .addLast(new WebSocketServerCompressionHandler())
                // WebSocket upgrade/handshake handling on the configured path
                .addLast(new WebSocketServerProtocolHandler(webSocketPath, null, true))
                // custom handler: WebSocket text frames
                .addLast(new WebSocketTextHandler());
    }

}
NettyConfig
创建
ConcurrentMap<String, Channel>
存储在线连接的userId与channel的对应管理
package com.study.websocket.demo1.config;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Holder of the process-wide registries of connected clients.
 *
 * @author 启年
 * @date 2023-05-12 19:32
 */
@Configuration
public class NettyConfig {

    /**
     * Every client channel that is currently connected.
     */
    private static final ChannelGroup ONLINE_CHANNELS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Registered user id mapped to that user's channel.
     */
    private static final ConcurrentMap<String, Channel> ONLINE_USER_CHANNELS = new ConcurrentHashMap<>();

    /**
     * @return the shared group of all connected client channels
     */
    public static ChannelGroup getOnlineChannelGroup() {
        return ONLINE_CHANNELS;
    }

    /**
     * @return the shared user-id-to-channel map
     */
    public static ConcurrentMap<String, Channel> getOnlineUserChannelMap() {
        return ONLINE_USER_CHANNELS;
    }

}
WebSocketIdleStateHandler
WebSocketIdleStateHandler
代码
package com.study.websocket.demo1.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * Server-side heartbeat detection: closes a client channel once it has been read-idle for too long.
 *
 * @author 启年
 * @date 2023-05-14 10:05
 */
@Slf4j
public class WebSocketIdleStateHandler extends IdleStateHandler {

    /**
     * Default read-idle timeout, in seconds.
     */
    private static final int DEFAULT_READER_IDLE_TIME = 5;

    /**
     * Creates a handler that reports read-idle after {@link #DEFAULT_READER_IDLE_TIME} seconds;
     * write-idle and all-idle detection are disabled (0).
     */
    public WebSocketIdleStateHandler() {
        this(DEFAULT_READER_IDLE_TIME, 0, 0);
    }

    /**
     * Creates a handler with timeouts given in seconds.
     *
     * @param readerIdleTimeSeconds read-idle timeout
     * @param writerIdleTimeSeconds write-idle timeout
     * @param allIdleTimeSeconds    read-and-write idle timeout
     */
    public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
        super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
    }

    /**
     * Creates a handler with timeouts in an explicit unit.
     *
     * @param readerIdleTime read-idle timeout
     * @param writerIdleTime write-idle timeout
     * @param allIdleTime    read-and-write idle timeout
     * @param unit           unit of the three timeouts
     */
    public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        super(readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

    /**
     * Invoked when an idle event fires. A read-idle event closes the client channel; every event is
     * then passed on to the superclass implementation.
     *
     * @param ctx context of the idle channel
     * @param evt the idle event
     */
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        boolean readIdle = IdleState.READER_IDLE == evt.state();
        if (readIdle) {
            Channel idleChannel = ctx.channel();
            log.debug("服务端未检测到客户端【{}】的心跳包,强制关闭客户端!", idleChannel.id());
            idleChannel.close();
        }
        super.channelIdle(ctx, evt);
    }

}
WebSocketTextHandler
WebSocketTextHandler
代码
package com.study.websocket.demo1.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.HeartbeatMessage;
import com.study.websocket.demo1.entity.RegisterMessage;
import com.study.websocket.demo1.entity.TextMessage;
import com.study.websocket.demo1.enums.MessageTypeEnum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.Objects;

/**
 * Handles WebSocket text frames: client registration, heartbeats and user-to-user text messages.
 *
 * <p>Connected channels and the user-id-to-channel mapping are kept in {@link NettyConfig}.
 *
 * @author 启年
 * @date 2023-05-12 20:04
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketTextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * Name of the channel attribute that stores the registered user id.
     */
    public static final String USER_ID = "userId";

    /**
     * Typed key for the {@link #USER_ID} channel attribute; shared by register and cleanup so that
     * both sides always address the same attribute.
     */
    private static final AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf(USER_ID);

    public WebSocketTextHandler() {
        log.debug("WebSocketTextHandler 启动...");
    }

    /**
     * Records the channel as online when this handler is added to its pipeline.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        NettyConfig.getOnlineChannelGroup().add(channel);
        log.debug("新客户端建立链接 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
    }

    /**
     * Drops the channel's bookkeeping when this handler is removed from its pipeline.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        cleanChannel(channel);
        log.debug("客户端断开链接 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
    }

    /**
     * Handles a pipeline error: drops the channel's bookkeeping, logs the cause and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        cleanChannel(channel);
        log.debug("客户端异常断开 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
        // Keep the cause; without it the reason for the disconnect is lost.
        log.warn("客户端【{}】连接异常", channel.id(), cause);
        channel.close();
    }

    /**
     * No work is done on activation; the event is simply passed along the pipeline.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * Dispatches an incoming text frame by the {@code type} field of its JSON payload. The value must
     * match a {@link MessageTypeEnum} constant name exactly (case-sensitive).
     *
     * <p>A frame whose JSON cannot be parsed, or whose type is missing or unknown, is logged and
     * ignored instead of tearing down the connection.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String text = msg.text();

        MessageTypeEnum messageType = resolveType(text);
        if (messageType == null) {
            log.warn("忽略无法识别的消息:{}", text);
            return;
        }

        switch (messageType) {
            case TEXT:
                sendMsg(text);
                break;
            case HEARTBEAT:
                HeartbeatMessage heartbeatMessage = JSON.parseObject(text, HeartbeatMessage.class);
                log.debug("来自【{}】的心跳", heartbeatMessage.getUserId());
                break;
            case REGISTER:
                register(ctx, text);
                break;
            default:
                break;
        }
    }

    /**
     * Extracts the message type from a JSON payload.
     *
     * @param text raw frame text
     * @return the matching type, or {@code null} when the payload is not a JSON object or carries no
     *         known {@code type}
     */
    private MessageTypeEnum resolveType(String text) {
        try {
            JSONObject json = JSON.parseObject(text);
            Object type = json == null ? null : json.get("type");
            return type == null ? null : MessageTypeEnum.valueOf(type.toString());
        } catch (JSONException | IllegalArgumentException e) {
            return null;
        }
    }

    /**
     * Registers the sending channel under the user id carried by a register message and announces the
     * user to every connected client.
     *
     * @param ctx  context of the registering channel
     * @param text raw JSON of the register message
     */
    private void register(ChannelHandlerContext ctx, String text) {
        RegisterMessage registerMessage = JSON.parseObject(text, RegisterMessage.class);
        String userId = registerMessage.getUserId();
        if (!checkHasText(userId)) {
            // A blank id cannot be used as a map key or shown to other users.
            log.warn("忽略缺少 userId 的注册消息:{}", text);
            return;
        }

        Channel channel = ctx.channel();
        // Remember the user id on the channel so cleanup can find it later.
        String previousUserId = channel.attr(USER_ID_KEY).getAndSet(userId);
        if (previousUserId != null && !previousUserId.equals(userId)) {
            // The channel re-registered under another id: drop the stale mapping it owned.
            NettyConfig.getOnlineUserChannelMap().remove(previousUserId, channel);
        }

        NettyConfig.getOnlineUserChannelMap().put(userId, channel);
        log.debug("在线用户 --> {}", NettyConfig.getOnlineUserChannelMap().keySet());

        NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
                "用户:【" + userId + "】上线啦!"
        ));
    }

    /**
     * Forwards a text message to its receiver when that receiver is registered; otherwise the message
     * is only logged.
     *
     * @param textMessageJson raw JSON of the text message
     */
    private void sendMsg(String textMessageJson) {
        TextMessage textMessage = JSON.parseObject(textMessageJson, TextMessage.class);
        String userId = textMessage.getUserId();
        String userMsg = textMessage.getMsg();
        String receiver = textMessage.getReceiver();

        // ConcurrentHashMap rejects null keys, so only look up a usable receiver id.
        Channel receiverChannel = checkHasText(receiver)
                ? NettyConfig.getOnlineUserChannelMap().get(receiver)
                : null;
        if (Objects.nonNull(receiverChannel)) {
            //TODO could be structured data (JSON) so the client can parse it
            receiverChannel.writeAndFlush(new TextWebSocketFrame(userId + ":" + userMsg));
        }

        log.debug("用户【{}】给【{}】发送的消息:{}", userId, receiver, userMsg);
        //TODO reply to the sender (for example only on failure)
    }

    /**
     * Removes a disconnected channel from the shared registries and, if it was registered, announces
     * that the user went offline.
     *
     * <p>Safe to call more than once for the same channel and for channels that never registered:
     * the user id is taken from the channel attribute atomically, so only the first call sees it.
     *
     * @param channel the disconnected client channel
     */
    private void cleanChannel(Channel channel) {
        Attribute<String> userIdAttr = channel.attr(USER_ID_KEY);
        String userId = userIdAttr.getAndSet(null);

        NettyConfig.getOnlineChannelGroup().remove(channel);

        if (userId == null) {
            // Never registered, or already cleaned up: nothing to unmap or announce.
            return;
        }

        // Only remove the mapping if it still points at this channel; the same user may already
        // have registered again on a newer channel.
        boolean removed = NettyConfig.getOnlineUserChannelMap().remove(userId, channel);
        if (removed) {
            NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
                    "用户:【" + userId + "】下线啦!"
            ));
        }
    }

    /**
     * Checks that every given string is non-null and contains at least one non-whitespace character.
     *
     * @param strs strings to check
     * @return {@code true} when all of them have text
     */
    private boolean checkHasText(String... strs) {
        return Arrays.stream(strs).allMatch(StringUtils::hasText);
    }

}
HeartbeatMessage
package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * Payload of a heartbeat message sent by a client.
 *
 * @author 启年
 * @date 2023-05-14 13:39
 */
@Data
public class HeartbeatMessage implements Serializable {

    private static final long serialVersionUID = 1290124171105321491L;

    /**
     * Id of the user sending the heartbeat.
     */
    private String userId;

}
RegisterMessage
package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * Payload of a registration message sent by a client.
 *
 * @author 启年
 * @date 2023-05-14 13:40
 */
@Data
public class RegisterMessage implements Serializable {

    private static final long serialVersionUID = -4953615574208683170L;

    /**
     * Id of the user being registered with the server.
     */
    private String userId;

}
TextMessage
package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * Text message entity.
 *
 * @author 启年
 * @date 2023-05-12 20:16
 */
@Data
public class TextMessage implements Serializable {

    private static final long serialVersionUID = -4851870722684661727L;

    /**
     * Id of the user sending the message.
     */
    private String userId;

    /**
     * Receiver of the message.
     */
    private String receiver;

    /**
     * Message text sent by the user.
     */
    private String msg;

}
User
package com.study.websocket.demo1.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * User entity with an id and a display name.
 *
 * @author 启年
 * @date 2023-05-13 12:47
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {

    private static final long serialVersionUID = 3147392908880170895L;

    /**
     * User id.
     */
    private String userId;

    /**
     * User name.
     */
    private String username;

}
MessageTypeEnum
package com.study.websocket.demo1.enums; /** * 消息类型枚举 * * @author 启年 * @date 2023-05-14 13:36 */ public enum MessageTypeEnum { TEXT("普通文本消息"), HEARTBEAT("心跳数据"), REGISTER("注册数据"); MessageTypeEnum(String desc) { } }
ISendMessageService
ISendMessageService
代码
package com.study.websocket.demo1.service;

/**
 * Service for pushing messages to connected users.
 *
 * @author 启年
 * @date 2023-05-12
 */
public interface ISendMessageService {

    /**
     * Sends a message to one specific user, addressed by user id.
     *
     * @param userId   id of the user sending the message
     * @param receiver id of the user receiving the message
     * @param msg      message to send
     */
    void sendMsgToUserByUserId(String userId, String receiver, String msg);

    /**
     * Sends a message to every online user.
     *
     * @param msg message to send
     */
    void sendMsgToGroup(String msg);

}
ISendMessageService
实现类:SendMessageService
package com.study.websocket.demo1.service.impl;

import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.service.ISendMessageService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Objects;

/**
 * {@link ISendMessageService} implementation backed by the channel registries in {@link NettyConfig}.
 *
 * @author 启年
 * @date 2023-05-12 23:31
 */
@Slf4j
@Service
public class SendMessageService implements ISendMessageService {

    /**
     * Sends {@code userId + ":" + msg} to the channel registered for {@code receiver}.
     *
     * @param userId   id of the user sending the message
     * @param receiver id of the user receiving the message
     * @param msg      message to send
     * @throws RuntimeException if no channel is registered for {@code receiver}
     */
    @Override
    public void sendMsgToUserByUserId(String userId, String receiver, String msg) {
        // ConcurrentHashMap rejects null keys; treat a null receiver like an unknown one.
        Channel channel = receiver == null ? null : NettyConfig.getOnlineUserChannelMap().get(receiver);

        if (Objects.isNull(channel)) {
            throw new RuntimeException("UserId:" + receiver + "不存在");
        }

        // Build and log the payload BEFORE writing: once the frame is handed to writeAndFlush,
        // Netty may release its buffer at any time, so reading frame.text() afterwards is unsafe.
        String payload = userId + ":" + msg;
        // Log through a placeholder so braces in the payload are not treated as a format string.
        log.debug("{}", payload);

        channel.writeAndFlush(new TextWebSocketFrame(payload));
    }

    /**
     * Sends a message to every channel in the online channel group.
     *
     * @param msg message to send
     */
    @Override
    public void sendMsgToGroup(String msg) {
        NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }

}
SendMsgController
package com.study.websocket.demo1.controller;

import com.study.websocket.demo1.service.ISendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.Map;

/**
 * HTTP endpoints for pushing messages to WebSocket clients.
 *
 * @author 启年
 * @date 2023-05-12 23:33
 */
@RestController
@RequestMapping("/send")
public class SendMsgController {

    @Autowired
    private ISendMessageService sendMessageService;

    /**
     * Sends a message to a single user.
     *
     * @param userId   id of the sending user
     * @param receiver id of the receiving user
     * @param msg      message to send
     * @return a map with {@code code} and {@code msg} entries
     */
    @PostMapping("/user")
    public Map<String, Object> sendMsgToUserByUserId(
            @RequestParam("userId") String userId,
            @RequestParam("receiver") String receiver,
            @RequestParam("msg") String msg) {
        sendMessageService.sendMsgToUserByUserId(userId, receiver, msg);
        // The message goes TO the receiver; the confirmation previously named the sender.
        return success("给" + receiver + "的消息发送成功");
    }

    /**
     * Broadcasts a message to every online client.
     *
     * @param msg message to send
     * @return a map with {@code code} and {@code msg} entries
     */
    @PostMapping("/group")
    public Map<String, Object> sendMsgToGroup(@RequestParam("msg") String msg) {
        sendMessageService.sendMsgToGroup(msg);
        return success("群发消息成功");
    }

    /**
     * Builds the success response body shared by both endpoints.
     *
     * @param msg confirmation text
     * @return a map holding {@code code} = 200 and the given {@code msg}
     */
    private static Map<String, Object> success(String msg) {
        Map<String, Object> response = new HashMap<>();
        response.put("code", HttpServletResponse.SC_OK);
        response.put("msg", msg);
        return response;
    }

}
UserController
package com.study.websocket.demo1.controller;

import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.User;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * HTTP endpoints exposing information about connected users.
 *
 * @author 启年
 * @date 2023-05-13 12:45
 */
@RestController
@RequestMapping("/user")
public class UserController {

    /**
     * Lists the users that currently have a registered channel.
     *
     * <p>Each entry uses the registered user id for both the id and the user name.
     *
     * @return a map with {@code code}, {@code msg} and {@code data} (the user list)
     */
    @CrossOrigin(originPatterns = {"http://localhost:8081","http://sso.server.com:9999","http://10.40.129.179:8081"})
    @GetMapping("/online/list")
    public Map<String, Object> onlineList() {
        List<User> onlineUsers = new ArrayList<>();
        for (String onlineUserId : NettyConfig.getOnlineUserChannelMap().keySet()) {
            onlineUsers.add(new User(onlineUserId, onlineUserId));
        }

        Map<String, Object> body = new HashMap<>();
        body.put("code", 200);
        body.put("msg", "success");
        body.put("data", onlineUsers);
        return body;
    }

}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。