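Netty is a powerful networking framework that handles TCP communication efficiently and suits high-concurrency, high-performance network applications.
Reliability: TCP is a connection-oriented protocol that guarantees in-order delivery and data integrity.
Flow control: TCP provides flow-control and congestion-control mechanisms, so it adapts to varying network conditions.
Wide adoption: TCP suits any scenario that needs highly reliable data transfer, such as file transfer, email, and web services.
Bidirectional communication: TCP is full-duplex, so the client and the server can send and receive data at the same time.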
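Field | Frame header | Frame length | Checksum | Sequence counter | Channel | Device ID | Data direction | Device type | Time | Command data | Frame tail |
---|---|---|---|---|---|---|---|---|---|---|---|
Size | 2 bytes | 2 bytes | 2 bytes | 4 bytes | 1 byte | 12 bytes | 1 byte | 1 byte | 12 bytes | 2 bytes + n bytes | 2 bytes |
Notes (example values) | Tells the peer that this frame is starting | Length of this frame, excluding the frame header, e.g. 0x0042 | CRC16 checksum, e.g. 0x54 is the high byte and 0x36 the low byte | e.g. the 0x00000005th frame sent since power-on | e.g. sent over the Ethernet channel | Device ID (3 WORDs), e.g. 0x01020304, 0x05060708, 0x090a0b0c | e.g. device uploading to the platform | e.g. a pedestrian-crossing device | e.g. 2024-01-08 09:12:36 | See the command overview section | Tells the peer that this frame has ended |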
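An earlier discussion suggested that this frame format is questionable and that the frame length should be placed at the frame tail… This has to be judged case by case: the frame format should be agreed between the hardware and software sides before any code is written.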
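To make the table concrete, here is a minimal sketch of how the fixed fields could be read from one complete frame with the JDK's `ByteBuffer`. It is not the author's `DataPacket` class (which the post does not show). Assumptions: multi-byte fields are big-endian, the time field is 12 ASCII digits, and the header/tail values `0xFEFD` and `0x0D0A` are taken from the checks in the decoder later in this post. The names `ParsedFrame` and `FrameParser` are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Hypothetical holder for the fields listed in the table. */
final class ParsedFrame {
    int frameLength;      // 2 bytes
    int crc;              // 2 bytes
    long sequence;        // 4 bytes, unsigned
    int channel;          // 1 byte
    byte[] deviceId = new byte[12];
    int direction;        // 1 byte
    int deviceType;       // 1 byte
    String time;          // 12 bytes, assumed ASCII digits
    int command;          // 2 bytes
    byte[] commandData;   // n bytes
}

final class FrameParser {
    static final int HEADER = 0xFEFD;   // assumed from the decoder's "fefd" check
    static final int TAIL = 0x0D0A;     // assumed from the decoder's "0d0a" check
    static final int MIN_LENGTH = 41;   // all fixed fields with n = 0 command-data bytes

    static ParsedFrame parse(byte[] raw) {
        if (raw.length < MIN_LENGTH) {
            throw new IllegalArgumentException("frame too short: " + raw.length);
        }
        // Assumption: big-endian byte order on the wire.
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.BIG_ENDIAN);
        if ((buf.getShort() & 0xFFFF) != HEADER) {
            throw new IllegalArgumentException("bad frame header");
        }
        ParsedFrame f = new ParsedFrame();
        f.frameLength = buf.getShort() & 0xFFFF;
        f.crc = buf.getShort() & 0xFFFF;
        f.sequence = buf.getInt() & 0xFFFFFFFFL;
        f.channel = buf.get() & 0xFF;
        buf.get(f.deviceId);
        f.direction = buf.get() & 0xFF;
        f.deviceType = buf.get() & 0xFF;
        byte[] time = new byte[12];
        buf.get(time);
        f.time = new String(time, StandardCharsets.US_ASCII);
        f.command = buf.getShort() & 0xFFFF;
        // Everything that is left, minus the 2-byte tail, is command data.
        f.commandData = new byte[buf.remaining() - 2];
        buf.get(f.commandData);
        if ((buf.getShort() & 0xFFFF) != TAIL) {
            throw new IllegalArgumentException("bad frame tail");
        }
        return f;
    }
}
```

The sketch only splits the frame into fields; CRC validation and command handling stay separate so each step can be tested on its own.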
<!-- netty -->
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
</dependencies>
netty:
  port: xxxxx
Start a Netty server asynchronously while the Spring Boot application starts:
@SpringBootApplication
public class SentryServiceApplication implements CommandLineRunner {

    @Value("${netty.port}")
    private int nettyPort;

    @Override
    public void run(String... args) throws Exception {
        // NettyServer.run() blocks, so it runs on its own thread
        // to avoid stalling Spring Boot startup.
        new Thread(() -> {
            try {
                new NettyServer(nettyPort).run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}
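The NettyServer class starts and configures the Netty server: it creates the thread groups, configures the server bootstrap, and binds the port. It encapsulates the server's startup logic and lifecycle management.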
/**
 * Netty server, mainly used to communicate with clients.
 */
public class NettyServer {

    private final int port; // listening port

    // Connection map. ConcurrentHashMap replaces the original HashMap because
    // several event-loop threads may access it at the same time.
    public static Map<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();

    public NettyServer(int port) {
        this.port = port;
    }

    // Starts the server and blocks until the server channel is closed.
    public void run() throws Exception {
        // Two thread groups: the boss group accepts connections, the worker group handles I/O.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // default size: 2 x available processors
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // use the NIO transport
                    .option(ChannelOption.SO_BACKLOG, 128) // queue size for pending connections
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // Every accepted connection gets its own child channel;
                    // this initializer adds the handlers to that channel.
                    .childHandler(new NettyServerInitializer());
            // bind() is asynchronous; sync() blocks until it completes.
            ChannelFuture channelFuture = b.bind(port).sync();
            // Report success only after the bind has actually completed.
            System.out.println("Netty server started (port:" + port + ")......");
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Shut down both thread groups.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
The NettySession and NettySessionManager classes manage client sessions.
NettySession
// Assumption: the original listing omits the Lombok annotations, but the code relies on
// `log` and on generated accessors such as getBizKey() and setLastTime().
@Slf4j
@Data
public class NettySession {
    /** Channel id. */
    private String channelId;
    /** Business key associated with this session. */
    private String bizKey;
    /** The underlying connection. */
    private Channel channel = null;
    /** Connection context. */
    private ChannelHandlerContext ctx = null;
    private LocalDateTime createTime;
    private LocalDateTime lastTime;
    /** Message serial number, WORD (2 bytes), incremented cyclically from 0 in send order. */
    private int currentFlowId = 0;

    public static NettySession build(String bizKey, ChannelHandlerContext ctx) {
        NettySession session = new NettySession();
        Channel channel = ctx.channel();
        session.bizKey = bizKey;
        session.channelId = channel.id().asLongText();
        session.ctx = ctx;
        session.channel = channel;
        session.createTime = LocalDateTime.now();
        session.lastTime = session.createTime;
        return session;
    }

    public boolean isActive() {
        return ctx != null && channel != null && channel.isActive();
    }

    public boolean isWritable() {
        return ctx != null && channel != null && channel.isWritable();
    }

    // Writes raw bytes; returns false if there is nothing to send or the channel is not usable.
    public boolean writeAndFlush(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return false;
        }
        if (!isActive() || !isWritable()) {
            return false;
        }
        ByteBuf retBuf = channel.alloc().buffer(bytes.length);
        retBuf.writeBytes(bytes);
        channel.writeAndFlush(retBuf);
        return true;
    }

    // Writes a hex string; MyEncoder converts it to bytes further down the pipeline.
    public boolean writeAndFlush(String msg) {
        if (StrUtil.isBlank(msg)) {
            return false;
        }
        if (!isActive() || !isWritable()) {
            return false;
        }
        channel.writeAndFlush(msg);
        return true;
    }

    /**
     * Closes the connection.
     */
    public void close() {
        try {
            ctx.close();
        } catch (Exception ex) {
            log.error(ex.getLocalizedMessage(), ex);
        }
    }
}
NettySessionManager
@Slf4j
public class NettySessionManager {
    // channelId -> session
    private static Map<String, NettySession> sessionMap = new ConcurrentHashMap<>();
    // bizKey (device id) -> ids of the sessions that belong to it
    private static Map<String, Set<String>> bizMap = new ConcurrentHashMap<>();

    public static void addSession(NettySession session) {
        if (session == null) {
            return;
        }
        String bizKey = session.getBizKey();
        if (StrUtil.isBlank(bizKey)) {
            return;
        }
        String sessionId = session.getChannelId();
        if (StrUtil.isBlank(sessionId)) {
            return;
        }
        sessionMap.put(sessionId, session);
        log.info("[addSession] new client connection, sessionId:{}, session:{}", sessionId, session);
        bizMap.compute(bizKey, (key, sessionIdSet) -> {
            if (sessionIdSet == null) {
                sessionIdSet = ConcurrentHashMap.newKeySet();
            }
            sessionIdSet.add(sessionId);
            log.info("[addSession] new client connection, bizMap:{}, bizKey:{}, sessionIdSet:{}", bizMap, key, sessionIdSet);
            return sessionIdSet;
        });
    }

    public static void removeSession(NettySession session) {
        if (session == null) {
            return;
        }
        removeSession(session.getChannel());
    }

    public static void removeSession(ChannelHandlerContext ctx) {
        if (ctx == null) {
            return;
        }
        log.info("Removing session for ChannelHandlerContext: {}", ctx);
        removeSession(ctx.channel());
    }

    public static void removeSession(Channel channel) {
        if (channel == null) {
            return;
        }
        String channelId = channel.id().asLongText();
        log.info("Removing session for Channel: {}", channelId);
        removeSession(channelId);
    }

    public static void removeSession(String channelId) {
        if (StrUtil.isEmpty(channelId)) {
            return;
        }
        String trimmedChannelId = channelId.trim();
        log.info("Removing session for Channel ID: {}", trimmedChannelId);
        NettySession session = sessionMap.remove(trimmedChannelId);
        if (session == null) {
            log.info("No session found for Channel ID: {}", trimmedChannelId);
            return;
        }
        String bizKey = session.getBizKey();
        if (StrUtil.isBlank(bizKey)) {
            log.info("No bizKey found for Channel ID: {}", trimmedChannelId);
            return;
        }
        log.info("Removing bizKey: {} for Channel ID: {}", bizKey, trimmedChannelId);
        bizMap.computeIfPresent(bizKey, (key, sessionIdSet) -> {
            sessionIdSet.remove(trimmedChannelId);
            log.info("Removed session ID from bizKey: {}. Remaining session IDs: {}", bizKey, sessionIdSet);
            // Returning null removes the bizKey entry once its last session is gone.
            return sessionIdSet.isEmpty() ? null : sessionIdSet;
        });
        // Ensure channel is fully closed
        Channel channel = session.getChannel();
        if (channel != null && channel.isActive()) {
            log.info("Closing channel associated with session: {}", trimmedChannelId);
            channel.close();
        }
    }

    public static boolean isExistChannelId(ChannelHandlerContext ctx) {
        if (ctx == null) {
            return false;
        }
        return isExistChannelId(ctx.channel());
    }

    public static boolean isExistChannelId(Channel channel) {
        if (channel == null) {
            return false;
        }
        String channelId = channel.id().asLongText();
        return isExistChannelId(channelId);
    }

    public static boolean isExistChannelId(String channelId) {
        return sessionMap.containsKey(channelId);
    }

    public static Optional<NettySession> getSession(ChannelHandlerContext ctx) {
        if (ctx == null) {
            return Optional.empty();
        }
        return getSession(ctx.channel());
    }

    public static Optional<NettySession> getSession(Channel channel) {
        if (channel == null) {
            return Optional.empty();
        }
        String channelId = channel.id().asLongText();
        return getSession(channelId);
    }

    public static Optional<NettySession> getSession(String channelId) {
        if (StrUtil.isEmpty(channelId)) {
            return Optional.empty();
        }
        return Optional.ofNullable(sessionMap.get(channelId));
    }

    /**
     * Returns the sessions registered for a bizKey; the list is empty (never null) when there are none.
     * @param bizKey
     * @return
     */
    public static List<NettySession> getSessionByBizKey(String bizKey) {
        if (StrUtil.isBlank(bizKey)) {
            return Collections.emptyList();
        }
        if (!bizMap.containsKey(bizKey)) {
            return Collections.emptyList();
        }
        Set<String> idList = bizMap.get(bizKey);
        if (idList == null || idList.isEmpty()) {
            bizMap.remove(bizKey);
            return Collections.emptyList();
        }
        List<NettySession> list = new ArrayList<>();
        for (String id : idList) {
            Optional<NettySession> op = getSession(id);
            op.ifPresent(list::add);
        }
        return list;
    }

    // Note: despite its name, this returns true when the bizKey has NO session.
    // The caller in NettyServerHandler.channelActive negates the result accordingly.
    public static boolean isHaveSessionId(String bizKey) {
        Set<String> idList = bizMap.get(bizKey);
        if (idList == null || idList.isEmpty()) {
            return true;
        }
        return false;
    }
}
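The core of NettySessionManager is a two-level index: session id to session, and bizKey to the set of session ids. The sketch below isolates that pattern with JDK classes only, so it can be tried without Netty. `SessionIndex` and its method names are hypothetical and not part of the original code; unlike `isHaveSessionId`, the `hasSession` method here returns true when a session exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, Netty-free sketch of the two-level session index. */
final class SessionIndex<S> {

    private static final class Entry<S> {
        final String bizKey;
        final S session;

        Entry(String bizKey, S session) {
            this.bizKey = bizKey;
            this.session = session;
        }
    }

    private final Map<String, Entry<S>> byId = new ConcurrentHashMap<>();
    private final Map<String, Set<String>> idsByBizKey = new ConcurrentHashMap<>();

    void add(String sessionId, String bizKey, S session) {
        byId.put(sessionId, new Entry<>(bizKey, session));
        // compute() runs atomically per key, so concurrent adds cannot lose an id.
        idsByBizKey.compute(bizKey, (key, ids) -> {
            if (ids == null) {
                ids = ConcurrentHashMap.newKeySet();
            }
            ids.add(sessionId);
            return ids;
        });
    }

    void remove(String sessionId) {
        Entry<S> entry = byId.remove(sessionId);
        if (entry == null) {
            return;
        }
        // Returning null drops the bizKey entry when its last session is removed.
        idsByBizKey.computeIfPresent(entry.bizKey, (key, ids) -> {
            ids.remove(sessionId);
            return ids.isEmpty() ? null : ids;
        });
    }

    List<S> findByBizKey(String bizKey) {
        List<S> result = new ArrayList<>();
        Set<String> ids = idsByBizKey.get(bizKey);
        if (ids == null) {
            return result;
        }
        for (String id : ids) {
            Entry<S> entry = byId.get(id);
            if (entry != null) {
                result.add(entry.session);
            }
        }
        return result;
    }

    boolean hasSession(String bizKey) {
        Set<String> ids = idsByBizKey.get(bizKey);
        return ids != null && !ids.isEmpty();
    }
}
```

Keeping the bizKey next to the session makes removal by session id possible without scanning every set, which mirrors why NettySession stores its own bizKey.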
ChannelMap manages the mapping between client IDs and Netty Channel objects.
public class ChannelMap {
    /**
     * Maps the client ID (message ID) to its channel.
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    private ChannelMap() {
    }

    // Lazy initialization with double-checked locking; the volatile field makes it safe.
    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (ChannelMap.class) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String id) {
        return getChannelMap().get(id);
    }
}
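@Component
@ChannelHandler.Sharable
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Split the stream at line endings: max 1024 bytes, keep the delimiter, fail fast.
        // A 0x0A byte inside a binary payload also splits the frame, which is why
        // MyDecoder has to splice fragments back together.
        pipeline.addLast(new LineBasedFrameDecoder(1024, false, true));
        // Idle timeout: if nothing is read or written for 180 seconds,
        // the client is treated as idle and offline.
        pipeline.addLast(new IdleStateHandler(0, 0, 180, TimeUnit.SECONDS));
        // Custom decoder and encoder (bytes <-> hex string)
        pipeline.addLast(new MyDecoder());
        pipeline.addLast(new MyEncoder());
        // Business-logic handler
        pipeline.addLast("handler", new NettyServerHandler());
    }
}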
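Because the frame carries its own length field, framing by length is an alternative to splitting on line endings. Netty ships `LengthFieldBasedFrameDecoder` for this; the JDK-only sketch below just illustrates the idea so it can be run without Netty. Assumptions: the header is `0xFEFD`, the length field is big-endian, and total frame size equals 2 header bytes plus the length value (the table only says the length excludes the header, so this must be confirmed against the device protocol). `LengthFrameSplitter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: reassembles frames from arbitrary TCP chunks using the length field. */
final class LengthFrameSplitter {
    private static final int MAX_FRAME = 1024; // same limit as the line-based decoder above
    private byte[] pending = new byte[0];      // bytes received but not yet emitted

    /** Feeds one received chunk and returns every complete frame now available. */
    List<byte[]> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (data.length - pos >= 4) { // need header (2) + length field (2)
            boolean headerOk = (data[pos] & 0xFF) == 0xFE && (data[pos + 1] & 0xFF) == 0xFD;
            if (!headerOk) {
                pos++; // resynchronize: skip bytes until a header is found
                continue;
            }
            int bodyLength = ((data[pos + 2] & 0xFF) << 8) | (data[pos + 3] & 0xFF);
            int total = 2 + bodyLength; // assumption: length excludes only the header
            if (total < 4 || total > MAX_FRAME) {
                pos++; // implausible length, treat as noise
                continue;
            }
            if (data.length - pos < total) {
                break; // frame incomplete, wait for more bytes
            }
            frames.add(Arrays.copyOfRange(data, pos, pos + total));
            pos += total;
        }
        pending = Arrays.copyOfRange(data, pos, data.length);
        return frames;
    }
}
```

With this approach a 0x0A byte inside the payload no longer matters, because frame boundaries come from the length field rather than from delimiter bytes. One splitter instance is needed per connection, since `pending` holds that connection's leftover bytes.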
@Slf4j
public class MyDecoder extends ByteToMessageDecoder {

    // Per-connection splice buffer. The original used a static StringBuffer, which would mix
    // fragments from different devices; each channel already gets its own MyDecoder instance.
    private final StringBuilder msgBuffer = new StringBuilder();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        // Copy all readable bytes into an array
        byte[] b = new byte[buffer.readableBytes()];
        buffer.readBytes(b);
        String msg = toHexString1(b);
        msgHandle(msg, out);
    }

    public void msgHandle(String msg, List<Object> out) {
        log.info("[MyDecoder.msgHandle] frame to process: {}", msg);
        // Decide whether this chunk is a fragment that must be spliced with earlier ones.
        // Note: the hex-string search can also match "0a" across two neighbouring bytes.
        if (msg.indexOf("0a") > 0 && !(msg.indexOf("fefd") >= 0 && (msg.length() - msg.lastIndexOf("0d0a") == 4))) {
            msgBuffer.append(msg);
            String msgBuf = msgBuffer.toString();
            if (msgBuf.indexOf("fefd") >= 0 && (msgBuf.length() - msgBuf.lastIndexOf("0d0a") == 4)) {
                // Validate and emit the spliced frame, then reset the buffer either way
                processMessage(msgBuf, out);
                msgBuffer.setLength(0);
            }
            return;
        } else {
            processMessage(msg, out);
        }
    }

    // Handles a complete frame; returns whether validation succeeded
    private boolean processMessage(String msg, List<Object> out) {
        // Check frame header and frame tail
        if (!msg.startsWith("fefd") || !msg.endsWith("0d0a")) {
            return false;
        }
        // Extract the checksum (bytes 4-5 of the frame)
        String checkCode = msg.substring(8, 12);
        // Compute the CRC over everything after the checksum field
        byte[] crcbyte = MyEncoder.hexString2Bytes(msg.substring(12));
        int crc = CRCUtil.calcCrc16(crcbyte);
        crc = CRCUtil.revert(crc);
        String crcCode = String.format("%04x", crc);
        // Compare the checksums
        if (!checkCode.equals(crcCode)) {
            log.error("[processMessage] CRC check failed! server checksum {}, device checksum {}", crcCode, checkCode);
            return false;
        }
        // Validation passed: emit the frame
        out.add(msg);
        return true;
    }

    // Bytes to upper-case hex, one space after each byte
    public String bytesToHexString(byte[] bArray) {
        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]) + " ";
            if (sTemp.length() < 3) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }

    // Bytes to lower-case hex without separators
    public static String toHexString1(byte[] b) {
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < b.length; ++i) {
            buffer.append(toHexString1(b[i]));
        }
        return buffer.toString();
    }

    public static String toHexString1(byte b) {
        String s = Integer.toHexString(b & 0xFF);
        if (s.length() == 1) {
            return "0" + s;
        } else {
            return s;
        }
    }

    /**
     * Hex string to string.
     *
     * @param hexStr the hex string
     * @return the decoded string
     */
    public static String decodeHex(String hexStr) {
        // One byte per two hex characters
        byte[] strs = new byte[hexStr.length() / 2];
        for (int i = 0; i < strs.length; i++) {
            // High nibble
            int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 16);
            // Low nibble
            int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 16);
            strs[i] = (byte) (high * 16 + low);
        }
        // Decode the bytes with the platform default charset
        return new String(strs);
    }

    /**
     * String to hex (no Unicode encoding needed).
     * @param str
     * @return
     */
    public static String str2HexStr(String str) {
        char[] chars = "0123456789ABCDEF".toCharArray();
        StringBuilder sb = new StringBuilder("");
        byte[] bs = str.getBytes();
        int bit;
        for (int i = 0; i < bs.length; i++) {
            bit = (bs[i] & 0x0f0) >> 4;
            sb.append(chars[bit]);
            bit = bs[i] & 0x0f;
            sb.append(chars[bit]);
            // sb.append(' ');
        }
        return sb.toString().trim();
    }

    /**
     * Reverses the byte order of a hex string (high/low swap).
     * @param hex
     * @return java.lang.String
     * @author wangguangle
     * @date: 2021/4/8 19:33
     */
    public static String reverseHex(String hex) {
        char[] charArray = hex.toCharArray();
        int length = charArray.length;
        int times = length / 2;
        for (int c1i = 0; c1i < times; c1i += 2) {
            int c2i = c1i + 1;
            char c1 = charArray[c1i];
            char c2 = charArray[c2i];
            int c3i = length - c1i - 2;
            int c4i = length - c1i - 1;
            charArray[c1i] = charArray[c3i];
            charArray[c2i] = charArray[c4i];
            charArray[c3i] = c1;
            charArray[c4i] = c2;
        }
        return new String(charArray);
    }

    /**
     * Hex to ASCII characters.
     * @other > Integer.toHexString(int) -> 10 to 16
     * @param hex
     * @return
     */
    public static String convertHexToString(String hex) {
        StringBuilder sb = new StringBuilder();
        StringBuilder temp = new StringBuilder();
        for (int i = 0; i < hex.length() - 1; i += 2) {
            // grab the hex in pairs
            String output = hex.substring(i, (i + 2));
            // convert hex to decimal
            int decimal = Integer.parseInt(output, 16);
            // convert the decimal to character
            sb.append((char) decimal);
            temp.append(decimal);
        }
        return sb.toString();
    }

    /**
     * Hex to the corresponding ASCII text.
     * @other
     * @param hexString
     * @return
     */
    public static String hexStringToAscii(String hexString) {
        int len = hexString.length();
        byte[] bytes = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
                    + Character.digit(hexString.charAt(i + 1), 16));
        }
        String asciiString = new String(bytes);
        return asciiString;
    }

    /**
     * Hex to decimal, digit by digit (value times positional weight).
     * Only lower-case hex digits are recognised.
     * @Author @zzh
     * @Description // hex to decimal
     * @Date 14:59 2023/5/4
     * @param hex
     * @return int
     **/
    public static int hexToDecimal(String hex) {
        int decimal = 0;
        String digits = "0123456789abcdef";
        for (int i = 0; i < hex.length(); i++) {
            char c = hex.charAt(i);
            int d = digits.indexOf(c);
            decimal = decimal * 16 + d;
        }
        return decimal;
    }

    /**
     * Chinese characters to GB2312.
     *
     * @param chinese the Chinese text
     * @return
     * @throws UnsupportedEncodingException
     */
    public static String StringToGb(String chinese) throws UnsupportedEncodingException {
        // Encode the string as GB2312 bytes first
        byte[] bytes = chinese.getBytes("GB2312");
        StringBuilder gbString = new StringBuilder();
        for (byte b : bytes) {
            // Convert each byte to hex
            String temp = Integer.toHexString(b);
            if (temp.length() > 2) {
                // Negative byte: keep only the last two hex digits
                temp = temp.substring(6, 8);
            } else if (temp.length() == 2) {
                // A digit: digits live in zone A3
                gbString.append("A3");
                temp = "B" + temp.substring(1);
            }
            gbString.append(temp);
        }
        return gbString.toString();
    }

    /**
     * GB2312 to Chinese characters.
     *
     * @param string GB2312 code as hex
     * @return
     * @throws Exception
     */
    public static String GbToString(String string) throws Exception {
        byte[] bytes = new byte[string.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            byte high = Byte.parseByte(string.substring(i * 2, i * 2 + 1), 16);
            byte low = Byte.parseByte(string.substring(i * 2 + 1, i * 2 + 2), 16);
            bytes[i] = (byte) (high << 4 | low);
        }
        String result = new String(bytes, "GB2312");
        return result;
    }
}
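The decoder relies on `CRCUtil.calcCrc16` and `CRCUtil.revert`, which the post does not show. As an illustration only, the sketch below assumes the CRC-16/MODBUS variant (initial value 0xFFFF, reflected polynomial 0xA001) and assumes that `revert` swaps the two bytes; the real device protocol may use a different variant, so check the communication manual. `Crc16Sketch` and its methods are hypothetical names.

```java
/** Hypothetical stand-in for the CRCUtil class, which is not shown in the post. */
final class Crc16Sketch {

    /** CRC-16/MODBUS: init 0xFFFF, reflected polynomial 0xA001, no final XOR. */
    static int crc16Modbus(byte[] data) {
        int crc = 0xFFFF;
        for (byte b : data) {
            crc ^= (b & 0xFF);
            for (int i = 0; i < 8; i++) {
                if ((crc & 1) != 0) {
                    crc = (crc >>> 1) ^ 0xA001;
                } else {
                    crc >>>= 1;
                }
            }
        }
        return crc & 0xFFFF;
    }

    /** Swaps the high and low byte of a 16-bit value (assumed meaning of "revert"). */
    static int swapBytes(int crc) {
        return ((crc & 0xFF) << 8) | ((crc >>> 8) & 0xFF);
    }

    /** Formats a 16-bit value the way processMessage compares it: 4 lower-case hex digits. */
    static String toHex4(int value) {
        return String.format("%04x", value & 0xFFFF);
    }
}
```

A quick sanity check for this variant is the standard test string `123456789`, whose CRC-16/MODBUS value is 0x4B37. If a captured device frame does not validate with this sketch, the variant or the byte order assumption is the first thing to revisit.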
public class MyEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
        // Convert the hex string to bytes and write them out
        byteBuf.writeBytes(hexString2Bytes(s));
    }

    /**
     * @Title:hexString2Bytes
     * @Description: hex string to byte array
     * @param src hex string
     * @return byte array
     */
    public static byte[] hexString2Bytes(String src) {
        int l = src.length() / 2;
        byte[] ret = new byte[l];
        for (int i = 0; i < l; i++) {
            ret[i] = (byte) Integer.parseInt(src.substring(i * 2, i * 2 + 2), 16);
        }
        return ret;
    }

    /**
     * Computes the data of the return code.
     * @param str
     */
    public static String getCountNum(String str) {
        StringBuilder sb = new StringBuilder();
        byte[] bytes = hexString2Bytes(str);
        // Walk backwards from the third-to-last byte down to index 14, subtracting 51 (0x33) from each byte
        for (int i = bytes.length - 3; i > 13; i--) {
            bytes[i] = (byte) (bytes[i] - 51);
            sb.append(Integer.toHexString(0xFF & bytes[i]));
        }
        return sb.toString();
    }
}
@Slf4j
@Component
/* Without @Sharable, a new instance must be created each time the handler is added to a pipeline */
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {

    private static NettyServerHandler nettyServerHandler;
    private static SimpleDateFormat sdf = null;

    static {
        sdf = new SimpleDateFormat("yyMMddHHmmss");
    }

    @Autowired
    private MsgSender msgSender;
    private byte[] req;
    private int counter;

    @PostConstruct
    public void init() {
        nettyServerHandler = this;
    }

    @Autowired
    private IDeviceTerminalService deviceTerminalService;
    @Autowired
    private RedisUtil redisUtil;

    // Holds every channel that is connected to the server
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // Reads a message from the client
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = msg.toString();
        log.info("[channelRead0] device data: {}", message);
        // Ignore frames that are too short to be valid
        if (message.length() < 80) {
            return;
        }
        DataPacket dataPacket = new DataPacket(message);
        String bizKey = dataPacket.getDeviceId();
        // Check whether this is a new connection
        Tuple2<Boolean, NettySession> newSession = isNewSession(bizKey, ctx);
        Boolean isNewSession = newSession.getT1();
        NettySession nettySession = newSession.getT2();
        if (isNewSession) {
            NettySessionManager.addSession(nettySession);
            deviceOnline(dataPacket);
        }
        handleMessage(dataPacket);
    }

    /**
     * Checks whether the current connection is a new one.
     * @param bizKey device id
     * @param ctx ChannelHandlerContext
     * @return
     */
    private Tuple2<Boolean, NettySession> isNewSession(String bizKey, ChannelHandlerContext ctx) {
        boolean isNewSession = true;
        Optional<NettySession> optionalSession = NettySessionManager.getSession(ctx);
        NettySession currentSession = null;
        if (optionalSession.isPresent()) {
            // A session exists, so this is not the first message on this connection
            isNewSession = false;
            currentSession = optionalSession.get();
            currentSession.setLastTime(LocalDateTime.now());
            log.info("Existing TCP connection [has session], bizKey=[{}], channelId=[{}]", bizKey, currentSession.getChannelId());
        } else {
            currentSession = NettySession.build(bizKey, ctx);
            log.info("New TCP connection [no session], bizKey=[{}], channelId=[{}]", bizKey, ctx.channel().id().asLongText());
        }
        return Tuples.of(isNewSession, currentSession);
    }

    private void handleMessage(DataPacket dataPacket) throws Exception {
        if (dataPacket == null) {
            return;
        }
        String commandCode = dataPacket.getCommand(); // command
        String strMessage = JSON.toJSONString(dataPacket);
        // The handler is created with `new` in the initializer, so the bean is looked up explicitly
        MsgSender msgSender = SpringUtil.getBean(MsgSender.class);
        switch (commandCode) {
            case ComConstant.CommandUp.SENTRY_HEARTBEAT:
                // Device heartbeat
                msgSender.asyncSendDeviceUpHeartBeat(strMessage);
                break;
            case ComConstant.CommandUp.SENTRY_RADAR_STATUS:
                // Radar data report
                msgSender.syncSendDeviceUpRadar(strMessage);
                break;
            default:
                // Every other upstream report
                msgSender.asyncSendDeviceUpCommand(strMessage);
                break;
        }
    }

    /*
     * Overrides channelActive, which fires when the channel becomes active
     * (when the connection is established).
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Connected client address: " + ctx.channel().remoteAddress() + " active !");
        // Business key (this assumes the channel id can serve as the business key)
        String bizKey = ctx.channel().id().asLongText();
        // isHaveSessionId returns true when NO session exists, hence the negation
        if (!NettySessionManager.isHaveSessionId(bizKey)) {
            List<NettySession> existingSessions = NettySessionManager.getSessionByBizKey(bizKey);
            if (!existingSessions.isEmpty()) {
                log.warn("Existing sessions found for bizKey: {}, removing old sessions", bizKey);
                for (NettySession session : existingSessions) {
                    NettySessionManager.removeSession(session.getChannel().id().asLongText());
                }
            }
        }
        super.channelActive(ctx);
    }

    // Device goes online
    private void deviceOnline(DataPacket dataPacket) {
        // Null check first; the original logged the device id before checking
        if (dataPacket == null) {
            return;
        }
        log.info("Device {} is online", dataPacket.getDeviceId());
        DeviceTerminal deviceTerminal = new DeviceTerminal();
        String bizKey = dataPacket.getDeviceId();
        deviceTerminal.setDeviceId(bizKey);
        deviceTerminal.setCode(bizKey);
        deviceTerminal.setProductionNumber(bizKey);
        deviceTerminal.setDeviceType(String.valueOf(dataPacket.getType()));
        deviceTerminal.setDeviceTime(dataPacket.getDatetime());
        deviceTerminal.setOnlineTime(DateUtil.date());
        deviceTerminal.setStatus(String.valueOf(DeviceStateEnum.on.getValue()));
        IDeviceTerminalService deviceTerminalService = SpringUtil.getBean(DeviceTerminalServiceImpl.class);
        deviceTerminalService.updateDeviceState(deviceTerminal);
    }

    // Device goes offline
    private void deviceDownline(Optional<NettySession> nettySessionOptional) {
        nettySessionOptional.ifPresent(session -> {
            DeviceTerminal deviceTerminal = new DeviceTerminal();
            String bizKey = session.getBizKey();
            deviceTerminal.setDeviceId(bizKey);
            deviceTerminal.setOnlineTime(DateUtil.date());
            deviceTerminal.setStatus(String.valueOf(DeviceStateEnum.down.getValue()));
            IDeviceTerminalService deviceTerminalService = SpringUtil.getBean(DeviceTerminalServiceImpl.class);
            log.info("[deviceDownline] updating device status to: {}", DeviceStateEnum.down.getValue());
            deviceTerminalService.updateDeviceState(deviceTerminal);
        });
    }

    // Called when the handler is added, i.e. a client connection has been set up
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel(); // effectively one connection
        /*
         * channelGroup.writeAndFlush writes to every channel in the group.
         * Broadcast first, then add this channel to the group.
         * Note: this text is not a hex string, so MyEncoder.hexString2Bytes cannot parse it.
         */
        channelGroup.writeAndFlush(" [server] -" + channel.remoteAddress() + " joined\n");
        channelGroup.add(channel);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIpPort = remoteAddress.getAddress().getHostAddress() + ":" + remoteAddress.getPort();
        log.info("Client IP:Port: {}", clientIpPort);
        Optional<NettySession> sessionOptional = NettySessionManager.getSession(ctx.channel());
        deviceDownline(sessionOptional);
        log.info("[channelInactive] device offline");
        NettySessionManager.removeSession(ctx);
        super.channelInactive(ctx);
    }

    /** Handles exceptions raised in the pipeline. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIpPort = remoteAddress.getAddress().getHostAddress() + ":" + remoteAddress.getPort();
        log.info("Client IP:Port: {}", clientIpPort);
        Optional<NettySession> sessionOptional = NettySessionManager.getSession(ctx.channel());
        if (ctx.channel().isActive()) {
            if (cause instanceof SocketTimeoutException) {
                deviceDownline(sessionOptional);
                log.error("[exceptionCaught] device offline: connection timeout");
            } else if (cause instanceof IOException) {
                deviceDownline(sessionOptional);
                log.error("[exceptionCaught] device offline: network error");
                log.error("Network exception occurred: {}", cause.getMessage(), cause);
            } else {
                deviceDownline(sessionOptional);
                log.error("[exceptionCaught] device offline: unknown error");
                log.error("Unexpected exception occurred: {}", cause.getMessage(), cause);
            }
            NettySessionManager.removeSession(ctx);
            if (NettySessionManager.getSession(ctx.channel()).isPresent()) {
                log.error("Session not fully removed, additional cleanup needed.");
                NettySessionManager.removeSession(ctx.channel());
            }
            log.error("[exceptionCaught] device offline after error, session removed, {}", ctx.channel());
            ctx.close();
            log.info("[exceptionCaught] device offline after error, channel closed");
        } else {
            log.error("[exceptionCaught] channel already inactive, no offline handling needed");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIpPort = remoteAddress.getAddress().getHostAddress() + ":" + remoteAddress.getPort();
        log.info("Client IP:Port: {}", clientIpPort);
        if (ctx.channel().isActive()) {
            Optional<NettySession> sessionOptional = NettySessionManager.getSession(ctx.channel());
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                switch (event.state()) {
                    case READER_IDLE:
                        log.info("[READER_IDLE] client read-idle timeout, going offline");
                        break;
                    case WRITER_IDLE:
                        log.info("[WRITER_IDLE] client write-idle timeout, going offline");
                        break;
                    case ALL_IDLE:
                        log.info("[ALL_IDLE] client all-idle timeout, going offline");
                        break;
                    default:
                        log.info("client idle timeout, going offline, IdleState.{}", event.state());
                        break;
                }
                deviceDownline(sessionOptional);
                NettySessionManager.removeSession(ctx);
                ctx.channel().close();
            } else {
                super.userEventTriggered(ctx, evt);
                log.info("event is not an IdleStateEvent, channel info not cleared: {}", evt.getClass().getName());
                deviceDownline(sessionOptional);
            }
        } else {
            log.info("[userEventTriggered] channel already inactive, no offline handling needed");
        }
    }
}
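The handler declares a static `SimpleDateFormat("yyMMddHHmmss")`, which is not thread-safe when shared across Netty's event-loop threads. As a hedged alternative, the sketch below uses the immutable `DateTimeFormatter` with the same pattern. It assumes the frame's time field arrives as 12 ASCII digits such as `240108091236`; `FrameTime` is a hypothetical helper, not part of the original code.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for the 12-character time field of a frame. */
final class FrameTime {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is fine.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyMMddHHmmss");

    /** Parses text such as "240108091236"; two-digit years are read as 20yy. */
    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, FORMAT);
    }

    /** Formats a timestamp back into the 12-character wire representation. */
    static String format(LocalDateTime time) {
        return time.format(FORMAT);
    }
}
```

For the example value in the frame table, `FrameTime.parse("240108091236")` yields 2024-01-08T09:12:36.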
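If you have questions, leave a comment or message me and I will try to help. If this was useful, please like and share it. The source document and the communication manual are available on request.
You are welcome to follow the WeChat official account 小红的成长日记 to learn Java together.
The complete source code is on a network drive and has been tested.
To get the archive password, reply "tcp" to the official account.
Link:
https://pan.baidu.com/s/1mk5A-gSotAbkQC1ZCc74sg?pwd=8ggb
Extraction code: 8ggb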