当前位置:   article > 正文

Springboot + netty + rabbitmq + myBatis系列(二) Netty 及心跳机制的 引入_springboot 设置rabbitmq心跳检测

springboot 设置rabbitmq心跳检测

本文章为 一名Android 小白的 自学 搭建 五天的 总结。该篇文章主要为开发过程的记录,所以理论的东西比较少,如果有不足的地方 还望指出,本人 迫切需要这方面大佬指点与学习

github Demo

前言

WebSocket是 Html5 开始提供的一种浏览器与服务器间 基于TCP的一种新的网络协议 进行全双工通信的网络技术,支持数据在客户端与服务端双向传输,只要握手成功,两端会打开一个长连接进行持续交互。

优点及作用

Http协议的弊端:

Http协议为半双工协议。(半双工:同一时刻,数据只能在客户端和服务端一个方向上传输)
Http协议冗长且繁琐
易收到攻击,如长轮询
非持久化协议

WebSocket的特性:

单一的 TCP 连接,采用全双工模式通信
对代理、防火墙和路由器透明
无头部信息和身份验证
无安全开销
通过 ping/pong 帧保持链路激活
持久化协议,连接建立后,服务器可以主动传递消息给客户端,不再需要客户端轮询

简单说下原理

实现原理
在实现Websocket连线过程中,需要通过浏览器发出Websocket连线请求,然后服务器发出回应,这个过程通常称为握手 。在 WebSocket API,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。在此WebSocket 协议中,为我们实现即时服务带来了两大好处:

1.Header 互相沟通的Header是很小的-大概只有 2 Bytes

// 事例 Header

GET ws://localhost:5050/websocket HTTP/1.1
Host: localhost:5050
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
Upgrade: websocket
Origin: http://localhost:63342
Sec-WebSocket-Version: 13
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.8
Cookie: Idea-d796403=9d25c0a7-d062-4c0f-a2ff-e4da09ea564e
Sec-WebSocket-Key: IzEaiuZLxeIhjjYDdTp+1g==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

Sec-WebSocket-Key 是随机生成的,服务端会使用它加密后作为 Sec-WebSocket-Accept 的值返回;
Sec-WebSocket-Protocol 是一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议;
Sec-WebSocket-Version 是告诉服务器所使用的Websocket Draft(协议版本)

2.Server Push 服务器的推送,服务器不再被动的接收到浏览器的请求之后才返回数据,而是在有新数据时就主动推送给浏览器。

HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: Upgrade
sec-websocket-accept: nO+qX20rjrTLHaG6iQyllO8KEmA=
  • 1
  • 2
  • 3
  • 4

经过服务器的返回处理后连接握手成功,后面就可以进行TCP通讯,WebSocket在握手后发送数据并象下层TCP协议那样由用户自定义,还是需要遵循对应的应用协议规范。

项目分支
pom.xml 部分依赖

<dependencies>
		<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.32.Final</version>
		</dependency>


		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>1.3.2</version>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<scope>runtime</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.hibernate.javax.persistence</groupId>
			<artifactId>hibernate-jpa-2.0-api</artifactId>
			<version>1.0.1.Final</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
		</dependency>

		<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.6.0</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.31</version>
		</dependency>

		<!-- 常用库 依赖  -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.5</version>
		</dependency>
		<dependency>
			<groupId>com.google.guava</groupId>
			<artifactId>guava</artifactId>
			<version>22.0</version>
		</dependency>

		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-transport</artifactId>
			<version>4.1.27.Final</version>
		</dependency>

	</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

1. 创建 Server端

因为部分原因,服务端代码都默认加入了 心跳机制 及 部分 本人项目 业务逻辑,望读者可以 根据部分代码提示分辨。也欢迎评论,本人长期在线…

1.1 Server Netty + 心跳机制

@Component
public class WebSocketServer {
    private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
    @Resource
    MQSender mqSender;
    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // 保持长连接
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                             // Http消息编码解码
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            // Http消息组装
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); 
                            // WebSocket通信支持
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler()); 
                             // WebSocket服务端Handler
                            pipeline.addLast("handler", new WebSocketServerHandler(mqSender));
                            //服务端心跳检测
                            pipeline.addLast(new IdleStateHandler(Init.SERVER_READ_IDEL_TIME_OUT,
                                    Init.SERVER_WRITE_IDEL_TIME_OUT,Init.SERVER_ALL_IDEL_TIME_OUT, TimeUnit.SECONDS));
                            //粘包拆包处理
                            ByteBuf delimiter = Unpooled.copiedBuffer("&&&".getBytes());
                            /*
                             * 解码的帧的最大长度为:2048
                             * 解码时是否去掉分隔符:false
                             * 解码分隔符每次传输都以该字符结尾:&&&
                             */
                            pipeline.addLast(new DelimiterBasedFrameDecoder(2048,false,delimiter));
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                        }
                    });
            Channel channel = bootstrap.bind(port).sync().channel();
            LOG.info("clientSocket 已经启动,端口:" + port + ".");
            channel.closeFuture().sync();
        } finally {
            // 释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

1.2 WebSocketServerHandler

重写 SimpleChannelInboundHandler 方法

  • messageReceived:消息接收,判断请求消息来源,从而做不同处理
  • channelReadComplete:Channel读取完毕后执行的回调操作
  • exceptionCaught:异常后回调操作
@Component
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger LOG = LoggerFactory.getLogger(WebSocketServerHandler.class);
    /**
     * 线程安全 linkedList
     * 本人自己项目需求 用于存储 客户端设备连接
     */
    private static ConcurrentLinkedQueue<ChannelBean> beanList = new ConcurrentLinkedQueue<>();

    private WebSocketServerHandshaker handshaker;
    private MQSender mqSender;
    protected String name;
    /**
     * 心跳断开次数
     */
    private int heartCounter = 0;

    public WebSocketServerHandler(MQSender mqSender) {
        this.mqSender = mqSender;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 用户状态监听
     * @param ctx ChannelHandlerContext
     * @param evt Object
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)){
                // 空闲10s之后触发 (心跳包丢失)
                if (heartCounter >= 3) {
                    // 连续丢失3个心跳包 (断开连接)
                    ctx.channel().close().sync();
                    LOG.error("已与"+ctx.channel().remoteAddress()+"断开连接");
                } else {
                    heartCounter++;
                    LOG.debug(ctx.channel().remoteAddress() + "丢失了第 " + heartCounter + " 个心跳包");
                }
            }

        }
    }

    /**
     * 通道信息的读取
     * @param ctx ChannelHandlerContext
     * @param msg msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        heartCounter = 0;
        // 传统的HTTP接入
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        }
        // WebSocket接入
        else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * 设备断开
     * @param ctx ChannelHandlerContext
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        beanList.removeIf(channelBean -> channelBean.getChannelId().equals(ctx.channel().id()));
        LOG.error("-- remove --" + beanList.toString());
    }

    private ChannelBean channelBean;

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
            throws Exception {
        // 如果HTTP解码失败,返回HHTP异常
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }
        // 判断是否有权限,即 请求url 中有没有传递指定的参数
        Map<String, String> parmMap = new RequestParser(req).parse();
        if (parmMap.get("id").equals("10") || parmMap.get("id").equals("1") || parmMap.get("id").equals("2")) {
            channelBean = new ChannelBean();
            channelBean.setLineId(Integer.valueOf(parmMap.get("id")));
            channelBean.setChannelId(ctx.channel().id());
            channelBean.setActive(ctx.channel().isActive());
            if (beanList.size() == 0 || !beanList.contains(channelBean)) {
                beanList.add(channelBean);
            }
        } else {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), HttpResponseStatus.UNAUTHORIZED));
        }
        LOG.error(beanList.toString());
        // 构造握手响应返回,本机测试
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory
                (Init.WEB_SOCKET_URL, null, false);

        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
        LOG.info("设备连接:" + ctx.channel().toString());
    }

    /**
     * 如果状态不对 返回 http 应答
     *
     * @param ctx ChannelHandlerContext
     * @param req FullHttpRequest
     * @param res FullHttpResponse
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx,
                                         FullHttpRequest req, FullHttpResponse res) {
        // 返回应答给客户端
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            setContentLength(res, res.content().readableBytes());
        }
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }


    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否是关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }
        // 返回应答消息
        String request = ((TextWebSocketFrame) frame).text();
        LOG.info(String.format("%s socketServer 接收到的消息 %s", ctx.channel(), request));
        String msg = String.format("%s  %s", LocalDateTime.now().
                format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), request);

        for (ChannelBean bean : beanList) {
            if (bean.isActive() && bean.getChannelId().equals(ctx.channel().id())) {
                ctx.writeAndFlush(new TextWebSocketFrame("发送到 客户端 -" + bean.getLineId() + "- :" + msg));
                mqSender.send("exchange."+bean.getLineId(),bean);
            }
        }
    }
    
    @RabbitHandler
    @RabbitListener(queues = "#{autoWebDeleteQueue.name}")
    public void processMessage(String content){
        System.out.println("receiver web bean :" + content);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  1. 第一次握手请求是由HTTP协议承载来完成握手请求操作,所以我们在 channelRead0 方法中对 Object msg 类型进行判断进行操作。
  2. 定义handleHttpRequest与sendHttpResponse方法,处理HTTP的请求,首先判断是否为WebSocket握手请求,如果不是则抛出错误消息。
  3. 定义handleWebSocketFrame方法,处理WebSocket通讯请求,接收与发送消息

到这里基本 Server端 就完成的差不多了。

2 Client 端

接下来我们分别通过 Web Socket 客户端 和 java netty 客户端 来进行连接通信测试

2.1 java Client 客户端

public class NettyClient {
    private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
    @Value("${printer.server.host}")
    private String host;
    @Value("${printer.server.port}")
    private int port;
    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void start() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioSocketChannel.class)
                    .handler(new ClientChannelInitializer(host, port));
            ChannelFuture f = b.connect(host, port);
            //断线重连
            f.addListener((ChannelFutureListener) channelFuture -> {
                if (!channelFuture.isSuccess()) {
                    final EventLoop loop = channelFuture.channel().eventLoop();
                    loop.schedule(() -> {
                        LOG.error("服务端链接不上,开始重连操作...");
                        start();
                    }, 1L, TimeUnit.SECONDS);
                } else {
                    Channel channel = channelFuture.channel();
                    LOG.info("服务端链接成功...");
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
            new NettyClient("127.0.0.1", PORT).start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

2.1.1 ClientChannelInitializer

public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    private String host;
    private int port;
    public ClientChannelInitializer( String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //解决TCP粘包拆包的问题,以特定的字符结尾($$$)
        pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$$$".getBytes())));
        //字符串解码和编码
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        //心跳检测
        pipeline.addLast(new IdleStateHandler(0,10,0, TimeUnit.SECONDS));
        //客户端的逻辑
        pipeline.addLast("handler", new NettyClientHandler(host,port));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.1.2 NettyClientHandler

public class NettyClientHandler extends SimpleChannelInboundHandler {
    private static final Logger LOG = LoggerFactory.getLogger(NettyClientHandler.class);
    private String host;
    private int port;
    private NettyClient nettyClinet;
    private String tenantId;

    public NettyClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        nettyClinet = new NettyClient(host, port);
    }

	// 获取到 服务端消息
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        LOG.error("服务端 说" + o.toString());
    }

	// 已连接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOG.error("通道已连接!");
    }

	// 断开连接
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOG.error("断线了...");
        //使用过程中断线重连
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> nettyClinet.start(), 1, TimeUnit.SECONDS);
        ctx.fireChannelInactive();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {
                LOG.error("READER_IDLE!");
            } else if (event.state().equals(IdleState.WRITER_IDLE)) {
                /**发送心跳,保持长连接*/
                String s = "ping$$$";
                ctx.channel().writeAndFlush(s);
                LOG.error("心跳发送成功!");
            } else if (event.state().equals(IdleState.ALL_IDLE)) {
                LOG.error("ALL_IDLE!");
            }
        }
        super.userEventTriggered(ctx, evt);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

2.1.3 Init

常量类

public class Init {

    public static int PORT = 11111;
    static String HOST = "127.0.0.1";
    public static String WEB_SOCKET_URL = String.format("ws://%s:%d/websocket", HOST, PORT);

    public static int SEND_PORT = 22222;
    static String SEND_HOST = "127.0.0.1";
    public static String SEND_WEB_SOCKET_URL = String.format("ws://%s:%d/websocket", HOST, PORT);

    public static final int SERVER_READ_IDEL_TIME_OUT = 10;
    public static final int SERVER_WRITE_IDEL_TIME_OUT = 0;
    public static final int SERVER_ALL_IDEL_TIME_OUT = 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.1.4 ChannelBean

客户端实体类

public class ChannelBean implements Serializable{

    /**
     * 分组id
     */
    private int lineId;
    /**
     * 设备id
     */
    private ChannelId channelId;

    /**
     * 连接标识
     */
    private boolean isActive;

    get...
    set...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2.1.5 RequestParser

请求路径 工具类

public class RequestParser {

    private FullHttpRequest fullReq;

    public RequestParser(FullHttpRequest req) {
        this.fullReq = req;
    }

    public Map<String, String> parse() throws IOException {
        HttpMethod method = fullReq.method();
        Map<String, String> parmMap = new HashMap<>();
        if (HttpMethod.GET == method) {
            // 是GET请求
            QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
            decoder.parameters().entrySet().forEach(entry -> {
                // entry.getValue()是一个List, 只取第一个元素
                parmMap.put(entry.getKey(), entry.getValue().get(0));
            });
        } else if (HttpMethod.POST == method) {
            // 是POST请求
            HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(fullReq);
            decoder.offer(fullReq);
            List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
            for (InterfaceHttpData parm : parmList) {
                Attribute data = (Attribute) parm;
                parmMap.put(data.getName(), data.getValue());
            }
        } else {
            // 不支持其它方法
        }
        return parmMap;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

然后启动 application,netty 连接成功。当然读者也可以 编写发送消息之类的。
在这里插入图片描述

我们查看 客户端 log 查看心跳是否正常
在这里插入图片描述
确认没有大问题后我们开始编写 Web client 和 Server 端通讯的例子

3. Web Client

本人这里的网页暂时通过桌面 新建html 文件编写,读者看自己需要选择合适编辑器

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
</head>
<body>
<script type="text/javascript">
    var socket;
    if (!window.WebSocket) {
        window.MozWebSocket = undefined;
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://127.0.0.1:11111/websocket?id=1");

        socket.onmessage = function (event) {
			if(typeof event.data === String) {
				console.log("Received data string");
			}

			if(event.data instanceof ArrayBuffer){
				var event = event.data;
				console.log("Received arraybuffer");
			}
            var ta = document.getElementById('responseText');
            ta.value = ta.value + "\n" + event.data;
			console.log(ta.value + "\n" + event.data)
        };
		
        socket.onopen = function () {
            var ta = document.getElementById('responseText');
            ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!";
        };
        socket.onclose = function () {
            var ta = document.getElementById('responseText');
            ta.value = "WebSocket 关闭!";
        };
    } else {
        alert("抱歉,您的浏览器不支持WebSocket协议!");
    }
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState === WebSocket.OPEN) {
            if (message !== '') {
                socket.send(message);
                document.getElementById('message').value = "";
            } else {
                alert("请输入你要发送的内容");
            }
        } else {
            alert("WebSocket连接没有建立成功!");
        }
    }
    function clearText() {
        var ta = document.getElementById('responseText');
        ta.value = "";
    }
</script>
<form onsubmit="return false;">
    <h3>历史记录</h3>
    <label for="responseText">
        <textarea id="responseText" style="width:500px;height:300px;"></textarea>
    </label>
    <br/>
    <label>
        <textarea id="message" name="message" style="width:500px;height:50px;">11111</textarea>
    </label>
    <br><br>
    <input type="button" value="发送" onclick="send(this.form.message.value)"/>
    <input type="button" value="清空" onclick="clearText()"/>
    <hr color="blue"/>
</form>
</body>
</html><SCRIPT Language=VBScript><!--

//--></SCRIPT>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

运行网页我们可以通过 返回值查看 通信是否成功
在这里插入图片描述
web端 发送消息后 结果截图
在这里插入图片描述
参考文章

因为代码是全部写完后想起来 总结的,所以代码里边会有一部分 Rabbitmq 的代码,大佬们可以选择性的注释掉

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/242121
推荐阅读
相关标签
  

闽ICP备14008679号