赞
踩
websocket是一个通信协议,而netty是一个Java网络编程框架。我们可以利用netty实现websocket通信,也可以用其他的。
先来区分三者的含义
1.单工: 数据传输只允许在一个方向上的传输,只能一方来发送数据,另一方来接收数据并发送。例如:对讲机
2.半双工:数据传输允许两个方向上的传输,但是同一时间内,只可以有一方发送或接受消息。例如:打电话
3.全双工:同时可进行双向传输。例如:websocket
1.http1.0:单工。因为是短连接,客户端发起请求之后,服务端处理完请求并收到客户端的响应后即断开连接。
2.http1.1:半双工。默认开启长连接keep-alive,开启一个连接可发送多个请求。
3.http2.0:全双工,允许服务端主动向客户端发送数据。
WebSocket借助http协议进行握手,三次握手成功后,就会变身为TCP通道,从此与http不再相见。
在WebSocket API中,浏览器和服务器只需要完成一次握手(不是指建立TCP连接的那个三次握手,是指在建立TCP连接后传输一次握手数据),两者之间就直接可以创建持久性的连接,并进行双向数据传输
在HTTP/1.0中默认使用短连接。也就是说,客户端和服务器每进行一次HTTP操作,就建立一次连接,任务结束就中断连接。当客户端浏览器访问的某个HTML或其他类型的Web页中包含有其他的Web资源(如JavaScript文件、图像文件、CSS文件等),每遇到这样一个Web资源,浏览器就会重新建立一个HTTP会话。
而从HTTP/1.1起,默认使用长连接,用以保持连接特性。使用长连接的HTTP协议,会在响应头加入这行代码:
Connection:keep-alive
在使用长连接的情况下,当一个网页打开完成后,客户端和服务器之间用于传输HTTP数据的TCP连接不会关闭,客户端再次访问这个服务器时,会继续使用这一条已经建立的连接。Keep-Alive不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Apache)中设定这个时间。实现长连接需要客户端和服务端都支持长连接。
Netty是建立在NIO基础之上,Netty在NIO之上又提供了更高层次的抽象。
在Netty里面,Accept连接可以使用单独的线程池去处理,读写操作又是另外的线程池来处理。
Accept连接和读写操作也可以使用同一个线程池来进行处理。而请求处理逻辑既可以使用单独的线程池进行处理,也可以跟放在读写线程一块处理。线程池中的每一个线程都是NIO线程。用户可以根据实际情况进行组装,构造出满足系统需求的高性能并发模型
如果不用netty,使用原生JDK的话,有如下问题:
1、API复杂
2、对多线程很熟悉:因为NIO涉及到Reactor模式
3、高可用的话:需要出路断连重连、半包读写、失败缓存等问题
4、JDK NIO的bug
而Netty来说,他的api简单、性能高而且社区活跃(dubbo、rocketmq等都使用了它)
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.33.Final</version>
</dependency>
@Component public class NettyServer { private static final Logger log = LoggerFactory.getLogger(NettyServer.class); /** * netty端口 */ private static int port = 8080; private static class SingletionWSServer { static final NettyServer instance = new NettyServer(); } public static NettyServer getInstance() { return SingletionWSServer.instance; } private EventLoopGroup mainGroup; private EventLoopGroup subGroup; private ServerBootstrap server; private ChannelFuture future; public NettyServer() { mainGroup = new NioEventLoopGroup(); subGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(mainGroup, subGroup) .channel(NioServerSocketChannel.class) .childHandler(new NettyChannelInitializer()); } public void start() { this.future = server.bind(port); log.info("netty server server 启动完毕... port = "+port); } }
管道初始化
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel>{ private static final Logger log = LoggerFactory.getLogger(NettyChannelInitializer.class); @Override protected void initChannel(SocketChannel ch) throws Exception { log.info(" 管道初始化...... "); ChannelPipeline pipeline = ch.pipeline(); // websocket 基于http协议,所以要有http编解码器 pipeline.addLast("HttpServerCodec",new HttpServerCodec()); // 对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse // 几乎在netty中的编程,都会使用到此hanler pipeline.addLast(new HttpObjectAggregator(1024*64)); // 增加心跳支持 start // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开 // 如果是读空闲或者写空闲,不处理 pipeline.addLast(new IdleStateHandler(8, 10, 12)); // 自定义的空闲状态检测 pipeline.addLast(new NettyWsChannelInboundHandler()); // 以下是支持httpWebsocket /** * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws * 本handler会帮你处理一些繁重的复杂的事 * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳 * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同 */ pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 自定义的wshandler pipeline.addLast(new NettyWsChannelInboundHandler()); // 自定义 http pipeline.addLast(new NettyHttpChannelInboundHandler()); } }
处理消息的handler
public class NettyWsChannelInboundHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ private static final Logger log = LoggerFactory.getLogger(NettyWsChannelInboundHandler.class); /** * 用于记录和管理所有客户端的channle */ public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 用户信息列表 */ public static List<FriendInfo> friendList = new ArrayList<FriendInfo>(); /** * 从channel缓冲区读数据 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 获得 channel Channel currentChannel = ctx.channel(); // 获取客户端传输过来的消息 String content = msg.text(); // 1. 获取客户端发来的消息 DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class); Integer action = dataContent.getAction(); // 2. 判断消息类型,根据不同的类型来处理不同的业务 if (action == MsgActionEnum.CONNECT.type) { // 2.1 当websocket 第一次open的时候,初始化channel,把用的channel和userid关联起来 String sendId = dataContent.getChatMsg().getSendId(); UserChannelRel.put(sendId, currentChannel); UserChannelRel.output(); } else if (action == MsgActionEnum.CHAT.type) { // 2.2 聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收] ImChatMsgLogsService imChatMsgLogsService = (ImChatMsgLogsService)SpringBeanUtil.getBean("imChatMsgLogsServiceImpl"); ChatMsg chatMsg = dataContent.getChatMsg(); String sendId = chatMsg.getSendId(); String receiveId = chatMsg.getReceiveId(); String msgContent = chatMsg.getMsgContent(); Integer mainUserId = chatMsg.getMainUserId(); Integer userId = chatMsg.getUserId(); // 保存消息到数据库,并且标记为 未签收 ImChatMsgLogs logs = new ImChatMsgLogs(); logs.setMainUserId(mainUserId); logs.setUserId(userId); logs.setSendId(sendId); logs.setReceiveId(receiveId); logs.setMsgContent(msgContent); logs.setToType(1); Integer msgId = imChatMsgLogsService.saveWebMsgLogs(logs); chatMsg.setMsgId(msgId.toString()); // 消息发送时间 chatMsg.setSendTime(new Date()); DataContent dataContentMsg = new DataContent(); dataContentMsg.setChatMsg(chatMsg); // 给自己发送成功消息 Channel sendIdChannel = UserChannelRel.get(sendId); sendIdChannel.writeAndFlush( new TextWebSocketFrame( JsonUtils.objectToJson(dataContent))); // 发送消息 从全局用户Channel关系中获取接受方的channel Channel receiverChannel = UserChannelRel.get(receiveId); if (receiverChannel == null) { // TODO channel为空代表用户离线,推送消息(JPush,个推,小米推送) 添加离线消息记录 log.info(" 用户离线1 ... receiverChannel 是 null"); imChatMsgLogsService.updateOfflineStatusTwo(msgId); } else { // 当receiverChannel不为空的时候,从ChannelGroup去查找对应的channel是否存在 Channel findChannel = users.find(receiverChannel.id()); if (findChannel != null) { // 用户在线 receiverChannel.writeAndFlush( new TextWebSocketFrame( JsonUtils.objectToJson(dataContent))); } else { // 用户离线 TODO 推送消息 添加离线消息记录 log.info(" 用户离线2 ... findChannel 是 null"); imChatMsgLogsService.updateOfflineStatusTwo(msgId); } } } else if (action == MsgActionEnum.SIGNED.type) { log.info(" 消息通知..... "); // 扩展字段在signed类型的消息中,代表需要去签收的消息id,逗号间隔 String msgIdsStr = dataContent.getExtand(); String msgIds[] = msgIdsStr.split(","); List<String> msgIdList = new ArrayList<>(); for (String mid : msgIds) { if (StringUtils.isNotBlank(mid)) { msgIdList.add(mid); } } if (msgIdList != null && !msgIdList.isEmpty() && msgIdList.size() > 0) { // 2.3 签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收] // 批量签收 ImChatMsgLogsService imChatMsgLogsService = (ImChatMsgLogsService)SpringBeanUtil.getBean("imChatMsgLogsServiceImpl"); imChatMsgLogsService.updateMsgReadStatusOne(msgIdList); } } else if (action == MsgActionEnum.KEEPALIVE.type) { // 2.4 心跳类型的消息 log.info("收到来自channel为[" + currentChannel + "]的心跳包..."); }else if (action == MsgActionEnum.FRIEND_REQUEST.type) { // 好友申请 ChatMsg chatMsg = dataContent.getChatMsg(); String sendId = chatMsg.getSendId(); String receiveId = chatMsg.getReceiveId(); log.info("sendId = "+sendId +".... 好友申请.... receiveId =" +receiveId); }else if (action == MsgActionEnum.GROUP_MSG.type) { // 群消息发送 ImChatMsgLogsService imChatMsgLogsService = (ImChatMsgLogsService)SpringBeanUtil.getBean("imChatMsgLogsServiceImpl"); ChatMsg chatMsg = dataContent.getChatMsg(); String sendId = chatMsg.getSendId(); //String receiveId = chatMsg.getReceiveId(); String msgContent = chatMsg.getMsgContent(); Integer mainUserId = chatMsg.getMainUserId(); Integer userId = chatMsg.getUserId(); // 保存消息到数据库,并且标记为 未签收 ImChatMsgLogs logs = new ImChatMsgLogs(); logs.setMainUserId(mainUserId); logs.setUserId(userId); logs.setSendId(sendId); // 1 是测试群 logs.setGroupInfoId(1); //logs.setReceiveId(receiveId); logs.setMsgContent(msgContent); logs.setToType(2); Integer msgId = imChatMsgLogsService.saveWebMsgLogs(logs); chatMsg.setMsgId(msgId.toString()); // 消息发送时间 chatMsg.setSendTime(new Date()); DataContent dataContentMsg = new DataContent(); dataContentMsg.setChatMsg(chatMsg); // 给所有在线的 im用户 发送信息 for (Channel c : users) { c.writeAndFlush( new TextWebSocketFrame( JsonUtils.objectToJson(dataContent))); } // 更新消息状态为已读 log.info(" 群消息发送... users.size = "+users.size()); } } /** * 当客户端连接服务端之后(打开连接) * 获取客户端的channle,并且放到ChannelGroup中去进行管理 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { users.add(ctx.channel()); log.info(" netty 获得连接..... "); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String channelId = ctx.channel().id().asShortText(); // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel users.remove(ctx.channel()); log.info("客户端被移除,channelId为:" + channelId); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除 ctx.channel().close(); users.remove(ctx.channel()); log.info(" netty 异常了...... "); } }
前端发起websocket
window.IM_CHAT = { socket: null, init: function() { if (window.WebSocket) { IM_CHAT.socket = new WebSocket(wsWebSocketUrl); IM_CHAT.socket.onopen = function() { $(".im-connect-status").text("im连接成功.."+imChatUser.user.nickname); IM_CHAT.chatConnect(); }, IM_CHAT.socket.onclose = function() { $(".im-connect-status").text("连接关闭.."+imChatUser.user.nickname); }, IM_CHAT.socket.onerror = function() { $(".im-connect-status").text("发生错误.."+imChatUser.user.nickname); }, IM_CHAT.socket.onmessage = function(e) { console.log("接受到消息:" + e.data); var dataContent = JSON.parse(e.data); var action = dataContent.action; var chatMsg = dataContent.chatMsg; var msgContent = chatMsg.msgContent; // 发送者id var sendId = chatMsg.sendId; var sendTime = chatMsg.sendTime; // 接受者id var receiveId = chatMsg.receiveId; var html = ""; var sendNickname = ""; if(imChatUser.user.id==sendId){ // 自己发的 html+='<li class="layim-chat-mine"> '; html+='<div class="layim-chat-user">'; html+='<img src="'+imChatUser.user.pictureUrl+'">'; html+='<cite><i>'+commonDateFormat(sendTime,"yyyy-MM-dd hh:mm:ss")+'</i>'+imChatUser.user.nickname+'</cite>'; html+='</div>'; html+='<div class="layim-chat-text" style="background-color: #00c1de;color: #efeff4;">'+msgContent+'</div>'; html+='</li>'; }else{ for (var i = 0; i < all_user_list_cache.length; i++) { var obj = all_user_list_cache[i]; if(obj.id==sendId){ // 收到别人发的 html+='<li>'; html+='<div class="layim-chat-user">'; html+='<img src="'+obj.pictureUrl+'">'; html+='<cite>'+obj.nickname+'<i>'+commonDateFormat(sendTime,"yyyy-MM-dd hh:mm:ss")+'</i></cite>'; html+='</div>'; html+='<div class="layim-chat-text">'+msgContent+'</div>'; html+='</li>'; sendNickname = obj.nickname; break; } } } // 2, "聊天消息" 7, "群消息" if(action==2){ if(imChatUser.user.id==sendId){ $(".im-chat-msg-logs-to-receive-friend-user-id-"+sendId+receiveId).append(html); $(".layim-chat-logs-main.friend-user-id-"+receiveId).scrollTop($(".layim-chat-logs-main.friend-user-id-"+receiveId)[0].scrollHeight); $(".sed-msg-friend-user-id-"+receiveId).val(""); }else{ $(".im-chat-msg-logs-to-receive-friend-user-id-"+receiveId+sendId).append(html); $(".layim-chat-logs-main.friend-user-id-"+sendId).scrollTop($(".layim-chat-logs-main.friend-user-id-"+sendId)[0].scrollHeight); //$(".sed-msg-friend-user-id-"+receiveId).val(""); voicePlaying("好友消息: "+sendNickname+" 发送了, 内容为: "+msgContent) } }else if(action==7){ $(".im-chat-msg-logs-to-group").append(html); $(".layui-form-item.layim-chat-logs-main").scrollTop($(".layui-form-item.layim-chat-logs-main")[0].scrollHeight); if(imChatUser.user.id==sendId){ $("textarea[name='sedGroupMsg']").val("") }else{ // 群消息 voicePlaying("群消息: "+sendNickname+" 发送了, 内容为: "+msgContent) } } } } else { alert("浏览器不支持websocket协议..."); } }, chatConnect: function() { if (!imChatUser) { layer.msg("请先登录,获取用户失败"); return false; } var chatMsgObj = { sendId:imChatUser.user.id, // 发送者的用户id receiveId:"", // 接受者的用户id msgContent:"", // 聊天内容 msgId:"" // 用于消息的签收 } var msgObj = { action:1, // CONNECT(1, "第一次(或重连)初始化连接"), CHAT(2, "聊天消息"), SIGNED(3, "消息签收"), // KEEPALIVE(4, "客户端保持心跳"), PULL_FRIEND(5, "拉取好友"); chatMsg:chatMsgObj, // 用户的聊天内容entity extand:"" // 扩展字段 } var msgObjJson = JSON.stringify(msgObj); IM_CHAT.socket.send(msgObjJson); // 查询 群信息列表 selectGroupLogsList(1); }, chatSendMsg: function(msg,rId) { if (!imChatUser) { layer.msg("请先登录,获取用户失败"); return false; } if (!msg) { layer.msg("消息不能为空"); return false; } var chatMsgObj = { sendId:imChatUser.user.id, // 发送者的用户id receiveId:rId, // 接受者的用户id msgContent:msg, // 聊天内容 msgId:"" // 用于消息的签收 } var msgObj = { action:2, // CONNECT(1, "第一次(或重连)初始化连接"), CHAT(2, "聊天消息"), SIGNED(3, "消息签收"), // KEEPALIVE(4, "客户端保持心跳"), PULL_FRIEND(5, "拉取好友"); chatMsg:chatMsgObj, // 用户的聊天内容entity extand:"" // 扩展字段 } var msgObjJson = JSON.stringify(msgObj); IM_CHAT.socket.send(msgObjJson); }, chatSendGroupMsg: function(msg) { if (!imChatUser) { layer.msg("请先登录,获取用户失败"); return false; } if (!msg) { layer.msg("消息不能为空"); return false; } var chatMsgObj = { sendId:imChatUser.user.id, // 发送者的用户id receiveId:"", // 接受者的用户id msgContent:msg, // 聊天内容 msgId:"" // 用于消息的签收 } var msgObj = { action:7, // CONNECT(1, "第一次(或重连)初始化连接"), CHAT(2, "聊天消息"), SIGNED(3, "消息签收"), // KEEPALIVE(4, "客户端保持心跳"), PULL_FRIEND(5, "拉取好友"); chatMsg:chatMsgObj, // 用户的聊天内容entity extand:"" // 扩展字段 } var msgObjJson = JSON.stringify(msgObj); IM_CHAT.socket.send(msgObjJson); }, };
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。