赞
踩
maven里面引入netty依赖
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.51.Final</version>
- </dependency>
创建NettyServer类
- package com.NettyServer.service;
-
- import com.NettyServer.common.ServerChannelInitializer;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import org.springframework.beans.factory.annotation.Value;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import java.net.InetSocketAddress;
-
- @Component
- @Slf4j
- public class NettyServer {
- /**
- * boss 线程组用于TCP处理连接工作
- */
- private final EventLoopGroup boss = new NioEventLoopGroup();
- /**
- * work 线程组用于IO数据处理
- */
- private final EventLoopGroup work = new NioEventLoopGroup();
-
- @Value("${netty.port}")
- private Integer port;
-
-
- /**
- * 启动Netty Server
- */
- @PostConstruct
- public void start() throws InterruptedException {
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(boss, work)
- // 指定Channel
- .channel(NioServerSocketChannel.class)
- //使用指定的端口设置套接字地址
- .localAddress(new InetSocketAddress(port))
- //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
- .option(ChannelOption.SO_BACKLOG, 1024)
- //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- //将小的数据包包装成更大的帧进行传送,提高网络的负载
- .childOption(ChannelOption.TCP_NODELAY, true)
- .childHandler(new ServerChannelInitializer());
- ChannelFuture future = bootstrap.bind().sync();
- if (future.isSuccess()) {
- log.info("已启动 Netty Server");
- }
- }
-
- @PreDestroy
- public void destory() throws InterruptedException {
- boss.shutdownGracefully().sync();
- work.shutdownGracefully().sync();
- log.info("关闭Netty");
- }
创建ServerChannelInitializer类
- package com.NettyServer.common;
-
- import com.NettyServer.service.NettyServerHandler;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.serialization.ClassResolvers;
- import io.netty.handler.codec.serialization.ObjectDecoder;
- import io.netty.handler.codec.serialization.ObjectEncoder;
-
- public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- @Override
- protected void initChannel(SocketChannel socketChannel) {
- //添加编解码
- socketChannel.pipeline().addLast(new ObjectDecoder(10 * 1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
- socketChannel.pipeline().addLast(new ObjectEncoder());
- socketChannel.pipeline().addLast(new NettyServerHandler());
- }
-
-
-
- }
创建NettyServerHandler类
- package com.NettyServer.service;
-
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelId;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import java.net.InetSocketAddress;
- import java.util.concurrent.ConcurrentHashMap;
-
- @Slf4j
- @Component
- public class NettyServerHandler extends ChannelInboundHandlerAdapter {
-
- private static NettyServerHandler handler;
-
- /**
- * 管理一个全局map,保存连接进服务端的通道数量
- */
- private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
-
- //@Autowired
- //private IBaseAttachmentService attachmentService;
-
- @PostConstruct
- public void init() {
- handler = this;
- }
-
- /**
- * @param ctx
- * @author xxx on 2021/4/28 16:10
- * @DESCRIPTION: 有客户端连接服务器会触发此函数
- * @return: void
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) {
-
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
-
- String clientIp = insocket.getAddress().getHostAddress();
- int clientPort = insocket.getPort();
-
- //获取连接通道唯一标识
- ChannelId channelId = ctx.channel().id();
-
- //如果map中不包含此连接,就保存连接
- if (CHANNEL_MAP.containsKey(channelId)) {
- log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
- } else {
- //保存连接
- CHANNEL_MAP.put(channelId, ctx);
-
- log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
- log.info("连接通道数量: " + CHANNEL_MAP.size());
- }
- }
-
-
- /**
- * @param ctx
- * @author xxx on 2021/4/28 16:10
- * @DESCRIPTION: 有客户端终止连接服务器会触发此函数
- * @return: void
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) {
-
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
-
- String clientIp = insocket.getAddress().getHostAddress();
-
- ChannelId channelId = ctx.channel().id();
-
- //包含此客户端才去删除
- if (CHANNEL_MAP.containsKey(channelId)) {
- //删除连接
- CHANNEL_MAP.remove(channelId);
- log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
- log.info("连接通道数量: " + CHANNEL_MAP.size());
- }
- }
-
-
- /**
- * @param ctx
- * @author xxx on 2021/4/28 16:10
- * @DESCRIPTION: 有客户端发消息会触发此函数
- * @return: void
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- log.info("【" + ctx.channel().id() + "】" + " :" + msg.toString());
- //可以在这个地方写业务处理逻辑入库或者什么的...................................
- // 结果写到客户端
- /**
- * 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
- *
- */
-
- //响应客户端
- this.channelWrite(ctx.channel().id(), msg);
-
- }
-
- /**
- * @param msg 需要发送的消息内容
- * @param channelId 连接通道唯一id
- * @author xxx 2021/05/14 16:10
- * @DESCRIPTION: 服务端给客户端发送消息
- * @return: void
- */
-
- public void channelWrite(ChannelId channelId, Object msg) throws Exception {
-
- ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
-
- if (ctx == null) {
- log.info("通道【" + channelId + "】不存在");
- return;
- }
-
- if (msg == null || msg == "") {
- log.info("服务端响应空的消息");
- return;
- }
-
- //将客户端的信息直接返回写入ctx
- ctx.write(msg);
- //刷新缓存区
- ctx.flush();
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-
- String socketString = ctx.channel().remoteAddress().toString();
-
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- log.info("Client: " + socketString + " READER_IDLE 读超时");
- ctx.disconnect();
- } else if (event.state() == IdleState.WRITER_IDLE) {
- log.info("Client: " + socketString + " WRITER_IDLE 写超时");
- ctx.disconnect();
- } else if (event.state() == IdleState.ALL_IDLE) {
- log.info("Client: " + socketString + " ALL_IDLE 总超时");
- ctx.disconnect();
- }
- }
- }
-
- /**
- * @param ctx
- * @author xxx on 2021/4/28 16:10
- * @DESCRIPTION: 发生异常会触发此函数
- * @return: void
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
- System.out.println();
- ctx.close();
- log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
- //cause.printStackTrace();
- }
-
-
- }
下面贴出来客户端代码
创建NettyClient类
- package com.NettyClient.service;
-
-
- import com.NettyClient.common.NettyClientInitializer;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
-
- @Slf4j
- @Component
- public class NettyClient {
-
- static final String HOST = "127.0.0.1";
- static final int PORT = 8899;
- private EventLoopGroup group;
- private Bootstrap b;
- private ChannelFuture cf;
- private NettyClientInitializer nettyClientInitializer;
-
-
- public NettyClient() {
- nettyClientInitializer = new NettyClientInitializer();
- group = new NioEventLoopGroup();
- b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .handler(nettyClientInitializer);
- }
-
- public void connect() {
- try {
- this.cf = b.connect(HOST, PORT).sync();
- } catch (InterruptedException e) {
- log.error("客户端连接服务端异常:" + e);
- }
- }
-
- public ChannelFuture getChannelFuture() {
- if (this.cf == null) {
- this.connect();
- }
- if (!this.cf.channel().isActive()) {
- this.connect();
- }
- return this.cf;
- }
-
- public void close() {
- try {
- this.cf.channel().closeFuture().sync();
- this.group.shutdownGracefully();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void setMessage(String msg) throws InterruptedException {
- ChannelFuture cf = this.getChannelFuture();
- cf.channel().writeAndFlush(msg);
- }
-
-
- public static void main(String[] args) {
- try {
-
- } catch (Exception e) {
- log.error("异常:" + e);
- }
- }
-
-
- }
创建NettyClientHandler类
- package com.NettyClient.service;
-
-
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
-
- @Slf4j
- public class NettyClientHandler extends ChannelInboundHandlerAdapter {
-
- private String result;
-
- @Override
-
- public void channelActive(ChannelHandlerContext ctx) {
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- //if (!ObjectUtils.isEmpty(msg)) {
- result = (String) msg;
- //}
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) {
- ctx.flush();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- cause.printStackTrace();
- ctx.close();
- }
-
- }
创建NettyClientInitializer类
- package com.NettyClient.common;
-
- import com.NettyClient.service.NettyClientHandler;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.serialization.ClassResolvers;
- import io.netty.handler.codec.serialization.ObjectDecoder;
- import io.netty.handler.codec.serialization.ObjectEncoder;
- import io.netty.handler.timeout.ReadTimeoutHandler;
- import org.springframework.beans.factory.annotation.Autowired;
-
-
- public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
-
-
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
- socketChannel.pipeline().addLast(new ObjectEncoder());
- socketChannel.pipeline().addLast(new NettyClientHandler());
- }
-
- }
客户端直接可以在业务代码里面调用发送数据到服务端就行了、我简单写了一个定时器测试发送的例子
- package com.NettyClient;
-
-
- import com.NettyClient.service.NettyClient;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.scheduling.annotation.EnableAsync;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
-
- @Component
- @Slf4j
- @EnableAsync
- public class TaskManager {
-
- @Autowired
- private NettyClient nettyClient;
-
- @Async
- @Scheduled(cron = "0 */1 * * * ?")
- public void test() throws InterruptedException {
- try {
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- nettyClient.setMessage("客户端发送此数据" + df.format(new Date()));
- } catch (Exception e) {
- log.error("异常:" + e);
- }
-
- }
-
-
- @Async
- @Scheduled(cron = "0 */1 * * * ?")
- public void test2() throws InterruptedException {
- try {
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- nettyClient.setMessage("客户端2发送此数据" + df.format(new Date()));
- } catch (Exception e) {
- log.error("异常:" + e);
- }
-
- }
-
-
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。