赞
踩
Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持。
Netty是对JDK自带的NIO的API进行封装,具有高并发,高性能等优点。
项目中经常用到netty实现服务器与设备的通信,先写服务端代码:
- @Slf4j
- @Component
- public class NettyServerBootstrap {
-
- private Channel serverChannel;
- private static final int DEFAULT_PORT = 60782;
- //bossGroup只是处理连接请求
- private static EventLoopGroup bossGroup = null;
- //workGroup处理非连接请求,如果牵扯到数据量处理业务非常耗时的可以再单独新建一个eventLoopGroup,并在childHandler初始化的时候添加到pipeline绑定
- private static EventLoopGroup workGroup = null;
-
- /**
- * 启动Netty服务
- *
- * @return 启动结果
- */
- @PostConstruct
- public boolean start() {
- bossGroup = new NioEventLoopGroup();
- workGroup = new NioEventLoopGroup();
- //创建服务端启动对象
- ServerBootstrap bootstrap = new ServerBootstrap();
- try {
- //使用链式编程来设置
- bootstrap.group(bossGroup, workGroup)//设置两个线程组
- //使用NioSocketChannel作为服务器的通道实现
- .channel(NioServerSocketChannel.class)
- //设置线程队列得到的连接数
- .option(ChannelOption.SO_BACKLOG, 1024)
- //设置保持活动连接状态
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- //设置处理器 WorkerGroup 的 EvenLoop 对应的管道设置处理器
- .childHandler(new ChannelInitializer<Channel>() {
-
- @Override
- protected void initChannel(Channel ch){
- log.info("--------------有客户端连接");
- ch.pipeline().addLast(new StringDecoder());
- ch.pipeline().addLast(new StringEncoder());
- ch.pipeline().addLast(new NettyServerHandler());
- }
- });
- //绑定端口, 同步等待成功;
- ChannelFuture future = bootstrap.bind(DEFAULT_PORT).sync();
- log.info("netty服务启动成功,ip:{},端口:{}", InetAddress.getLocalHost().getHostAddress(), DEFAULT_PORT);
- serverChannel = future.channel();
- ThreadUtil.execute(() -> {
- //等待服务端监听端口关闭
- try {
- future.channel().closeFuture().sync();
- log.info("netty服务正常关闭成功,ip:{},端口:{}", InetAddress.getLocalHost().getHostAddress(), DEFAULT_PORT);
- } catch (InterruptedException | UnknownHostException e) {
- e.printStackTrace();
- } finally {
- shutdown();
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- log.error("netty服务异常,异常原因:{}", e.getMessage());
- return false;
- }
- return true;
- }
-
-
- /**
- * 关闭当前server
- */
- public boolean close() {
- if (serverChannel != null) {
- serverChannel.close();//关闭服务
- try {
- //保险起见
- serverChannel.closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return false;
- } finally {
- shutdown();
- serverChannel = null;
- }
- }
- return true;
- }
-
- /**
- * 优雅关闭
- */
- private void shutdown() {
- workGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
-
- }
服务端处理类代码:
- @Slf4j
- public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
-
- /**
- * 处理读取到的msg
- *
- * @param ctx 上下文
- * @param msg 数据
- */
- @Override
- protected void channelRead0(ChannelHandlerContext ctx,String msg) throws Exception {
- System.out.println("服务端收到的消息--------"+msg);
- ctx.channel().writeAndFlush("ok");
- }
-
- /**
- * 断开连接
- *
- * @param ctx 傻瓜下文
- */
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- ChannelId channelId = ctx.channel().id();
- log.info("客户端id:{},断开连接,ip:{}", channelId, ctx.channel().remoteAddress());
- super.handlerRemoved(ctx);
- }
-
- }
接下来模拟客户端:
- @Configuration
- @Component
- public class TianmiaoClient {
-
-
- private static String ip;
-
- private static int port ;
-
- @Value("${tianmiao.nettyServer.ip}")
- public void setIp(String ip) {
- this.ip = ip;
- }
-
- @Value("${tianmiao.nettyServer.port}")
- public void setPort(int port) {
- this.port = port;
- }
-
- /**
- * 服务类
- */
- private static Bootstrap bootstrap=null;
-
- /**
- * 初始化 项目启动后自动初始化
- */
- @PostConstruct
- public void init() {
-
- //worker
- EventLoopGroup worker = new NioEventLoopGroup();
-
- bootstrap = new Bootstrap();
- //设置线程池
- bootstrap.group(worker);
-
- //设置socket工厂
- bootstrap.channel(NioSocketChannel.class);
-
- //设置管道
- bootstrap.handler(new ChannelInitializer<Channel>() {
-
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline().addLast(new StringDecoder());
- ch.pipeline().addLast(new StringEncoder());
- ch.pipeline().addLast(new TianmiaoClientHandler());
- }
- });
- }
-
-
- /**
- * 获取会话 (获取或者创建一个会话)
- */
- public Channel createChannel() {
- try {
- Channel channel = bootstrap.connect( ip, port).sync().channel();
- return channel;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- }
客户端处理类代码
- public class TianmiaoClientHandler extends SimpleChannelInboundHandler<String> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
- System.out.println("服务端发过来的消息:"+s);
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println(".......................tcp断开连接.........................");
- //移除
- Channel channel = ctx.channel();
- channel.close().sync();
- super.channelInactive(ctx);
- }
-
- }
管理客户端channel的一个工具类:
- public class TianmiaoChannelManager {
-
- /**
- * 在线会话(存储注册成功的会话)
- */
- private static final ConcurrentHashMap<String, Channel> onlineChannels = new ConcurrentHashMap<>();
-
-
- /**
- * 加入
- *
- * @param mn
- * @param channel
- * @return
- */
- public static boolean putChannel(String mn, Channel channel) {
- if (!onlineChannels.containsKey(mn)) {
- boolean success = onlineChannels.putIfAbsent(mn, channel) == null ? true : false;
- return success;
- }
- return false;
- }
-
- /**
- * 移除
- *
- * @param mn
- */
- public static Channel removeChannel(String mn) {
- return onlineChannels.remove(mn);
- }
-
- /**
- * 获取Channel
- *
- * @param mn
- * @return
- */
- public static Channel getChannel(String mn) {
- // 获取一个可用的会话
- Channel channel = onlineChannels.get(mn);
- if (channel != null) {
- // 连接有可能是断开,加入已经断开连接了,我们需要进行尝试重连
- if (!channel.isActive()) {
- //先移除之前的连接
- removeChannel(mn);
- return null;
- }
- }
- return channel;
- }
-
- /**
- * 发送消息[自定义协议]
- *
- * @param <T>
- * @param mn
- * @param msg
- */
- public static <T> void sendMessage(String mn, String msg) {
- Channel channel = onlineChannels.get(mn);
- if (channel != null && channel.isActive()) {
- channel.writeAndFlush(msg);
- }
- }
-
- /**
- * 发送消息[自定义协议]
- *
- * @param <T>
- * @param msg
- */
- public static <T> void sendChannelMessage(Channel channel, String msg) {
- if (channel != null && channel.isActive()) {
- channel.writeAndFlush(msg);
- }
- }
-
- /**
- * 关闭连接
- *
- * @return
- */
- public static void closeChannel(String mn) {
- onlineChannels.get(mn).close();
- }
- }
最后是客户端使用方法:
- /**
- * 发送数据包
- * @param key
- */
- public static void tianmiaoData(String key, String data) {
- Channel channel = TianmiaoChannelManager.getChannel(key);
- //将通道存入
- if(channel==null){
- TianmiaoClient client = new TianmiaoClient();
- channel = client.createChannel();
- TianmiaoChannelManager.putChannel(key, channel);
- }
- if (channel != null && channel.isActive()) {
- //发送数据
- channel.writeAndFlush(data);
- System.out.println("-------------天苗转发数据成功-------------");
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。