赞
踩
依赖准备
<!-- Netty: NIO networking framework used for the TCP server. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final</version>
</dependency>
<!-- Lombok: generates accessors (@Data) and the logger field (@Slf4j). -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
</dependency>
<!--
  fastjson: JSON (de)serialization of the messages exchanged with clients.
  Raised from 1.2.75 to 1.2.83: releases up to 1.2.80 are affected by an
  autoType-bypass deserialization vulnerability, and this project parses
  JSON received from the network.
-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
1.定义消息实体
/**
 * Message entity exchanged between the Netty server and its clients.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 */
@Data
public class NettyMsg {

    /** Device id. */
    private String deviceId;

    /** Message type: 1 = positioning, 2 = red-light-running event, 3 = traffic-flow event. */
    private Integer msgType;

    /** The message content being sent. */
    private String msg;
}
2.定义Netty服务端
import java.nio.charset.StandardCharsets; import org.springframework.stereotype.Component; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; /** * Netty 服务端 */ @Component @Slf4j public class NettyServer { /** * Netty服务端的端口 */ private static final int PORT = 28888; /** * 服务端接收连接队列的长度 */ private static final int QUEUE_SIZE = 1024*1024; /** * 粘包的分隔符 */ private static final String DELIMITER = "\r\n"; /** * 分隔的最大长度 */ private static final int DELIMITER_MAX_LENGTH = 1024; /** * 服务启动绑定端口 */ public void bind() { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); // 设置两个线程组 bootstrap.group(bossGroup, workerGroup) // 使用NioServerSocketChannel 作为服务器的通道实现 .channel(NioServerSocketChannel.class) // 设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, QUEUE_SIZE) // 设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) // 通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去 .childOption(ChannelOption.TCP_NODELAY, true) // 给workerGroup的EventLoop对应的管道设置处理器 .childHandler(new ChannelInitializer<SocketChannel>() { // 给pipeline 设置处理器 @Override protected void initChannel(SocketChannel socketChannel) { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(DELIMITER_MAX_LENGTH, 
Unpooled.wrappedBuffer(DELIMITER.getBytes(StandardCharsets.UTF_8)))); // 字符串编解码器 pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8), new StringEncoder(StandardCharsets.UTF_8)); pipeline.addLast(new NettyServerHandler()); } }); // 绑定端口,开始接收进来的连接 ChannelFuture future = bootstrap.bind(PORT).sync(); if (future.isSuccess()) { log.info("======socket server start======"); } // 对关闭通道进行监听 future.channel().closeFuture().sync(); } catch (InterruptedException e) { // 发送异常关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); Thread.currentThread().interrupt(); } } }
3.定义Netty服务端处理器
package com.huawei.edu.iot.lab.provider.netty;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.huawei.edu.iot.lab.provider.enums.NettyMsgTypeEnum;
import com.huawei.edu.iot.lab.provider.manager.NettyManager;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

/**
 * Netty server-side handler: tracks connected clients, parses inbound JSON
 * messages into {@link NettyMsg} and dispatches them to {@link NettyManager}.
 *
 * <p>The server pipeline creates handler instances with {@code new}, so the
 * {@code @Resource} field is only injected on the Spring-managed instance.
 * {@link #init()} copies that manager into a static field so the
 * pipeline-created instances can use it as well.
 */
@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Resource
    private NettyManager nettyManager;

    /** Connected clients, keyed by {@code ChannelId.toString()}. */
    private static final Map<String, ChannelHandlerContext> MAP = new ConcurrentHashMap<>();

    /** Manager shared with handler instances that were not created by Spring. */
    private static NettyManager staticNettyManager;

    /** Publishes the injected manager to the static field after injection. */
    @PostConstruct
    public void init() {
        staticNettyManager = nettyManager;
    }

    /**
     * Called when a channel becomes active (a client connected). Records the
     * client's context so it can be reused later for server-to-client sends.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelId channelId = ctx.channel().id();
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inetSocketAddress.getAddress().getHostAddress();
        int clientPort = inetSocketAddress.getPort();
        log.info("客户端:{}连接netty服务器[IP: {},PORT: {}]", channelId, clientIp, clientPort);

        // Store the connection only if it is not already known (atomic on the concurrent map).
        MAP.putIfAbsent(channelId.toString(), ctx);
    }

    /**
     * Called when a client disconnects. Removes the connection from the
     * registry and closes its context if it was registered.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ChannelId channelId = ctx.channel().id();
        log.info("有客户端断开连接,绑定的信道是:{}", channelId);

        // If the map contains this connection, remove it and close the context.
        if (MAP.remove(channelId.toString()) != null) {
            ctx.close();
        }
    }

    /**
     * Called for every message received from a client. The message is parsed
     * as JSON into a {@link NettyMsg} and dispatched by message type; messages
     * that cannot be parsed are logged and dropped.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("加载客户端报文[信道:{},消息:{}]", ctx.channel().id(), msg);

        NettyMsg nettyMsg;
        try {
            nettyMsg = JSON.parseObject(msg.toString(), NettyMsg.class);
        } catch (JSONException e) {
            // Malformed JSON from a client must not escape as an unhandled pipeline exception.
            log.warn("消息解析异常,msg:{}", msg, e);
            return;
        }
        if (Objects.isNull(nettyMsg)) {
            log.info("消息解析异常,msg:{}", msg);
            return;
        }

        this.handlerMsg(nettyMsg);
    }

    /**
     * Dispatches a parsed client message to the matching {@link NettyManager}
     * operation. Messages without a type are logged and ignored.
     *
     * @param nettyMsg the parsed message
     */
    private void handlerMsg(NettyMsg nettyMsg) {
        Integer msgType = nettyMsg.getMsgType();
        if (msgType == null) {
            log.warn("消息类型为空,忽略该消息,deviceId:{}", nettyMsg.getDeviceId());
            return;
        }
        // NOTE(review): the result of isInclude (if any) is not used here, as in the
        // original code — confirm whether it is meant to reject unknown types.
        NettyMsgTypeEnum.isInclude(msgType);

        // Compare as a primitive so the check never degrades to a boxed reference comparison.
        int type = msgType;

        // Red-light-running event.
        if (NettyMsgTypeEnum.RED_LIGHT_RUNNING_EVENT.getValue() == type) {
            staticNettyManager.processRedLightRunningEvent(nettyMsg.getMsg());
        }
        // Traffic-flow event.
        if (NettyMsgTypeEnum.TRAFFIC_CONGESTION_EVENT.getValue() == type) {
            staticNettyManager.processCongestion(nettyMsg.getMsg());
        }
        // Positioning.
        if (NettyMsgTypeEnum.POSITIONING.getValue() == type) {
            staticNettyManager.processPosition(nettyMsg);
        }
        // Forbid-push-stream control.
        if (NettyMsgTypeEnum.PUSH_STREAM_CONTROL.getValue() == type) {
            staticNettyManager.processForbiddenPushStream(nettyMsg.getMsg());
        }
    }

    /**
     * Called when an exception is raised in the pipeline; logs it.
     *
     * @param ctx context of the channel on which the error occurred
     * @param cause the exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("exceptionCaught", cause);
    }

    /**
     * Sends a message from the server to every registered client whose channel
     * is active. Entries whose channel is no longer active are removed from
     * the registry.
     *
     * @param nettyMsg message content; serialized to JSON before sending
     */
    public void sendMsgToClient(NettyMsg nettyMsg) {
        if (MAP.isEmpty()) {
            return;
        }
        // Serialize once; the same payload goes to every client.
        String jsonNettyMsg = JSON.toJSONString(nettyMsg);

        Iterator<Map.Entry<String, ChannelHandlerContext>> iterator = MAP.entrySet().iterator();
        while (iterator.hasNext()) {
            ChannelHandlerContext ctx = iterator.next().getValue();
            if (ctx.channel().isActive()) {
                log.info("服务端发送信息为:{}", jsonNettyMsg);
                ctx.writeAndFlush(jsonNettyMsg);
            } else {
                // Drop stale connections instead of letting them block delivery.
                iterator.remove();
            }
        }
    }

    /**
     * Tells whether a connection with the given key is currently registered.
     *
     * @param channelKey the channel id string
     * @return {@code true} if the key is non-empty and registered, otherwise {@code false}
     */
    public boolean hasConnection(String channelKey) {
        if (StringUtils.isEmpty(channelKey)) {
            return false;
        }
        return MAP.containsKey(channelKey);
    }

    /**
     * Heartbeat / timeout handling: closes the channel when an
     * {@link IdleStateEvent} reports a reader, writer or all-idle timeout.
     * Other user events are ignored.
     *
     * @param ctx context of the channel the event was fired on
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        IdleStateEvent event = (IdleStateEvent) evt;
        // String.valueOf tolerates a null remote address (channel already disconnected).
        String socketString = String.valueOf(ctx.channel().remoteAddress());

        if (event.state() == IdleState.READER_IDLE) {
            log.info("Client: {} READER_IDLE 读超时", socketString);
            ctx.channel().close();
        } else if (event.state() == IdleState.WRITER_IDLE) {
            log.info("Client: {} WRITER_IDLE 写超时", socketString);
            ctx.channel().close();
        } else if (event.state() == IdleState.ALL_IDLE) {
            log.info("Client: {} ALL_IDLE 总超时", socketString);
            ctx.channel().close();
        }
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。