赞
踩
websocket就是通过服务器向客户端推送消息,客户端也可以主动向服务器发送消息,是真正的双向平等对话,是一种长连接,只需要通过一次请求进行初始化。
1.websocket是一个持久化协议,通过一次http request建立连接后,后续不需要重新发送httprequest。2.websocket是双向通信的连接,一个tcp连接上既可以发送,也可以接收。
3.具有多路复用功能,几个不同的url用同一个websocket连接,类似/websocket/{userid},不同的user推送消息。
以下为协议机制:
在http协议握手中,多了key,客户端请求时添加key,服务器根据加密解密方式返回accept,则成功建立。
发送消息可以采用以下方法:
session.getBasicRemote().sendText(message, false);
session.getBasicRemote().sendBinary(data);
session.getBasicRemote().sendText(message, true);
要实现服务端推送消息至客户端,并且支持客户端对用户点对点消息发送的社交功能。服务端给客户端推送消息,可以选择原生的WebSocket,或者更加高级的Netty框架实现。
使用netty,因为一款好的框架一般都是在原生的基础上进行包装成更加实用方便,很多我们需要自己考虑的问题都基本可以不用去考虑,不过此文不会去讲netty有多么的高深莫测,因为这些概念性的东西随处可见,而是通过实战来达到推送消息的目的。
整合Netty和WebSocket。我们需要使用netty对接websocket连接,实现双向通信,这一步需要有服务端的netty程序,用来处理客户端的websocket连接操作,例如建立连接,断开连接,收发数据等。
后端为netty框架,前端为websocket
前端使用WebSocket与服务端创建连接的时候,将用户ID传给服务端,服务端将用户ID与channel关联起来存储,同时将channel放入到channel组中。
如果需要给所有用户发送消息,直接执行channel组的writeAndFlush()方法;
如果需要给指定用户发送消息,根据用户ID查询到对应的channel,然后执行writeAndFlush()方法;
前端获取到服务端推送的消息之后,将消息内容展示到文本域中
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.3</version>
</dependency>
server:
port: 8899
#netty的配置信息(端口号,webSocket路径)
webSocket:
netty:
port: 58080
path: /webSocket
readerIdleTime: 30 #读空闲超时时间设置(Netty心跳检测配置)
writerIdleTime: 30 #写空闲超时时间设置(Netty心跳检测配置)
allIdleTime: 30 #读写空闲超时时间设置(Netty心跳检测配置)
创建单例channel组,管理所有通道,定义一个map,管理用户和通道的关系
import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.concurrent.ConcurrentHashMap; /** * NettyConfig类 * * @author hs * @date 2021-09-18 */ public class NettyConfig { /** * 定义一个channel组,管理所有的channel * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例 */ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 存放用户与Chanel的对应信息,用于给指定用户发送消息 */ private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>(); private NettyConfig() {} /** * 获取channel组 * @return */ public static ChannelGroup getChannelGroup() { return channelGroup; } /** * 获取用户channel map * @return */ public static ConcurrentHashMap<String,Channel> getUserChannelMap(){ return userChannelMap; } }
定义两个EventLoopGroup,bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之间的读写操作,需要说明的是,需要开启一个新的线程来执行netty server,要不然会阻塞主线程,到时候就无法调用项目的其他controller接口了
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; /** * Netty初始化服务 * * @author hs */ @Component public class NettyServer{ private static final Logger log = LoggerFactory.getLogger(NettyServer.class); /** * webSocket协议名 */ private static final String WEBSOCKET_PROTOCOL = "WebSocket"; /** * 端口号 */ @Value("${webSocket.netty.port}") private int port; /** * webSocket路径 */ @Value("${webSocket.netty.path}") private String webSocketPath; /** * 在Netty心跳检测中配置 - 读空闲超时时间设置 */ @Value("${webSocket.netty.readerIdleTime}") private long readerIdleTime; /** * 在Netty心跳检测中配置 - 写空闲超时时间设置 */ @Value("${webSocket.netty.writerIdleTime}") private long writerIdleTime; /** * 在Netty心跳检测中配置 - 读写空闲超时时间设置 */ @Value("${webSocket.netty.allIdleTime}") private long allIdleTime; @Autowired private WebSocketHandler webSocketHandler; private EventLoopGroup bossGroup; private EventLoopGroup workGroup; /** * 启动 * @throws InterruptedException */ private void start() throws InterruptedException { bossGroup = new NioEventLoopGroup(); workGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作 bootstrap.group(bossGroup,workGroup); // 设置NIO类型的channel bootstrap.channel(NioServerSocketChannel.class); // 设置监听端口 bootstrap.localAddress(new InetSocketAddress(port)); // 连接到达时会创建一个通道 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 心跳检测(一般情况第一个设置,如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型) ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES)); // 流水线管理通道中的处理程序(Handler),用来处理业务 // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器 ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new ObjectEncoder()); // 以块的方式来写的处理器 ch.pipeline().addLast(new ChunkedWriteHandler()); /* 说明: 1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合 2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求 */ ch.pipeline().addLast(new HttpObjectAggregator(8192)); /* 说明: 1、对应webSocket,它的数据是以帧(frame)的形式传递 2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri 3、核心功能是将http协议升级为ws协议,保持长连接 */ ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10)); // 自定义的handler,处理业务逻辑 ch.pipeline().addLast(webSocketHandler); } }); // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功 ChannelFuture channelFuture = bootstrap.bind().sync(); log.info("Server started and listen on:{}",channelFuture.channel().localAddress()); // 对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } /** * 释放资源 * @throws InterruptedException */ @PreDestroy public void destroy() throws InterruptedException { if(bossGroup != null){ bossGroup.shutdownGracefully().sync(); } if(workGroup != null){ workGroup.shutdownGracefully().sync(); } } /** * 初始化(新线程开启) */ //PostConstruct为spring初始加载的一个注解 @PostConstruct() public void init() { //需要开启一个新的线程来执行netty server 服务器 new Thread(() -> { try { start(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
注意:启动方法需要开启一个新线程执行netty server服务,服务中配置了IdleStateHandler心跳检测,此类要在创建一个通道的第一个设置,如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型)
创建Netty配置的操作执行类WebSocketHandler,userEventTriggered为心跳检测超时所调用的方法,超时后ctx.channel().close()执行完毕会主动调用handlerRemoved删除通道及用户信息
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import io.netty.channel.*; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * 操作执行类 * * TextWebSocketFrame类型,表示一个文本帧 * @author hs */ @Component @ChannelHandler.Sharable public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class); /** * 一旦连接,第一个被执行 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded 被调用"+ctx.channel().id().asLongText()); // 添加到channelGroup 通道组 NettyConfig.getChannelGroup().add(ctx.channel()); } /** * 读取数据 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 获取用户ID,关联channel JSONObject jsonObject = JSONUtil.parseObj(msg.text()); String uid = jsonObject.getStr("uid"); // 当用户ID已存入通道内,则不进行写入,只有第一次建立连接时才会存入,其他情况发送uid则为心跳需求 if(!NettyConfig.getUserChannelMap().containsKey(uid)){ log.info("服务器收到消息:{}",msg.text()); NettyConfig.getUserChannelMap().put(uid,ctx.channel()); // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID AttributeKey<String> key = AttributeKey.valueOf("userId"); ctx.channel().attr(key).setIfAbsent(uid); // 回复消息 ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器连接成功!")); }else{ // 前端定时请求,保持心跳连接,避免服务端误删通道 ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!")); } } /** * 移除通道及关联用户 * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText()); // 删除通道 NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); } /** * 异常处理 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("异常:{}",cause.getMessage()); // 删除通道 NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); ctx.close(); } /** * 心跳检测相关方法 - 会主动调用handlerRemoved * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if(event.state() == IdleState.ALL_IDLE){ //清除超时会话 ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close"); writeAndFlush.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ctx.channel().close(); } }); } }else{ super.userEventTriggered(ctx, evt); } } /** * 删除用户与channel的对应关系 * @param ctx */ private void removeUserId(ChannelHandlerContext ctx){ AttributeKey<String> key = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(key).get(); NettyConfig.getUserChannelMap().remove(userId); log.info("删除用户与channel的对应关系,uid:{}",userId); } }
/** * 推送消息接口 * * @author hs */ public interface PushService { /** * 推送给指定用户 * @param userId 用户ID * @param msg 消息信息 */ void pushMsgToOne(String userId,String msg); /** * 推送给所有用户 * @param msg 消息信息 */ void pushMsgToAll(String msg); /** * 获取当前连接数 * @return 连接数 */ int getConnectCount(); }
import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.stereotype.Service; import java.util.concurrent.ConcurrentHashMap; /** * 推送消息接口实现类 * * @author hs */ @Service public class PushServiceImpl implements PushService { /** * 推送给指定用户 * @param userId 用户ID * @param msg 消息信息 */ @Override public void pushMsgToOne(String userId, String msg){ ConcurrentHashMap<String, Channel> userChannelMap = NettyConfig.getUserChannelMap(); Channel channel = userChannelMap.get(userId); channel.writeAndFlush(new TextWebSocketFrame(msg)); } /** * 推送给所有用户 * @param msg 消息信息 */ @Override public void pushMsgToAll(String msg){ NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg)); } /** * 获取当前连接数 * @return 连接数 */ @Override public int getConnectCount() { return NettyConfig.getChannelGroup().size(); } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * 请求Controller(用于postman测试) * * @author hs */ @RestController @RequestMapping("/push") public class PushController { @Autowired private PushService pushService; /** * 推送给所有用户 * @param msg 消息信息 */ @PostMapping("/pushAll") public void pushToAll(@RequestParam("msg") String msg){ pushService.pushMsgToAll(msg); } /** * 推送给指定用户 * @param userId 用户ID * @param msg 消息信息 */ @PostMapping("/pushOne") public void pushMsgToOne(@RequestParam("userId") String userId,@RequestParam("msg") String msg){ pushService.pushMsgToOne(userId,msg); } /** * 获取当前连接数 */ @GetMapping("/getConnectCount") public int getConnectCout(){ return pushService.getConnectCount(); } }
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <script src="/static/jquery-2.2.4.min.js" charset="utf-8"></script> </head> <body> <script> var socket; var userId = "123456"; // 判断当前浏览器是否支持webSocket if(window.WebSocket){ socket = new WebSocket("ws://127.0.0.1:58080/webSocket") // 相当于channel的read事件,ev 收到服务器回送的消息 socket.onmessage = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + ev.data; } // 相当于连接开启 socket.onopen = function (ev) { var rt = document.getElementById("responseText"); rt.value = "连接开启了..." socket.send( JSON.stringify({ // 连接成功将,用户ID传给服务端 uid: userId }) ); } //接受到服务端关闭连接时的回调方法 socket.onclose = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + "连接关闭了..."; } // 监听窗口事件,当窗口关闭时,主动断开websocket连接,防止连接没断开就关闭窗口,server端报错 window.onbeforeunload = function(){ socket.close(); } } else { alert("当前浏览器不支持webSocket") } // 如果前端需要保持连接,则需要定时往服务器针对自己发送请求,返回的参数和发送参数一致则证明时间段内有交互,服务端则不进行连接断开操作 var int = self.setInterval("clock()",10000); function clock() { socket.send( JSON.stringify({ // 连接成功将,用户ID传给服务端 uid: userId }) ); } </script> <form onsubmit="return false"> <textarea id="responseText" style="height: 150px; width: 300px;"></textarea> <br> <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''"> </form> </body> </html>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。