赞
踩
最近项目中需要用到长连接服务,特地整合Netty+Websocket。我们系统需要给用户主动推送订单消息,还有强制用户下线的功能也需要长连接来推送消息
Netty的介绍就看这里:https://www.jianshu.com/p/b9f3f6a16911
必须要理解到一些基础概念,什么是BIO,NIO,AIO,什么是多路复用,什么是Channel(相当于一个连接),什么是管道等等概念。
环境:
依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
这里有个简易版的Demo: https://github.com/MistraR/netty-websocket.git
*会包含部分业务代码
项目结构:
WebSocketServer
@Component @Slf4j public class WebSocketServer { /** * 主线程组 负责接收请求 */ private EventLoopGroup mainGroup; /** * 从线程组 负责处理请求 这里的主从线程组就是典型的多路复用思想 */ private EventLoopGroup subGroup; /** * 启动器 */ private ServerBootstrap server; /** * 某个操作完成时(无论是否成功)future将得到通知。 */ private ChannelFuture future; /** * 单例WbSocketServer */ private static class SingletonWsServer { static final WebSocketServer instance = new WebSocketServer(); } public static WebSocketServer getInstance() { return SingletonWsServer.instance; } public WebSocketServer() { mainGroup = new NioEventLoopGroup(); subGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(mainGroup, subGroup) .channel(NioServerSocketChannel.class) .childHandler(new WwbSocketServerInitialize());//自定义的初始化类,注册管道内的处理器 } public void start() { this.future = server.bind(8088); log.info("| Netty WebSocket Server 启动完毕,监听端口:8088 | ------------------------------------------------------ |"); } }
WwbSocketServerInitialize
每一个请求到服务的连接都会被这些注册了的处理类(Handler)处理一次,类似于拦截器,相当于一个商品要经过一次流水线,要被流水线上的工人加工一道工序。
public class WwbSocketServerInitialize extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //定义管道------------------------------------------------ ChannelPipeline pipeline = socketChannel.pipeline(); //定义管道中的众多处理器 //HTTP的编解码处理器 HttpRequestDecoder, HttpResponseEncoder pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse pipeline.addLast(new HttpObjectAggregator(1024 * 64)); // 增加心跳支持 // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开 pipeline.addLast(new IdleStateHandler(60, 60, 60)); pipeline.addLast(new HeartBeatHandler());//自定义的心跳处理器 // ====================== 以下是支持httpWebsocket ====================== /** * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同 */ pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 自定义的业务处理handler pipeline.addLast(new NoMaybeHandler()); } }
HeartBeatHandler
心跳支持,如果服务端一段时间没收到客户端的心跳,主动断开连接,避免资源浪费。
@Slf4j public class HeartBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲 ) if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("进入读空闲..."); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("进入写空闲..."); } else if (event.state() == IdleState.ALL_IDLE) { log.info("关闭无用的Channel,以防资源浪费。Channel Id:{}", ctx.channel().id()); Channel channel = ctx.channel(); channel.close(); UserChannelRelation.remove(channel); log.info("Channel关闭后,client的数量为:{}", NoMaybeHandler.clients.size()); } } } }
最关键的业务处理Handler - NoMaybeHandler
结合自己的业务需求,对请求到服务器的消息进行业务处理。
@Slf4j public class NoMaybeHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { /** * 管理所有客户端的channel通道 */ public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { //获取客户端传输过来的消息 String content = textWebSocketFrame.text(); Channel currentChannel = channelHandlerContext.channel(); try { //将消息转换成pojo WsDataContent wsDataContent = JacksonUtils.stringToObject(content, WsDataContent.class); if (wsDataContent == null) { throw new RuntimeException("连接请求参数错误!"); } Integer action = wsDataContent.getAction(); String msgId = wsDataContent.getMsgId(); //判断消息类型,根据不同的类型来处理不同的业务 if (action.equals(MsgActionEnum.CONNECT.type)) { //当Websocket第一次建立的时候,初始化Channel,把Channel和userId关联起来 UserWebsocketSalt userWebsocketSalt = wsDataContent.getSalt(); if (userWebsocketSalt == null || userWebsocketSalt.getUserId() == null) { //主动断开连接 writeAndFlushResponse(MsgActionEnum.BREAK_OFF.type, msgId, createKickMsgBody(), currentChannel); //currentChannel.close(); return; } String userId = userWebsocketSalt.getUserId(); //我们用loginLabel 标签结合长连接消息来做单点登录,踢设备下线,可以忽略中间的业务代码,这里主要是处理将userId于Channel绑定,存在Map中 -》UserChannelRelation.put(userId, currentChannel) String loginLabel = userWebsocketSalt.getLoginLabel(); Channel existChannel = UserChannelRelation.get(userId); if (existChannel != null) { //存在当前用户的连接,验证登录标签 LinkUserService linkUserService = (LinkUserService) SpringUtil.getBean("linkUserServiceImpl"); if (linkUserService.checkUserLoginLabel(userId, loginLabel)) { //是同一次登录标签,加入新连接,关闭旧的连接 UserChannelRelation.put(userId, currentChannel); writeAndFlushResponse(MsgActionEnum.BREAK_OFF.type, null, createKickMsgBody(), existChannel); writeAndFlushResponse(MsgActionEnum.MESSAGE_SIGN.type, msgId, null, currentChannel); //existChannel.close(); } else { //不是同一次登录标签,拒绝连接 writeAndFlushResponse(MsgActionEnum.BREAK_OFF.type, null, createKickMsgBody(), currentChannel); //currentChannel.close(); } } else { UserChannelRelation.put(userId, currentChannel); writeAndFlushResponse(MsgActionEnum.MESSAGE_SIGN.type, msgId, null, currentChannel); } } else if (action.equals(MsgActionEnum.KEEPALIVE.type)) { //心跳类型的消息 log.info("收到来自Channel为{}的心跳包......", currentChannel); writeAndFlushResponse(MsgActionEnum.MESSAGE_SIGN.type, msgId, null, currentChannel); } else { throw new RuntimeException("连接请求参数错误!"); } } catch (Exception e) { log.debug("当前连接出错!关闭当前Channel!"); closeAndRemoveChannel(currentChannel); } } /** * 响应客户端 */ public static void writeAndFlushResponse(Integer action, String msgId, Object data, Channel channel) { WsDataContent wsDataContent = new WsDataContent(); wsDataContent.setAction(action); wsDataContent.setMsgId(msgId); wsDataContent.setData(data); channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(wsDataContent))); } /** * 构建强制下线消息体 * * @return */ public static PushMessageData createKickMsgBody() { PushMessageData pushMessageData = new PushMessageData(); pushMessageData.setMsgType(MessageEnums.MsgTp.ClientMsgTp.getId()); pushMessageData.setMsgVariety(MessageEnums.ClientMsgTp.FORCED_OFFLINE.getCode()); pushMessageData.setTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); pushMessageData.setMsgBody(null); return pushMessageData; } /** * 构建派单消息体 * * @return */ public static PushMessageData createDistributeOrderMsgBody(String orderId) { PushMessageData pushMessageData = new PushMessageData(); pushMessageData.setMsgType(MessageEnums.MsgTp.OrderMsgTp.getId()); pushMessageData.setMsgVariety(MessageEnums.OrderMsgTp.PUSH_CODE_ORDER_ROB.getCode()); pushMessageData.setTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); MsgBodyVO msgBodyVO = new MsgBodyVO(orderId); pushMessageData.setMsgBody(msgBodyVO); return pushMessageData; } /** * 当客户端连接服务端之后(打开连接) * 获取客户端的channel,并且放到ChannelGroup中去进行管理 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("客户端建立连接,Channel Id为:{}", ctx.channel().id().asShortText()); clients.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel Channel channel = ctx.channel(); clients.remove(channel); UserChannelRelation.remove(channel); log.info("客户端断开连接,Channel Id为:{}", channel.id().asShortText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除 Channel channel = ctx.channel(); cause.printStackTrace(); channel.close(); clients.remove(channel); UserChannelRelation.remove(channel); log.info("连接发生异常,Channel Id为:{}", channel.id().asShortText()); } /** * 关闭Channel * * @param channel */ public static void closeAndRemoveChannel(Channel channel) { channel.close(); clients.remove(channel); } }
UserChannelRelation
Map存储userId于Channel的对应关系
public class UserChannelRelation { private static Logger logger = LoggerFactory.getLogger(UserChannelRelation.class); private static HashMap<String, Channel> manager = new HashMap<>(); public static void put(String userId, Channel channel) { manager.put(userId, channel); } public static Channel get(String userId) { return manager.get(userId); } public static void remove(String userId) { manager.remove(userId); } public static void output() { for (HashMap.Entry<String, Channel> entry : manager.entrySet()) { logger.info("UserId:{},ChannelId{}", entry.getKey(), entry.getValue().id().asLongText()); } } /** * 移除Channel * * @param channel */ public static void remove(Channel channel) { for (Map.Entry<String, Channel> entry : manager.entrySet()) { if (entry.getValue().equals(channel)) { manager.remove(entry.getKey()); } } } }
消息类型枚举MsgActionEnum
public enum MsgActionEnum { /** * Websocket消息类型,WsDataContent.action */ CONNECT(1, "客户端初始化建立连接"), KEEPALIVE(2, "客户端保持心跳"), MESSAGE_SIGN(3, "客户端连接请求-服务端响应-消息签收"), BREAK_OFF(4, "服务端主动断开连接"), BUSINESS(5, "服务端主动推送业务消息"), SEND_TO_SOMEONE(9, "发送消息给某人(用于通信测试)"); public final Integer type; public final String content; MsgActionEnum(Integer type, String content) { this.type = type; this.content = content; } public Integer getType() { return type; } }
消息体WsDataContent
@Data public class WsDataContent implements Serializable { private static final long serialVersionUID = 5128306466491454779L; /** * 消息类型 */ private Integer action; /** * msgId */ private String msgId; /** * 发起连接需要的参数 */ private UserWebsocketSalt salt; /** * data */ private Object data; }
UserWebsocketSalt
客户端简历连接是需要提供的参数,userId
@Data
public class UserWebsocketSalt {
/**
* userId
*/
private String userId;
/**
* loginLabel 当前登录标签
*/
private String loginLabel;
}
每一次请求都会经过channelRead0方法的处理,将前端传回来的消息—我们这里是约定好的Json字符串,转换为对应的实体类,然后进行业务操作。
最后让Websocket服务随应用启动NettyNIOServer
@Component
public class NettyNIOServer implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
WebSocketServer.getInstance().start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
有部分业务代码没有贴上来,不影响。
这里有个简易版的Demo: https://github.com/MistraR/netty-websocket.git
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。