当前位置:   article > 正文

springboot集成netty

springboot集成netty

1.pom引入依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.70.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.8</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2.application.yml配置

server:
  port: 8095
  servlet:
    context-path: /netty
netty:
  url: 0.0.0.0  #0.0.0.0表示绑定任意ip
  port: 20004
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3.启动类配置



import com.example.demo.server.NioNettyServer;
import io.netty.channel.ChannelFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.scheduling.annotation.EnableAsync;


@EnableAsync
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =  SpringApplication.run(DemoApplication.class, args);
        context.getBean(NioNettyServer.class).start();
    }

    @Override
    public void run(String... args) throws Exception {

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

4.创建netty服务

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.ipfilter.IpFilterRule;
import io.netty.handler.ipfilter.IpFilterRuleType;
import io.netty.handler.ipfilter.IpSubnetFilterRule;
import io.netty.handler.ipfilter.RuleBasedIpFilter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;


@Slf4j
@Component
public class NioNettyServer {
    @Value("${netty.port}")
    private int port;
    @Value("${netty.url}")
    private String url;

    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    private Channel channel;
//把Netty交给spring托管,不然读不到application.yml的值
    @Autowired
    private ClientMessageHandler clientMessageHandler;

    @Async
    public void start() {
        final ClientMessageHandler serverHandler = clientMessageHandler;
        ChannelFuture f = null;
        // 接收连接
        EventLoopGroup boss = new NioEventLoopGroup();
        // 处理信息
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 定义server
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 添加分组
            serverBootstrap.group(bossGroup, workerGroup)
                    // 添加通道设置非阻塞
                    .channel(NioServerSocketChannel.class)
                    // 服务端可连接队列数量
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .localAddress(new InetSocketAddress(url,port))
                    // 开启长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
//                            为监听客户端read/write事件的Channel添加用户自定义的ChannelHandler
                            socketChannel.pipeline().addLast(serverHandler);

                   //--------------------- 以下代码用了设置黑名单,白名单过滤ip,没有需求可以注释掉  (58.61.0.1,16)表示模糊匹配前两位,8表示匹配IP第一位,16表示匹配IP一,二位,以此类推-------------------------------------------------------
                   //  32表示全匹配
                            //白名单
                     //       IpSubnetFilterRule rule1 = new IpSubnetFilterRule("127.0.0.1", 32, IpFilterRuleType.ACCEPT);
                            //黑名单
                     //       IpSubnetFilterRule rule2 = new IpSubnetFilterRule("127.0.0.1", 32, IpFilterRuleType.REJECT);
                      //      IpFilterRule rejectAll = new IpFilterRule() {
                      //          @Override
                        //        public boolean matches(InetSocketAddress remoteAddress) {
                      //              return true;
                      //          }
                      //          @Override
                      //          public IpFilterRuleType ruleType() {
                     //               return IpFilterRuleType.REJECT;
                     //           }
                     //       };
                     //       RuleBasedIpFilter filter = new RuleBasedIpFilter(rule1, rejectAll );
                    //        socketChannel.pipeline().addLast("ipFilter", filter);
                     //       socketChannel.pipeline().addLast("encoder", new StringEncoder());
                     //       socketChannel.pipeline().addLast("decoder", new StringDecoder());
                     //----------------------------------------------------------以上为ip过滤-----------------------------------------------------------

                       }

                    });
            // 绑定端口
            f = serverBootstrap.bind().sync();
            channel = f.channel();
            log.info("-------------netty启动成功--------端口{}", port);
        } catch (Exception e) {
            log.error("connection error",e.getMessage(), e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98

4.创建netty客户端信息接收器



import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import io.netty.channel.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
@Slf4j
@ChannelHandler.Sharable
public class ClientMessageHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //把数据解析转成字符串
        ByteBuf in = (ByteBuf) msg;
        String str= in.toString(CharsetUtil.UTF_8);
        log.info("接收到数据: "+str);
//         返回请求结果
        Channel channel = ctx.channel();
        channel.writeAndFlush(JSONUtil.toJsonStr(str));
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
    
//只要Netty抛出错误就会执行,Netty断会开连接会抛出连接超时的错误
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("关闭通道");
        cause.printStackTrace();
        ctx.close();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

5.启动项目用main方法测试

 public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 20004);
    socket.getOutputStream().write("[netty,2022-10-21 14:12:09]".getBytes("UTF-8"));
    }
  • 1
  • 2
  • 3
  • 4
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/206521
推荐阅读
相关标签
  

闽ICP备14008679号