当前位置:   article > 正文

亲测!超详细的使用netty进行tcp数据通信的流程,附带完整的源代码_netty tcp

netty tcp

加粗样式Netty 是一个强大的网络框架,能够高效处理 TCP 数据通讯,适用于各种高并发、高性能的网络应用。

使用 TCP 通信的优点

可靠性:TCP 是面向连接的协议,保证数据包的顺序到达和数据的完整性。
流量控制:TCP 提供了流量控制和拥塞控制机制,能够适应不同的网络条件。
广泛应用:适用于各种需要高可靠性的数据传输场景,如文件传输、电子邮件、Web 服务等。
双向通信:TCP 支持全双工通信,允许客户端和服务器同时发送和接收数据。

1. 确认通讯数据帧格式(十六进制)

格式帧头帧长校验码循环码通讯渠道设备id码数据方向设备类型时间命令数据帧尾
字节数2byte2byte2byte4byte1byte12byte1byte1byte12byte2byte+nbyte2byte
注释通知对方本帧数据包开始传输表示本帧数据包(不含帧头)长度为0x0042CRC16的校验码,0x54为高字节,0x36为低字节表示是开机以来第0x00000005次帧包发送表示是通过以太网渠道设备ID码(3个WORD)是0x01020304,0x05060708,0x090a0b0c数据方向是设备上传到平台表示是行人过街设备表示2024年1月8日,9时12分36秒见指令概述部分通知对方本帧数据包结束传输

之前讨论过似乎这种数据帧的格式不合理,指出帧长应该放在帧尾处…具体情况具体分析,该通讯数据帧应该软硬件协商一致后进行代码开发。

2. 建立服务端对象

2.1 导入netty依赖

<!-- netty -->
<dependencies>
	<dependency>
   		<groupId>io.netty</groupId>
    	<artifactId>netty-all</artifactId>
    	<version>4.1.25.Final</version>
	</dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.2 配置文件指定测试端口和正式端口

netty:
  port: xxxxx
  • 1
  • 2

2.3 创建netty服务端

在 Spring Boot 应用启动的同时,异步启动一个 Netty 服务器

@SpringBootApplication
public class SentryServiceApplication implements CommandLineRunner {
	@Value("${netty.port}")
    private int nettyPort;
    
    @Override
    public void run(String... args) throws Exception {
        new Thread(() -> {
            try{
                new NettyServer(nettyPort).run();
            } catch (Exception e){
                e.printStackTrace();
            }
        }).start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

NettyServer 类的主要职责是启动和配置 Netty 服务器,包括创建线程组、配置服务器引导、绑定端口等。该类封装了 Netty 服务器的启动逻辑和生命周期管理。

/**
 * netty服务器,主要用于与客户端通讯
 */
public class NettyServer {
    private final int port; //监听端口

    //连接map
    public  static Map<String, ChannelHandlerContext> map = new HashMap<String, ChannelHandlerContext>();


    public NettyServer(int port) {
        this.port = port;
    }

    //编写run方法,处理客户端的请求
    public void run() throws Exception {

        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop

        try {
            ServerBootstrap b = new ServerBootstrap();
            /*将线程组传入*/
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
                   所以下面这段代码的作用就是为这个子channel增加handle*/
                    .childHandler(new NettyServerInitializer());
            System.out.println("netty服务器启动成功(port:" + port + ")......");
            /*异步绑定到服务器,sync()会阻塞直到完成*/
            ChannelFuture channelFuture = b.bind(port).sync();
            //监听关闭 /*阻塞直到服务器的channel关闭*/
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            /**关闭线程组*/
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

3.管理Netty会话

NettySessionNettySessionManager类用于管理客户端会话

NettySession

public class NettySession {
    /**
     *channel   id
     */
    private String channelId;

    /**
     * 业务关联的key
     */
    private String bizKey;

    /**
     * 真正的连接对象
     */
    private Channel channel = null;

    /**
     * 连接上下文
     */
    private ChannelHandlerContext ctx = null;

    private LocalDateTime createTime;

    private LocalDateTime lastTime;

    /**
     * 消息流水号 word(2字节) 按发送顺序从 0 开始循环累加
     */
    private int currentFlowId = 0;


    public static NettySession build(String bizKey, ChannelHandlerContext ctx) {
        NettySession session = new NettySession();
        Channel channel = ctx.channel();
        session.bizKey = bizKey;
        session.channelId = channel.id().asLongText();
        session.ctx = ctx;
        session.channel = channel;
        session.createTime = LocalDateTime.now();
        session.lastTime = session.createTime;
        return session;
    }
    
    public boolean isActive() {
        if (ctx == null) {
            return false;
        }

        if (channel == null) {
            return false;
        }

        return channel.isActive();
    }

    public boolean isWritable() {
        if (ctx == null) {
            return false;
        }

        if (channel == null) {
            return false;
        }

        return channel.isWritable();
    }
    public boolean writeAndFlush(byte[] bytes) {
        if (bytes == null) {
            return false;
        }

        if (bytes.length == 0) {
            return false;
        }

        if (!isActive()) {
            return false;
        }

        if (!isWritable()) {
            return false;
        }

        ByteBuf retBuf = channel.alloc().buffer(bytes.length);
        retBuf.writeBytes(bytes);
        channel.writeAndFlush(retBuf);

        return true;
    }

    public boolean writeAndFlush(String msg){
        if(StrUtil.isBlank(msg)) {
            return false;
        }

        if (msg.length() == 0) {
            return false;
        }

        if (!isActive()) {
            return false;
        }

        if (!isWritable()) {
            return false;
        }
        channel.writeAndFlush(msg);
        return true;
    }

    /**
     * 关闭连接
     */
    public void close() {
        try {
            ctx.close();
        } catch (Exception ex) {
            log.error(ex.getLocalizedMessage(), ex);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
NettySessionManager
@Slf4j
public class NettySessionManager {

    private static Map<String, NettySession> sessionMap = new ConcurrentHashMap<>();

    private static Map<String, Set<String>> bizMap = new ConcurrentHashMap<>();

    public static void addSession(NettySession session) {
        if (session == null) {
            return;
        }

        String bizKey = session.getBizKey();
        if (StrUtil.isBlank(bizKey)) {
            return;
        }

        String sessionId = session.getChannelId();
        if (StrUtil.isBlank(sessionId)) {
            return;
        }

        sessionMap.put(sessionId, session);
        log.info("-------------新增客户端连接【addSession】,sessionId:{},session:{}----------------", sessionId, session);

        bizMap.compute(bizKey, (key, sessionIdSet) -> {
            if (sessionIdSet == null) {
                sessionIdSet = ConcurrentHashMap.newKeySet();
            }
            sessionIdSet.add(sessionId);
            log.info("-------------新增客户端连接【addSession】,bizMap:{},bizKey:{},sessionIdSet:{}----------------", bizMap, key, sessionIdSet);
            return sessionIdSet;
        });
    }

    public static void removeSession(NettySession session) {
        if (session == null) {
            return;
        }

        removeSession(session.getChannel());
    }

    public static void removeSession(ChannelHandlerContext ctx) {
        if (ctx == null) {
            return;
        }
        log.info("Removing session for ChannelHandlerContext: {}", ctx);
        removeSession(ctx.channel());
    }

    public static void removeSession(Channel channel) {
        if (channel == null) {
            return;
        }

        String channelId = channel.id()
                .asLongText();
        log.info("Removing session for Channel: {}", channelId);
        removeSession(channelId);
    }

    public static void removeSession(String channelId) {
        if (StrUtil.isEmpty(channelId)) {
            return;
        }
        String trimmedChannelId = channelId.trim();
        log.info("Removing session for Channel ID: {}", trimmedChannelId);

        NettySession session = sessionMap.remove(trimmedChannelId);
        if (session == null) {
            log.info("No session found for Channel ID: {}", trimmedChannelId);
            return;
        }

        String bizKey = session.getBizKey();
        if (StrUtil.isBlank(bizKey)) {
            log.info("No bizKey found for Channel ID: {}", trimmedChannelId);
            return;
        }
        log.info("Removing bizKey: {} for Channel ID: {}", bizKey, trimmedChannelId);
        bizMap.computeIfPresent(bizKey, (key, sessionIdSet) -> {
            sessionIdSet.remove(trimmedChannelId);
            log.info("Removed session ID from bizKey: {}. Remaining session IDs: {}", bizKey, sessionIdSet);
            return sessionIdSet.isEmpty() ? null : sessionIdSet;
        });

        // Ensure channel is fully closed
        Channel channel = session.getChannel();
        if (channel != null && channel.isActive()) {
            log.info("Closing channel associated with session: {}", trimmedChannelId);
            channel.close();
        }
    }

    public static boolean isExistChannelId(ChannelHandlerContext ctx ){
        if (ctx == null) {
            return false;
        }
        return isExistChannelId(ctx.channel());
    }
    public static boolean isExistChannelId(Channel channel){
        if (channel == null) {
            return false;
        }
        String channelId=channel.id().asLongText();
        return isExistChannelId(channelId);
    }
    public static boolean isExistChannelId(String channelId){
        return sessionMap.containsKey(channelId);
    }

    public static Optional<NettySession> getSession(ChannelHandlerContext ctx) {
        if (ctx == null) {
            return Optional.empty();
        }

        return getSession(ctx.channel());
    }

    public static Optional<NettySession> getSession(Channel channel) {
        if (channel == null) {
            return Optional.empty();
        }

        String channelId = channel.id()
                .asLongText();

        return getSession(channelId);
    }

    public static Optional<NettySession> getSession(String channelId) {
        if (StrUtil.isEmpty(channelId)) {
            return Optional.empty();
        }

        if (!sessionMap.containsKey(channelId)) {
            return Optional.empty();
        }

        NettySession nettySession = sessionMap.get(channelId);
        if (nettySession == null) {
            return Optional.empty();
        }

        return Optional.of(nettySession);
    }

    /**
     * 根据BizKey获取Session列表,可能为null
     * @param bizKey
     * @return
     */
    public static List<NettySession> getSessionByBizKey(String bizKey) {
        if (StrUtil.isBlank(bizKey)) {
            return Collections.emptyList();
        }

        if (!bizMap.containsKey(bizKey)) {
            return Collections.emptyList();
        }

        Set<String> idList = bizMap.get(bizKey);
        if (idList == null || idList.isEmpty()) {
            bizMap.remove(bizKey);
            return Collections.emptyList();
        }

        List<NettySession> list = new ArrayList<>();
        for (String id : idList) {
            Optional<NettySession> op = getSession(id);
            op.ifPresent(list::add);
        }
        return list;
    }

    public static boolean isHaveSessionId(String bizKey){
        Set<String> idList = bizMap.get(bizKey);
        if (idList == null || idList.isEmpty()) {
            return true;
        }
        return false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184

管理客户端标识ID与Netty Channel对象之间的映射关系

public class ChannelMap {
    /**
     * 存放客户端标识ID(消息ID)与channel的对应关系
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    private ChannelMap() {
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (ChannelMap.class) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String id) {
        return getChannelMap().get(id);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

4. 配置消息编解码

4.1 自定义初始化,用于自定义的编解码

@Component
@ChannelHandler.Sharable
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new LineBasedFrameDecoder(1024,false,true));
        // 限制消息读写时间,如果超过指定时间未进行数据通讯,则认为客户端闲置离线
        pipeline.addLast(new IdleStateHandler(0,0,180, TimeUnit.SECONDS));
        // 字符串解码 和 编码
        pipeline.addLast(new MyDecoder());
        pipeline.addLast(new MyEncoder());
        // 自己的逻辑Handler
        pipeline.addLast("handler", new NettyServerHandler());
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

4.2 解码

@Slf4j
public class MyDecoder extends ByteToMessageDecoder {

    private static StringBuffer MsgBuffer = new StringBuffer();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        //创建字节数组,buffer.readableBytes可读字节长度
        byte[] b = new byte[buffer.readableBytes()];
        //复制内容到字节数组b
        buffer.readBytes(b);
        String msg = toHexString1(b);
        msgHandle(msg,out);
    }

    public void msgHandle(String msg,List<Object> out){
        log.info("-----------------【MyDecoder.msgHandle】待处理的数据帧:{}", msg);
        // 判断是否需要拼接
        if (msg.indexOf("0a") > 0 && !(msg.indexOf("fefd") >= 0 && (msg.length() - msg.lastIndexOf("0d0a") == 4))) {
            MsgBuffer.append(msg);
            String msgBuf = MsgBuffer.toString();
            if (msgBuf.indexOf("fefd") >= 0 && (msgBuf.length() - msgBuf.lastIndexOf("0d0a") == 4)) {
                // 校验和处理拼接后的完整消息
                if (processMessage(msgBuf, out)) {
                    MsgBuffer.delete(0, MsgBuffer.length());
                } else {
                    MsgBuffer.delete(0, MsgBuffer.length());
                }
            }
            return
        }else {
            processMessage(msg, out);
        }
    }

    // 处理完整的消息,返回是否校验成功
    private boolean processMessage(String msg, List<Object> out) {
        // 检查帧头和帧尾
        if (!msg.startsWith("fefd") || !msg.endsWith("0d0a")) {
            return false;
        }

        // 提取校验码
        String checkCode = msg.substring(8, 12);

        // 计算 CRC 校验码
        byte[] crcbyte = MyEncoder.hexString2Bytes(msg.substring(12));
        int crc = CRCUtil.calcCrc16(crcbyte);
        crc = CRCUtil.revert(crc);
        String crcCode = String.format("%04x", crc);

        // 比较校验码
        if (!checkCode.equals(crcCode)) {
            log.error("--------------【processMessage】CRC校验失败!服务端的校验码{},设备端的校验码{}", crcCode, checkCode);
            return false;
        }

        // 校验通过,添加消息到输出列表
        out.add(msg);
        return true;
    }

    public String bytesToHexString(byte[] bArray) {
        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i])+" ";
            if (sTemp.length() < 3){
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }

    public static String toHexString1(byte[] b) {
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < b.length; ++i) {
            buffer.append(toHexString1(b[i]));
        }
        return buffer.toString();
    }

    public static String toHexString1(byte b) {
        String s = Integer.toHexString(b & 0xFF);
        if (s.length() == 1) {
            return "0" + s;
        } else {
            return s;
        }
    }

    /**
     * 十六进制字符串转字符串
     *
     * @param hexStr 原16进制字符串
     * @return 字符串
     * */
    public static String decodeHex(String hexStr) {
        // 定义字符数组,用于保存字符串字符,长度为16进制字符串的一半
        byte[] strs = new byte[hexStr.length() / 2];
        // 遍历赋值
        for (int i = 0; i < strs.length; i++) {
            // 截取高位,使用Integer.parseInt进行转换
            int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 16);
            // 截取低位,使用Integer.parseInt进行转换
            int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 16);
            // 拼接赋值
            strs[i] = (byte)(high * 16 + low);
        }
        // 将字符数组转换为字符串,返回结果
        return new String(strs);
    }

    /**
     * 字符串转换成为16进制(无需Unicode编码)
     * @param str
     * @return
     */
    public static String str2HexStr(String str) {
        char[] chars = "0123456789ABCDEF".toCharArray();
        StringBuilder sb = new StringBuilder("");
        byte[] bs = str.getBytes();
        int bit;
        for (int i = 0; i < bs.length; i++) {
            bit = (bs[i] & 0x0f0) >> 4;
            sb.append(chars[bit]);
            bit = bs[i] & 0x0f;
            sb.append(chars[bit]);
            // sb.append(' ');
        }
        return sb.toString().trim();
    }

  
    /**
     * 16进制高低位转换
     * @param hex
     * @return java.lang.String
     * @author wangguangle
     * @date: 2021/4/8 19:33
     */
    public static String reverseHex(String hex) {
        char[] charArray = hex.toCharArray();
        int length = charArray.length;
        int times = length / 2;
        for (int c1i = 0; c1i < times; c1i += 2) {
            int c2i = c1i + 1;
            char c1 = charArray[c1i];
            char c2 = charArray[c2i];
            int c3i = length - c1i - 2;
            int c4i = length - c1i - 1;
            charArray[c1i] = charArray[c3i];
            charArray[c2i] = charArray[c4i];
            charArray[c3i] = c1;
            charArray[c4i] = c2;
        }
        return new String(charArray);
    }

    /**
     * 十六进制转ASCII码
     * @other > Integer.toHexString(int) -> 10 to 16
     * @param hex
     * @return
     */
    public static String convertHexToString(String hex) {

        StringBuilder sb = new StringBuilder();
        StringBuilder temp = new StringBuilder();

        for (int i = 0; i < hex.length() - 1; i += 2) {

            // grab the hex in pairs
            String output = hex.substring(i, (i + 2));
            // convert hex to decimal
            int decimal = Integer.parseInt(output, 16);
            // convert the decimal to character
            sb.append((char) decimal);

            temp.append(decimal);
        }
        return sb.toString();
    }

    /**
     * 十六进制转ASCII码对应值
     * @other
     * @param hexString
     * @return
     */
    public static String hexStringToAscii(String hexString) {

        int len = hexString.length();
        byte[] bytes = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
                    + Character.digit(hexString.charAt(i+1), 16));
        }
        String asciiString = new String(bytes);
        return asciiString;
    }

    /**
     * 十六进制转10进制 按位计算,位值乘权重
     * @Author @zzh
     * @Description // 十六进制转10进制
     * @Date 14:59 2023/5/4
     * @param hex
     * @return int
     **/
    public static int hexToDecimal(String hex) {
        int decimal = 0;
        String digits = "0123456789abcdef";
        for (int i = 0; i < hex.length(); i++) {
            char c = hex.charAt(i);
            int d = digits.indexOf(c);
            decimal = decimal * 16 + d;
        }
        return decimal;
    }

    /**
     * 汉字转GB2312
     *
     * @param chinese 汉字
     * @return
     * @throws UnsupportedEncodingException
     */
    public static String StringToGb(String chinese) throws UnsupportedEncodingException {
        // 先把字符串按gb2312转成byte数组
        byte[] bytes = chinese.getBytes("GB2312");
        StringBuilder gbString = new StringBuilder();
        // 遍历数组
        for (byte b : bytes){
            // 再用Integer中的方法,把每个byte转换成16进制输出
            String temp = Integer.toHexString(b);
            // 截取
            if (temp.length() > 2){
                temp = temp.substring(6, 8);
            }else if (temp.length() == 2){
                // 为数字,数字的区为A3
                gbString.append("A3");
                temp = "B" + temp.substring(1);
            }
            gbString.append(temp);
        }
        return gbString.toString();
    }

    /**
     * GB2312转汉字
     *
     * @param string gb3212码
     * @return
     * @throws Exception
     */
    public static String GbToString(String string) throws Exception{
        byte[] bytes = new byte[string.length() / 2];
        for(int i = 0; i < bytes.length; i ++){
            byte high = Byte.parseByte(string.substring(i * 2, i * 2 + 1), 16);
            byte low = Byte.parseByte(string.substring(i * 2 + 1, i * 2 + 2), 16);
            bytes[i] = (byte) (high << 4 | low);
        }
        String result = new String(bytes, "GB2312");
        return result;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269

4.3 编码

public class MyEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
        //将16进制字符串转为数组
        byteBuf.writeBytes(hexString2Bytes(s));
    }
    /**
     * @Title:hexString2Bytes
     * @Description:16进制字符串转字节数组
     * @param src 16进制字符串
     * @return 字节数组
     */
    public static byte[] hexString2Bytes(String src) {
//        System.out.println("编码:" + src);
        int l = src.length() / 2;
        byte[] ret = new byte[l];
        for (int i = 0; i < l; i++) {
            ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
        }
        return ret;
    }

    /**
     * 计算出返回码的数据
     * @param str
     */
    public static String getCountNum(String str){
        StringBuilder sb = new StringBuilder();
        byte[] bytes =  hexString2Bytes(str);
        for (int i=bytes.length-3;i>13;i--){
            bytes[i] = (byte) (bytes[i] - 51);
            sb.append(Integer.toHexString(0xFF & bytes[i]));
        }
        return sb.toString();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

5. 客户端消息处理

@Slf4j
@Component
/*不加这个注解那么在增加到childHandler时就必须new出来*/
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
    private static NettyServerHandler nettyServerHandler;

    private static SimpleDateFormat sdf = null;
    static {
        sdf = new SimpleDateFormat("yyMMddHHmmss");
    }

    @Autowired
    private  MsgSender msgSender;

    private byte[] req;

    private int counter;
    
    @PostConstruct
    public void init()
    {
        nettyServerHandler = this;
    }

    @Autowired
    private IDeviceTerminalService deviceTerminalService;

    @Autowired
    private RedisUtil redisUtil;

    //保留所有与服务器建立连接的channel对象
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 读取客户端消息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//        Channel channel = ctx.channel();
        String message=msg.toString();
        log.info("----------设备交互数据[channelRead0]---------" + message);
        if(message.length()<80){
            return;
        }
        DataPacket dataPacket = new DataPacket(message);
        String bizKey=dataPacket.getDeviceId();

        // 判断当前连接是否为新连接
        Tuple2<Boolean, NettySession> newSession = isNewSession(bizKey,ctx);
        Boolean isNewSession = newSession.getT1();
        NettySession nettySession = newSession.getT2();

        if(isNewSession){
            NettySessionManager.addSession(nettySession);
            deviceOnline(dataPacket);
        }
        handleMessage(dataPacket);
    }

    /**
     * 判断当前连接是否是新链接
     * @param bizKey 设备id
     * @param ctx  ChannelHandlerContext
     * @return
     */
    private Tuple2<Boolean, NettySession> isNewSession(String bizKey,ChannelHandlerContext ctx) {

        boolean isNewSession = true;
        Optional<NettySession> optionalSession = NettySessionManager.getSession(ctx);
        NettySession currentSession = null;

        if (optionalSession.isPresent()) {
            // 存在session,代表非第一次连接
            isNewSession = false;
            currentSession = optionalSession.get();
            currentSession.setLastTime(LocalDateTime.now());
            log.info("----------旧TCP连接[有Session]----------, bizKey=[{}], channelId=[{}]", bizKey, currentSession.getChannelId());
        } else {
            currentSession = NettySession.build(bizKey, ctx);
            log.info("----------新TCP连接[无Session]----------, bizKey=[{}], channelId=[{}]", bizKey, ctx.channel().id().asLongText());
        }
        return Tuples.of(isNewSession, currentSession);
    }

    private void handleMessage(DataPacket dataPacket) throws Exception {
        if (dataPacket == null) {
            return;
        }
        String commandCode = dataPacket.getCommand(); // 命令
        String strMessage = JSON.toJSONString(dataPacket);
        MsgSender msgSender = SpringUtil.getBean(MsgSender.class);
        switch (commandCode) {
            case ComConstant.CommandUp.SENTRY_HEARTBEAT:
                // 发送设备心跳消息
//                System.out.println("数据上报,接收心跳数据:" + strMessage);
                msgSender.asyncSendDeviceUpHeartBeat(strMessage);
                break;
            case ComConstant.CommandUp.SENTRY_RADAR_STATUS:
                // 雷达数据上报参数
//                System.out.println("数据上报,接收雷达数据:" + strMessage);
                msgSender.syncSendDeviceUpRadar(strMessage);
                break;
            default:
                //除了心跳,雷达以外的,所有数据上报
                msgSender.asyncSendDeviceUpCommand(strMessage);
                break;
        }
    }

    /*
     * 数据读取完毕
     *
     * 覆盖 channelActive 方法 在channel被启用的时候触发 (在建立连接的时候)
     *
     * */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("连接的客户端地址 : " + ctx.channel().remoteAddress() + " active !");
        // 获取业务键(假设从Channel获取业务键的方法)
        String bizKey = ctx.channel().id().asLongText();
        if (!NettySessionManager.isHaveSessionId(bizKey)) {
            List<NettySession> existingSessions = NettySessionManager.getSessionByBizKey(bizKey);
            if (!existingSessions.isEmpty()) {
                log.warn("Existing sessions found for bizKey: {}, removing old sessions", bizKey);
                for (NettySession session : existingSessions) {
                    NettySessionManager.removeSession(session.getChannel().id().asLongText());
                }
            }
        }

        super.channelActive(ctx);
    }

    // 设备上线
    private void deviceOnline(DataPacket dataPacket){
        log.info("----------设备:{},上线!---------", dataPacket.getDeviceId());
        if(dataPacket==null){
            return;
        }
        DeviceTerminal deviceTerminal = new DeviceTerminal();
        String bizKey=dataPacket.getDeviceId();
        deviceTerminal.setDeviceId(bizKey);
        deviceTerminal.setCode(bizKey);
        deviceTerminal.setProductionNumber(bizKey);
        deviceTerminal.setDeviceType(String.valueOf(dataPacket.getType()));
        deviceTerminal.setDeviceTime(dataPacket.getDatetime());
        deviceTerminal.setOnlineTime(DateUtil.date());
        deviceTerminal.setStatus(String.valueOf(DeviceStateEnum.on.getValue()));
        IDeviceTerminalService deviceTerminalService=SpringUtil.getBean(DeviceTerminalServiceImpl.class);
        deviceTerminalService.updateDeviceState(deviceTerminal);

    }

    //设备下线
    private void deviceDownline(Optional<NettySession> nettySessionOptional){
        nettySessionOptional.ifPresent(session -> {
            DeviceTerminal deviceTerminal = new DeviceTerminal();
            String bizKey=session.getBizKey();
            deviceTerminal.setDeviceId(bizKey);
            deviceTerminal.setOnlineTime(DateUtil.date());
            deviceTerminal.setStatus(String.valueOf(DeviceStateEnum.down.getValue()));
            IDeviceTerminalService deviceTerminalService=SpringUtil.getBean(DeviceTerminalServiceImpl.class);
            log.info("---------更新设备状态为:{},【deviceDownline】" , DeviceStateEnum.down.getValue());
            deviceTerminalService.updateDeviceState(deviceTerminal);
        });
    }



    //表示服务端与客户端连接建立
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();  //其实相当于一个connection

        /**
         * 调用channelGroup的writeAndFlush其实就相当于channelGroup中的每个channel都writeAndFlush
         *
         * 先去广播,再将自己加入到channelGroup中
         */
        channelGroup.writeAndFlush(" 【服务器】 -" +channel.remoteAddress() +" 加入\n");
        channelGroup.add(channel);
    }


    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIpPort = remoteAddress.getAddress().getHostAddress() + ":" + remoteAddress.getPort();
        log.info("Client IP:Port: {}", clientIpPort);
        Optional<NettySession> sessionOptional=NettySessionManager.getSession(ctx.channel());
        deviceDownline(sessionOptional);
        log.info("----------设备下线[channelInactive]----------");
        NettySessionManager.removeSession(ctx);
        super.channelInactive(ctx);
    }

    /** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIpPort = remoteAddress.getAddress().getHostAddress() + ":" + remoteAddress.getPort();
        log.info("Client IP:Port: {}", clientIpPort);
        Optional<NettySession> sessionOptional = NettySessionManager.getSession(ctx.channel());
        if (ctx.channel().isActive()) {
            if (cause instanceof SocketTimeoutException) {
                deviceDownline(sessionOptional);
                log.error("----------设备连接超时异常下线[exceptionCaught]----------");
            } else if (cause instanceof IOException) {
                deviceDownline(sessionOptional);
                log.error("----------设备网络异常下线[exceptionCaught]----------");
                log.error("Network exception occurred: {}", cause.getMessage(), cause);
            } else {
                deviceDownline(sessionOptional);
                log.error("----------设备未知异常下线[exceptionCaught]----------");
                log.error("Unexpected exception occurred: {}", cause.getMessage(), cause);
            }

            NettySessionManager.removeSession(ctx);
            if (NettySessionManager.getSession(ctx.channel()).isPresent()) {
                log.error("Session not fully removed, additional cleanup needed.");
                NettySessionManager.removeSession(ctx.channel());
            }
            log.error("----------设备异常下线,移除会话[exceptionCaught],{}", ctx.channel());
            ctx.close();
            log.info("----------设备异常下线,关闭通道[exceptionCaught]----------");
        } else {
            log.error("----------Channel已经不活跃,无需处理下线逻辑[exceptionCaught]----------");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIpPort = remoteAddress.getAddress().getHostAddress() + ":" + remoteAddress.getPort();
        log.info("Client IP:Port: {}", clientIpPort);
        if (ctx.channel().isActive()) {
            Optional<NettySession> sessionOptional = NettySessionManager.getSession(ctx.channel());
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                switch (event.state()) {
                    case READER_IDLE:
                        log.info("----------客户端读取闲置超时异常下线[READER_IDLE]----------");
                        break;
                    case WRITER_IDLE:
                        log.info("----------客户端写入闲置超时异常下线[WRITER_IDLE]----------");
                        break;
                    case ALL_IDLE:
                        log.info("----------客户端全部闲置超时异常下线[ALL_IDLE]----------");
                        break;
                    default:
                        log.info("---------客户端闲置超时异常下线,IdleState.{} ----------", event.state());
                        break;
                }
                deviceDownline(sessionOptional);
                NettySessionManager.removeSession(ctx);
                ctx.channel().close();
            } else {
                super.userEventTriggered(ctx, evt);
                log.info("----------客户端闲置超时异常下线,evt不是IdleStateEvent类型的事件,未清除通道信息[ALL_IDLE]----------{}", evt.getClass().getName());
                deviceDownline(sessionOptional);
            }
        } else {
            log.info("----------Channel已经不活跃,无需处理下线逻辑[userEventTriggered]----------");
        }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265

有问题可以留言/私信我,帮你解答,有帮助请一键三连,点个再看,传下去!如果需要可以给你源文档+通讯手册!

欢迎关注微信公众号:小红的成长日记,一起学Java!

在这里插入图片描述
完整的源代码放在网盘中了,已经实测,大家放心获取;
解压密码,在公众号后台回复:tcp,即可获得。

链接:
https://pan.baidu.com/s/1mk5A-gSotAbkQC1ZCc74sg?pwd=8ggb
提取码:8ggb

**加粗样式**

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/973941
推荐阅读
相关标签
  

闽ICP备14008679号