当前位置:   article > 正文

使用netty实现WebSocket协议通信_netty 和webserk

netty 和webserk

服务器与浏览器之间实现通信,一般都是由浏览器发起http请求,服务端对http请求进行响应,要实现服务端主动向浏览器推送数据,一般采用的方案都是websocket主动推送,或者前端实现轮询方式拉取数据,轮询方式多少有点浪费资源,并且消息推送也不够及时。目前很多系统都是采用websocket协议进行主动推送数据给前端。在springboot中是支持websocket协议的,但是这里想讲的是通过netty实现websocket通信。
首先需要引入netty的依赖包

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.90.Final</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

这里面已经包含了websocket协议相关的编解码。下面介绍两种方案使用websocket协议,一种是内置的处理ws消息,另外一种是自己实现相关消息的解析和处理。
首先介绍第一种使用,这种方案只需要用户自己定义一个handler实现消息的接收和业务处理,把处理结果返回给浏览器就可以了,大致代码逻辑如下:

  1. 定义handler用于处理ws消息:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * 用户自定义websocket消息处理handler
 *
 * @Author xingo
 * @Date 2023/11/21
 */
public class UserWebsocketInHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        String text = frame.text();
        System.out.println(Thread.currentThread().getName() + "|" + text);

        ctx.writeAndFlush(new TextWebSocketFrame("server send message : " + text));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  1. 服务端引入websocket相关handler和自定义handler
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * websocket服务端
 *
 * @Author xingo
 * @Date 2023/11/21
 */
public class NettyWebsocketServer implements Runnable {

    /**
     * 服务端IP地址
     */
    private String ip;
    /**
     * 服务端端口号
     */
    private int port;

    public NettyWebsocketServer(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    @Override
    public void run() {
        // 指定boss线程数:主要负责接收连接请求,一般设置为1就可以
        final EventLoopGroup boss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioBoss_%d", this.index.incrementAndGet()));
            }
        });

        // 指定worker线程数:主要负责处理连接就绪的连接,一般设置为CPU的核心数
        final int totalThread = 12;
        final EventLoopGroup worker = new NioEventLoopGroup(totalThread, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioSelector_%d_%d", totalThread, this.index.incrementAndGet()));
            }
        });

        // 指定任务处理线程数:主要负责读取数据和处理响应,一般该值设置的比较大,与业务相对应
        final int jobThreads = 1024;
        final EventLoopGroup job = new DefaultEventLoopGroup(jobThreads, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioJob_%d_%d", jobThreads, this.index.incrementAndGet()));
            }
        });

        // 日志处理handler:类定义上面有Sharable表示线程安全,可以将对象定义在外面使用
        final LoggingHandler LOGGING_HANDLER = new LoggingHandler();

        // 指定服务端bootstrap
        ServerBootstrap server = new ServerBootstrap();
        server.group(boss, worker)
                // 指定通道类型
                .channel(NioServerSocketChannel.class)
                // 指定全连接队列大小:windows下默认是200,linux/mac下默认是128
                .option(ChannelOption.SO_BACKLOG, 2048)
                // 维持链接的活跃,清除死链接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 关闭延迟发送
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 添加handler处理链
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();

                        // 日志处理
                        pipeline.addLast(LOGGING_HANDLER);
                        // 心跳检测:读超时时间、写超时时间、全部超时时间(单位是秒,0表示不处理)
                        pipeline.addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS));
                        pipeline.addLast(new ChannelDuplexHandler() {
                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                IdleStateEvent event = (IdleStateEvent) evt;

                                System.out.println("心跳事件 : " + event.state());

                                super.userEventTriggered(ctx, evt);
                            }
                        });

                        // 处理http请求的编解码器
                        pipeline.addLast(job, "httpServerCodec", new HttpServerCodec());
                        pipeline.addLast(job, "chunkedWriteHandler", new ChunkedWriteHandler());
                        pipeline.addLast(job, "httpObjectAggregator", new HttpObjectAggregator(65536));
                        // 处理websocket的编解码器
                         pipeline.addLast(job, "webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/", "WebSocket", true, 655360));

                        // 自定义处理器
                        pipeline.addLast(job, "userInHandler", new UserWebsocketInHandler());
                    }
                });
        try {
            // 服务端绑定对外服务地址
            ChannelFuture future = server.bind(ip, port).sync();
            System.out.println("netty server start ok.");
            // 等待服务关闭,关闭后释放相关资源
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            job.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new Thread(new NettyWebsocketServer("127.0.0.1", 8899)).start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138

以上就实现了websocket服务端,客户端连接到服务端实现双向通信。
另外一种实现方式是自己定义一个handler用于ws协议数据的解析和处理,这样协议的整个处理过程对于用户来说很清楚明白,下面是实现的逻辑代码:

  1. 首先定义一个handler用于ws协议解析和处理:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 *
 *
 * @Author xingo
 * @Date 2023/11/21
 */
@Slf4j
public class WebsocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handshaker;

    public WebsocketServerHandler() {
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
        if (msg instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain());
            return;
        }
        if (msg instanceof PingWebSocketFrame) {
            log.info("websocket ping message");
            ctx.channel().write(new PingWebSocketFrame(msg.content().retain()));
        } else if (msg instanceof TextWebSocketFrame) {
            // websocket消息解压成字符串让下一个handler处理
            String text = ((TextWebSocketFrame) msg).text();
            log.info("请求数据|{}", text);
            // 如果不调用这个方法后面的handler就获取不到数据
            ctx.fireChannelRead(text);
        } else {
            log.error("不支持的消息格式");
            throw new UnsupportedOperationException("不支持的消息格式");
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest msg) {
        if (!msg.decoderResult().isSuccess()
                || (!"websocket".equalsIgnoreCase(msg.headers().get(HttpHeaderNames.UPGRADE)))) {
            sendHttpResponse(ctx, msg, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory(
                "ws://" + msg.headers().get(HttpHeaderNames.HOST), null, false);
        handshaker = wsShakerFactory.newHandshaker(msg);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            // 建立websocket连接握手
            handshaker.handshake(ctx.channel(), msg);
            ctx.channel().attr(AttributeKey.valueOf("add")).set(Boolean.TRUE);
        }
    }

    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest msg, DefaultFullHttpResponse response) {
        if (response.status().code() != HttpResponseStatus.OK.code()) {
            ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
        }
        ChannelFuture cf = ctx.channel().writeAndFlush(response);
        if (!HttpUtil.isKeepAlive(msg) || response.status().code() != HttpResponseStatus.OK.code()) {
            cf.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().attr(AttributeKey.valueOf("add")).set(Boolean.FALSE);
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().attr(AttributeKey.valueOf("add")).set(Boolean.FALSE);
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102

上面对ws协议进行了处理,处理后的数据直接解析成字符串给后续的handler。

  1. 定义两个handler用于数据处理和封装:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * 入站处理器:获取请求数据,完成业务处理,推送消息给浏览器
 * 
 * @Author xingo
 * @Date 2023/11/21
 */
public class UserWebsocketInHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(Thread.currentThread().getName() + "|" + msg);

//        ctx.writeAndFlush(new TextWebSocketFrame("server send message : " + msg));
        ctx.writeAndFlush("server send message : " + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * 出站处理器:判断数据是否需要进行封装
 * 
 * @Author xingo
 * @Date 2023/11/21
 */
public class UserWebsocketOutHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if(msg instanceof String) {
            ctx.write(new TextWebSocketFrame((String) msg), promise);
        } else {
            super.write(ctx, msg, promise);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  1. websocket服务端代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * websocket服务端
 *
 * @Author xingo
 * @Date 2023/11/21
 */
public class NettyWebsocketServer implements Runnable {

    /**
     * 服务端IP地址
     */
    private String ip;
    /**
     * 服务端端口号
     */
    private int port;

    public NettyWebsocketServer(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    @Override
    public void run() {
        // 指定boss线程数:主要负责接收连接请求,一般设置为1就可以
        final EventLoopGroup boss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioBoss_%d", this.index.incrementAndGet()));
            }
        });

        // 指定worker线程数:主要负责处理连接就绪的连接,一般设置为CPU的核心数
        final int totalThread = 12;
        final EventLoopGroup worker = new NioEventLoopGroup(totalThread, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioSelector_%d_%d", totalThread, this.index.incrementAndGet()));
            }
        });

        // 指定任务处理线程数:主要负责读取数据和处理响应,一般该值设置的比较大,与业务相对应
        final int jobThreads = 1024;
        final EventLoopGroup job = new DefaultEventLoopGroup(jobThreads, new ThreadFactory() {
            private AtomicInteger index = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NioJob_%d_%d", jobThreads, this.index.incrementAndGet()));
            }
        });

        // 日志处理handler:类定义上面有Sharable表示线程安全,可以将对象定义在外面使用
        final LoggingHandler LOGGING_HANDLER = new LoggingHandler();

        // 指定服务端bootstrap
        ServerBootstrap server = new ServerBootstrap();
        server.group(boss, worker)
                // 指定通道类型
                .channel(NioServerSocketChannel.class)
                // 指定全连接队列大小:windows下默认是200,linux/mac下默认是128
                .option(ChannelOption.SO_BACKLOG, 2048)
                // 维持链接的活跃,清除死链接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 关闭延迟发送
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 添加handler处理链
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();

                        // 日志处理
                        pipeline.addLast(LOGGING_HANDLER);
                        // 心跳检测:读超时时间、写超时时间、全部超时时间(单位是秒,0表示不处理)
                        pipeline.addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS));
                        pipeline.addLast(new ChannelDuplexHandler() {
                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                IdleStateEvent event = (IdleStateEvent) evt;

                                System.out.println("心跳事件 : " + event.state());

                                super.userEventTriggered(ctx, evt);
                            }
                        });

                        // 处理http请求的编解码器
                        pipeline.addLast(job, "httpServerCodec", new HttpServerCodec());
                        pipeline.addLast(job, "chunkedWriteHandler", new ChunkedWriteHandler());
                        pipeline.addLast(job, "httpObjectAggregator", new HttpObjectAggregator(65536));
                        // 处理websocket的编解码器
                        pipeline.addLast(job, "websocketHandler", new WebsocketServerHandler());

                        // 自定义处理器
                        pipeline.addLast(job, "userOutHandler", new UserWebsocketOutHandler());
                        pipeline.addLast(job, "userInHandler", new UserWebsocketInHandler());
                    }
                });
        try {
            // 服务端绑定对外服务地址
            ChannelFuture future = server.bind(ip, port).sync();
            System.out.println("netty server start ok.");
            // 等待服务关闭,关闭后释放相关资源
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            job.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new Thread(new NettyWebsocketServer("127.0.0.1", 8899)).start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138

上面这种方式同样实现了websocket通信,并且可以清楚的知道连接创建和数据交互的整个过程。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/973938
推荐阅读
相关标签
  

闽ICP备14008679号