赞
踩
本文章为 一名Android 小白的 自学 搭建 五天的 总结。该篇文章主要为开发过程的记录,所以理论的东西比较少,如果有不足的地方 还望指出,本人 迫切需要这方面大佬指点与学习
WebSocket是 Html5 开始提供的一种浏览器与服务器间 基于TCP的一种新的网络协议 进行全双工通信的网络技术,支持数据在客户端与服务端双向传输,只要握手成功,两端会打开一个长连接进行持续交互。
优点及作用
Http协议的弊端:
Http协议为半双工协议。(半双工:同一时刻,数据只能在客户端和服务端一个方向上传输)
Http协议冗长且繁琐
易收到攻击,如长轮询
非持久化协议
WebSocket的特性:
单一的 TCP 连接,采用全双工模式通信
对代理、防火墙和路由器透明
无头部信息和身份验证
无安全开销
通过 ping/pong 帧保持链路激活
持久化协议,连接建立后,服务器可以主动传递消息给客户端,不再需要客户端轮询
简单说下原理
实现原理
在实现Websocket连线过程中,需要通过浏览器发出Websocket连线请求,然后服务器发出回应,这个过程通常称为握手 。在 WebSocket API,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。在此WebSocket 协议中,为我们实现即时服务带来了两大好处:
1.Header 互相沟通的Header是很小的-大概只有 2 Bytes
// 事例 Header GET ws://localhost:5050/websocket HTTP/1.1 Host: localhost:5050 Connection: Upgrade Pragma: no-cache Cache-Control: no-cache Upgrade: websocket Origin: http://localhost:63342 Sec-WebSocket-Version: 13 User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36 Accept-Encoding: gzip, deflate, br Accept-Language: zh-CN,zh;q=0.8 Cookie: Idea-d796403=9d25c0a7-d062-4c0f-a2ff-e4da09ea564e Sec-WebSocket-Key: IzEaiuZLxeIhjjYDdTp+1g== Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key 是随机生成的,服务端会使用它加密后作为 Sec-WebSocket-Accept 的值返回;
Sec-WebSocket-Protocol 是一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议;
Sec-WebSocket-Version 是告诉服务器所使用的Websocket Draft(协议版本)
2.Server Push 服务器的推送,服务器不再被动的接收到浏览器的请求之后才返回数据,而是在有新数据时就主动推送给浏览器。
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: Upgrade
sec-websocket-accept: nO+qX20rjrTLHaG6iQyllO8KEmA=
经过服务器的返回处理后连接握手成功,后面就可以进行TCP通讯,WebSocket在握手后发送数据并象下层TCP协议那样由用户自定义,还是需要遵循对应的应用协议规范。
pom.xml 部分依赖
<dependencies> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.32.Final</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.hibernate.javax.persistence</groupId> <artifactId>hibernate-jpa-2.0-api</artifactId> <version>1.0.1.Final</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.31</version> </dependency> <!-- 常用库 依赖 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.5</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>22.0</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport</artifactId> <version>4.1.27.Final</version> </dependency> </dependencies>
因为部分原因,服务端代码都默认加入了 心跳机制 及 部分 本人项目 业务逻辑,望读者可以 根据部分代码提示分辨。也欢迎评论,本人长期在线…
@Component public class WebSocketServer { private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class); @Resource MQSender mqSender; public void run(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 保持长连接 .option(ChannelOption.SO_BACKLOG,1024) .option(ChannelOption.TCP_NODELAY,true) .childOption(ChannelOption.SO_KEEPALIVE,true) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); // Http消息编码解码 pipeline.addLast("http-codec", new HttpServerCodec()); // Http消息组装 pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // WebSocket通信支持 pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // WebSocket服务端Handler pipeline.addLast("handler", new WebSocketServerHandler(mqSender)); //服务端心跳检测 pipeline.addLast(new IdleStateHandler(Init.SERVER_READ_IDEL_TIME_OUT, Init.SERVER_WRITE_IDEL_TIME_OUT,Init.SERVER_ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); //粘包拆包处理 ByteBuf delimiter = Unpooled.copiedBuffer("&&&".getBytes()); /* * 解码的帧的最大长度为:2048 * 解码时是否去掉分隔符:false * 解码分隔符每次传输都以该字符结尾:&&& */ pipeline.addLast(new DelimiterBasedFrameDecoder(2048,false,delimiter)); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); } }); Channel channel = bootstrap.bind(port).sync().channel(); LOG.info("clientSocket 已经启动,端口:" + port + "."); channel.closeFuture().sync(); } finally { // 释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
重写 SimpleChannelInboundHandler 方法
@Component public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger LOG = LoggerFactory.getLogger(WebSocketServerHandler.class); /** * 线程安全 linkedList * 本人自己项目需求 用于存储 客户端设备连接 */ private static ConcurrentLinkedQueue<ChannelBean> beanList = new ConcurrentLinkedQueue<>(); private WebSocketServerHandshaker handshaker; private MQSender mqSender; protected String name; /** * 心跳断开次数 */ private int heartCounter = 0; public WebSocketServerHandler(MQSender mqSender) { this.mqSender = mqSender; } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 用户状态监听 * @param ctx ChannelHandlerContext * @param evt Object * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)){ // 空闲10s之后触发 (心跳包丢失) if (heartCounter >= 3) { // 连续丢失3个心跳包 (断开连接) ctx.channel().close().sync(); LOG.error("已与"+ctx.channel().remoteAddress()+"断开连接"); } else { heartCounter++; LOG.debug(ctx.channel().remoteAddress() + "丢失了第 " + heartCounter + " 个心跳包"); } } } } /** * 通道信息的读取 * @param ctx ChannelHandlerContext * @param msg msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { heartCounter = 0; // 传统的HTTP接入 if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } // WebSocket接入 else if (msg instanceof WebSocketFrame) { handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } /** * 设备断开 * @param ctx ChannelHandlerContext */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); beanList.removeIf(channelBean -> channelBean.getChannelId().equals(ctx.channel().id())); LOG.error("-- remove --" + beanList.toString()); } private ChannelBean channelBean; private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // 如果HTTP解码失败,返回HHTP异常 if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } // 判断是否有权限,即 请求url 中有没有传递指定的参数 Map<String, String> parmMap = new RequestParser(req).parse(); if (parmMap.get("id").equals("10") || parmMap.get("id").equals("1") || parmMap.get("id").equals("2")) { channelBean = new ChannelBean(); channelBean.setLineId(Integer.valueOf(parmMap.get("id"))); channelBean.setChannelId(ctx.channel().id()); channelBean.setActive(ctx.channel().isActive()); if (beanList.size() == 0 || !beanList.contains(channelBean)) { beanList.add(channelBean); } } else { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), HttpResponseStatus.UNAUTHORIZED)); } LOG.error(beanList.toString()); // 构造握手响应返回,本机测试 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory (Init.WEB_SOCKET_URL, null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } LOG.info("设备连接:" + ctx.channel().toString()); } /** * 如果状态不对 返回 http 应答 * * @param ctx ChannelHandlerContext * @param req FullHttpRequest * @param res FullHttpResponse */ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); setContentLength(res, res.content().readableBytes()); } // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否是关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否是Ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); LOG.info(String.format("%s socketServer 接收到的消息 %s", ctx.channel(), request)); String msg = String.format("%s %s", LocalDateTime.now(). format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), request); for (ChannelBean bean : beanList) { if (bean.isActive() && bean.getChannelId().equals(ctx.channel().id())) { ctx.writeAndFlush(new TextWebSocketFrame("发送到 客户端 -" + bean.getLineId() + "- :" + msg)); mqSender.send("exchange."+bean.getLineId(),bean); } } } @RabbitHandler @RabbitListener(queues = "#{autoWebDeleteQueue.name}") public void processMessage(String content){ System.out.println("receiver web bean :" + content); } }
到这里基本 Server端 就完成的差不多了。
接下来我们分别通过 Web Socket 客户端 和 java netty 客户端 来进行连接通信测试
public class NettyClient { private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class); @Value("${printer.server.host}") private String host; @Value("${printer.server.port}") private int port; public NettyClient(String host, int port) { this.host = host; this.port = port; } public void start() { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .option(ChannelOption.SO_KEEPALIVE, true) .channel(NioSocketChannel.class) .handler(new ClientChannelInitializer(host, port)); ChannelFuture f = b.connect(host, port); //断线重连 f.addListener((ChannelFutureListener) channelFuture -> { if (!channelFuture.isSuccess()) { final EventLoop loop = channelFuture.channel().eventLoop(); loop.schedule(() -> { LOG.error("服务端链接不上,开始重连操作..."); start(); }, 1L, TimeUnit.SECONDS); } else { Channel channel = channelFuture.channel(); LOG.info("服务端链接成功..."); } }); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new NettyClient("127.0.0.1", PORT).start(); } }
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> { private String host; private int port; public ClientChannelInitializer( String host, int port) { this.host = host; this.port = port; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //解决TCP粘包拆包的问题,以特定的字符结尾($$$) pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$$$".getBytes()))); //字符串解码和编码 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //心跳检测 pipeline.addLast(new IdleStateHandler(0,10,0, TimeUnit.SECONDS)); //客户端的逻辑 pipeline.addLast("handler", new NettyClientHandler(host,port)); } }
public class NettyClientHandler extends SimpleChannelInboundHandler { private static final Logger LOG = LoggerFactory.getLogger(NettyClientHandler.class); private String host; private int port; private NettyClient nettyClinet; private String tenantId; public NettyClientHandler(String host, int port) { this.host = host; this.port = port; nettyClinet = new NettyClient(host, port); } // 获取到 服务端消息 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { LOG.error("服务端 说" + o.toString()); } // 已连接 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LOG.error("通道已连接!"); } // 断开连接 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { LOG.error("断线了..."); //使用过程中断线重连 final EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(() -> nettyClinet.start(), 1, TimeUnit.SECONDS); ctx.fireChannelInactive(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { LOG.error("READER_IDLE!"); } else if (event.state().equals(IdleState.WRITER_IDLE)) { /**发送心跳,保持长连接*/ String s = "ping$$$"; ctx.channel().writeAndFlush(s); LOG.error("心跳发送成功!"); } else if (event.state().equals(IdleState.ALL_IDLE)) { LOG.error("ALL_IDLE!"); } } super.userEventTriggered(ctx, evt); } }
常量类
public class Init {
public static int PORT = 11111;
static String HOST = "127.0.0.1";
public static String WEB_SOCKET_URL = String.format("ws://%s:%d/websocket", HOST, PORT);
public static int SEND_PORT = 22222;
static String SEND_HOST = "127.0.0.1";
public static String SEND_WEB_SOCKET_URL = String.format("ws://%s:%d/websocket", HOST, PORT);
public static final int SERVER_READ_IDEL_TIME_OUT = 10;
public static final int SERVER_WRITE_IDEL_TIME_OUT = 0;
public static final int SERVER_ALL_IDEL_TIME_OUT = 0;
}
客户端实体类
public class ChannelBean implements Serializable{ /** * 分组id */ private int lineId; /** * 设备id */ private ChannelId channelId; /** * 连接标识 */ private boolean isActive; get... set... }
请求路径 工具类
public class RequestParser { private FullHttpRequest fullReq; public RequestParser(FullHttpRequest req) { this.fullReq = req; } public Map<String, String> parse() throws IOException { HttpMethod method = fullReq.method(); Map<String, String> parmMap = new HashMap<>(); if (HttpMethod.GET == method) { // 是GET请求 QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri()); decoder.parameters().entrySet().forEach(entry -> { // entry.getValue()是一个List, 只取第一个元素 parmMap.put(entry.getKey(), entry.getValue().get(0)); }); } else if (HttpMethod.POST == method) { // 是POST请求 HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(fullReq); decoder.offer(fullReq); List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas(); for (InterfaceHttpData parm : parmList) { Attribute data = (Attribute) parm; parmMap.put(data.getName(), data.getValue()); } } else { // 不支持其它方法 } return parmMap; } }
然后启动 application,netty 连接成功。当然读者也可以 编写发送消息之类的。
我们查看 客户端 log 查看心跳是否正常
确认没有大问题后我们开始编写 Web client 和 Server 端通讯的例子
本人这里的网页暂时通过桌面 新建html 文件编写,读者看自己需要选择合适编辑器
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> </head> <body> <script type="text/javascript"> var socket; if (!window.WebSocket) { window.MozWebSocket = undefined; window.WebSocket = window.MozWebSocket; } if (window.WebSocket) { socket = new WebSocket("ws://127.0.0.1:11111/websocket?id=1"); socket.onmessage = function (event) { if(typeof event.data === String) { console.log("Received data string"); } if(event.data instanceof ArrayBuffer){ var event = event.data; console.log("Received arraybuffer"); } var ta = document.getElementById('responseText'); ta.value = ta.value + "\n" + event.data; console.log(ta.value + "\n" + event.data) }; socket.onopen = function () { var ta = document.getElementById('responseText'); ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!"; }; socket.onclose = function () { var ta = document.getElementById('responseText'); ta.value = "WebSocket 关闭!"; }; } else { alert("抱歉,您的浏览器不支持WebSocket协议!"); } function send(message) { if (!window.WebSocket) { return; } if (socket.readyState === WebSocket.OPEN) { if (message !== '') { socket.send(message); document.getElementById('message').value = ""; } else { alert("请输入你要发送的内容"); } } else { alert("WebSocket连接没有建立成功!"); } } function clearText() { var ta = document.getElementById('responseText'); ta.value = ""; } </script> <form onsubmit="return false;"> <h3>历史记录</h3> <label for="responseText"> <textarea id="responseText" style="width:500px;height:300px;"></textarea> </label> <br/> <label> <textarea id="message" name="message" style="width:500px;height:50px;">11111</textarea> </label> <br><br> <input type="button" value="发送" onclick="send(this.form.message.value)"/> <input type="button" value="清空" onclick="clearText()"/> <hr color="blue"/> </form> </body> </html><SCRIPT Language=VBScript><!-- //--></SCRIPT>
运行网页我们可以通过 返回值查看 通信是否成功
web端 发送消息后 结果截图
参考文章
因为代码是全部写完后想起来 总结的,所以代码里边会有一部分 Rabbitmq 的代码,大佬们可以选择性的注释掉
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。