赞
踩
注意:本篇博客风格(不多比比就是撸代码!!!)
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<!-- NOTE(review): 5.0.0.Alpha2 is an alpha-tagged version; consider whether a stable release line fits better, and confirm API compatibility before changing it. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Custom Netty packet types.
 *
 * <p>Each constant pairs a human-readable display name with the string value
 * that is compared against a packet's type field.
 *
 * @author Andon
 * 2022/7/27
 */
@Getter
@AllArgsConstructor
public enum NettyPacketType {

    /** Heartbeat packet. */
    HEARTBEAT("心跳", "HEARTBEAT"),

    /** Request packet. */
    REQUEST("请求", "REQUEST"),

    /** Response packet. */
    RESPONSE("响应", "RESPONSE");

    /** Display name of the packet type. */
    private final String name;

    /** String value carried in a packet's type field. */
    private final String value;
}
import com.andon.common.constant.NettyPacketType;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Custom Netty data packet exchanged between client and server.
 *
 * <p>Besides the packet fields, this class holds a static map of received responses
 * (keyed by request id) so that a sender can wait for the reply to its request via
 * {@link #futureResponse()}.
 *
 * @param <T> type of the payload carried in {@code data}
 * @author Andon
 * 2022/7/27
 */
@Data
@Slf4j
public class NettyPacket<T> implements Serializable {

    /** Received responses keyed by request id; filled by {@link #response} and drained by {@link #futureResponse()}. */
    public static Map<String, NettyPacket<Object>> RESPONSE = new ConcurrentHashMap<>();

    private String requestId;
    private String nettyPacketType;
    private String command;
    private int code;
    private T data;
    private String message;
    private int total;

    /**
     * Builds a REQUEST packet with a freshly generated request id.
     *
     * @param param payload; also determines the command via {@link #command(Object)}
     * @return the new request packet
     */
    public static <T> NettyPacket<T> buildRequest(T param) {
        return build(getOnlyId(), NettyPacketType.REQUEST, param);
    }

    /**
     * Builds a REQUEST packet that carries the caller-supplied request id.
     *
     * <p>Previously the supplied id was written into the packet-type field and a random
     * id was generated instead, so the packet was neither typed as a request nor
     * correlatable by the given id.
     *
     * @param requestId id to use for the request
     * @param param     payload; also determines the command via {@link #command(Object)}
     * @return the new request packet
     */
    public static <T> NettyPacket<T> buildRequest(String requestId, T param) {
        return build(requestId, NettyPacketType.REQUEST, param);
    }

    /**
     * Builds a RESPONSE packet with a freshly generated request id.
     *
     * @param data payload; also determines the command via {@link #command(Object)}
     * @return the new response packet
     */
    public static <T> NettyPacket<T> buildResponse(T data) {
        return build(getOnlyId(), NettyPacketType.RESPONSE, data);
    }

    /**
     * Builds a RESPONSE packet that answers the request with the given id.
     *
     * @param requestId id of the request being answered
     * @param data      payload; also determines the command via {@link #command(Object)}
     * @return the new response packet
     */
    public static <T> NettyPacket<T> buildResponse(String requestId, T data) {
        return build(requestId, NettyPacketType.RESPONSE, data);
    }

    /** Shared factory: fills request id, packet type, payload and derived command. */
    private static <T> NettyPacket<T> build(String requestId, NettyPacketType type, T payload) {
        NettyPacket<T> nettyPacket = new NettyPacket<>();
        nettyPacket.setRequestId(requestId);
        nettyPacket.setNettyPacketType(type.getValue());
        nettyPacket.setData(payload);
        nettyPacket.setCommand(command(payload));
        return nettyPacket;
    }

    /**
     * Records a received response so that the waiting requester can pick it up.
     *
     * @param requestId     id of the request the response belongs to
     * @param nettyResponse the received response packet
     */
    public static void response(String requestId, NettyPacket<Object> nettyResponse) {
        RESPONSE.put(requestId, nettyResponse);
    }

    /**
     * Blocks until a response for this packet's request id has been recorded, polling
     * the response map every 10 ms.
     *
     * @return the response, removed from the response map
     * @throws RuntimeException if no response arrives within 3 minutes
     * @throws Exception        if the waiting thread is interrupted while sleeping
     */
    public NettyPacket<Object> futureResponse() throws Exception {
        long start = System.currentTimeMillis();
        NettyPacket<Object> nettyResponse = RESPONSE.get(requestId);
        while (nettyResponse == null) {
            long cost = System.currentTimeMillis() - start;
            if (cost > TimeUnit.MINUTES.toMillis(3)) {
                throw new RuntimeException("requestId:" + requestId + " 超时,耗时" + cost + "ms");
            }
            TimeUnit.MILLISECONDS.sleep(10);
            nettyResponse = RESPONSE.get(requestId);
        }
        return RESPONSE.remove(requestId);
    }

    /**
     * @return a new random UUID string used as request id
     */
    public static String getOnlyId() {
        return UUID.randomUUID().toString();
    }

    public static final String REQUEST_TEST_1 = "1";
    public static final String RESPONSE_TEST_1 = "2";

    /**
     * Derives the command code from the payload's runtime type.
     *
     * @param t payload
     * @return {@link #REQUEST_TEST_1} for a {@code RequestTestVO}, {@link #RESPONSE_TEST_1}
     *         for a {@code ResponseTestVO}, otherwise {@code null}
     */
    public static <T> String command(T t) {
        if (t instanceof RequestTestVO) {
            return REQUEST_TEST_1;
        } else if (t instanceof ResponseTestVO) {
            return RESPONSE_TEST_1;
        }
        return null;
    }
}
import com.andon.common.dto.NettyPacket;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;

/**
 * Application event that carries a received Netty packet to its listeners.
 *
 * @author Andon
 * 2022/7/27
 */
@Getter
public class NettyPacketEvent extends ApplicationEvent {

    /** The packet delivered with this event. */
    private NettyPacket<Object> nettyPacket;

    /**
     * @param source      event source handed to {@link ApplicationEvent}
     * @param nettyPacket the packet to deliver
     */
    public NettyPacketEvent(Object source, NettyPacket<Object> nettyPacket) {
        super(source);
        this.nettyPacket = nettyPacket;
    }
}
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; /** * @author Andon * 2022/7/22 * <p> * Netty服务端 */ @Slf4j @Component @RequiredArgsConstructor public class NettyServer implements CommandLineRunner { private Channel channel; // boss事件轮询线程组,处理连接事件 private final EventLoopGroup bossGroup = new NioEventLoopGroup(); // worker事件轮询线程组,用于数据处理 private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final NettyServerInitializer nettyServerInitializer; @Value("${netty.port}") private Integer port; /** * 开启Netty服务 */ @Override public void run(String... args) { try { // 启动类 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 设置参数,组配置 serverBootstrap.group(bossGroup, workerGroup) // 指定channel .channel(NioServerSocketChannel.class) // 初始化服务端可连接队列 .option(ChannelOption.SO_BACKLOG, 1024) // 允许重复使用本地地址和端口,连接关闭后,可以立即重用端口 .option(ChannelOption.SO_REUSEADDR, true) // 设置TCP长连接,TCP会主动探测空闲连接的有效性 .childOption(ChannelOption.SO_KEEPALIVE, true) // 禁用Nagle算法,小数据时可以即时传输 .childOption(ChannelOption.TCP_NODELAY, true) // 发送缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 256 * 1024) // 接收缓冲区大小 .childOption(ChannelOption.SO_RCVBUF, 256 * 1024) // Netty服务端channel初始化 .childHandler(nettyServerInitializer); // 绑定端口,开始接收进来的连接 ChannelFuture future = serverBootstrap.bind(port).sync(); if (future.isSuccess()) { log.info("Netty服务端启动!! 端口:[{}]", port); } channel = future.channel(); } catch (Exception e) { log.error("Netty服务端启动异常!! 
error:{}", e.getMessage()); } } @PreDestroy private void destroy() { if (channel != null) { channel.close(); } workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.warn("Netty服务关闭!!"); } public boolean channelWrite(ChannelId channelId, Object msg) { ChannelHandlerContext ctx = NettyServerHandler.CHANNEL_MAP.get(channelId); if (ctx == null) { log.warn("通道【{}】不存在!!", channelId); return false; } ctx.writeAndFlush(msg); return true; } }
import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; /** * @author Andon * 2022/7/22 * <p> * Netty服务端初始化配置 */ @Component @RequiredArgsConstructor public class NettyServerInitializer extends ChannelInitializer<Channel> { private final NettyServerHandler nettyServerHandler; /** * 初始化channel */ @Override protected void initChannel(Channel channel) { channel.pipeline() // 解码器,对接收到的数据进行长度字段解码,也会对数据进行粘包和拆包处理 .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)) // 编码器,主要是在响应字节数据前面添加字节长度字段 .addLast(new LengthFieldPrepender(2)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast("nettyServerHandler", nettyServerHandler); } }
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.andon.common.dto.NettyPacket; import com.andon.common.event.NettyPacketEvent; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author Andon * 2022/7/22 * <p> * Netty服务端处理器 */ @Slf4j @Component @RequiredArgsConstructor @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { // 管理一个全局map,保存连接进服务端的通道 public static final Map<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>(); private final ApplicationEventPublisher applicationEventPublisher; /** * 当客户端主动连接服务端,通道活跃后触发 */ @Override public void channelActive(ChannelHandlerContext ctx) { InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = inetSocketAddress.getAddress().getHostAddress(); int clientPort = inetSocketAddress.getPort(); // 获取连接通道唯一标识 ChannelId channelId = ctx.channel().id(); // 如果map中不包含此连接,就保存连接 if (CHANNEL_MAP.containsKey(channelId)) { log.info("客户端【{}】是连接状态,连接通道数量:{}", channelId, CHANNEL_MAP.size()); } else { // 保存连接 CHANNEL_MAP.put(channelId, ctx); log.info("客户端【{}】连接Netty服务端!![clientIp:{} clientPort:{}]", channelId, clientIp, clientPort); log.info("连接通道数量:{}", CHANNEL_MAP.size()); } } /** * 当客户端主动断开连接,通道不活跃触发 */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = 
inetSocketAddress.getAddress().getHostAddress(); int clientPort = inetSocketAddress.getPort(); // 获取终止连接的客户端ID ChannelId channelId = ctx.channel().id(); // 包含此客户端才去删除 if (CHANNEL_MAP.containsKey(channelId)) { // 删除连接 CHANNEL_MAP.remove(channelId); log.warn("客户端【{}】断开Netty连接!![clientIp:{} clientPort:{}]", channelId, clientIp, clientPort); log.info("连接通道数量:{}", CHANNEL_MAP.size()); } } /** * 通道有消息触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // 报文解析处理 NettyPacket<Object> nettyPacket = JSONObject.parseObject(msg.toString(), new TypeReference<NettyPacket<Object>>() { }.getType()); // 发布自定义Netty数据包处理事件 applicationEventPublisher.publishEvent(new NettyPacketEvent(ctx.channel().id(), nettyPacket)); } catch (Exception e) { log.error("channelId:【{}】 报文解析失败!! msg:{} error:{}", ctx.channel().id(), msg.toString(), e.getMessage()); NettyPacket<String> nettyResponse = NettyPacket.buildRequest("报文解析失败!!"); ctx.writeAndFlush(JSONObject.toJSONString(nettyResponse)); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.warn("Client: 【{}】 READER_IDLE 读超时", socketString); ctx.close(); } else if (event.state() == IdleState.WRITER_IDLE) { log.warn("Client: 【{}】 WRITER_IDLE 写超时", socketString); ctx.close(); } else if (event.state() == IdleState.ALL_IDLE) { log.warn("Client: 【{}】 ALL_IDLE 读/写超时", socketString); ctx.close(); } } } /** * 当连接发生异常时触发 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 当出现异常就关闭连接 ctx.close(); } }
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.andon.common.constant.NettyPacketType; import com.andon.common.dto.NettyPacket; import com.andon.common.dto.RequestTestVO; import com.andon.common.dto.ResponseTestVO; import com.andon.common.event.NettyPacketEvent; import com.andon.nettyserver.socket.NettyServer; import io.netty.channel.ChannelId; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.util.Date; /** * @author Andon * 2022/7/27 * <p> * Netty服务端自定义数据包处理监听器 */ @Slf4j @Component @RequiredArgsConstructor public class NettyServerPacketListener implements ApplicationListener<NettyPacketEvent> { private final NettyServer nettyServer; @Async @Override public void onApplicationEvent(NettyPacketEvent event) { ChannelId channelId = (ChannelId) event.getSource(); String nettyPacketType = event.getNettyPacket().getNettyPacketType(); if (nettyPacketType.equals(NettyPacketType.HEARTBEAT.getValue())) { log.info("server receive heartbeat!! channelId:{}", channelId); NettyPacket<String> nettyResponse = NettyPacket.buildResponse("server receive heartbeat " + new Date().toString()); nettyResponse.setNettyPacketType(NettyPacketType.HEARTBEAT.getValue()); boolean success = nettyServer.channelWrite(channelId, JSONObject.toJSONString(nettyResponse)); } else if (nettyPacketType.equals(NettyPacketType.REQUEST.getValue())) { String command = event.getNettyPacket().getCommand(); Object data = event.getNettyPacket().getData(); if (command.equals(NettyPacket.REQUEST_TEST_1)) { requestTest1(channelId, event.getNettyPacket().getRequestId(), JSONObject.parseObject(JSONObject.toJSONString(data), new TypeReference<RequestTestVO>() { }.getType())); } else { log.warn("unknown command!! 
channelId:{} data:{}", channelId, JSONObject.toJSONString(data)); } } else if (nettyPacketType.equals(NettyPacketType.RESPONSE.getValue())) { // TODO RESPONSE log.info("channelId:{} RESPONSE!! data:{}", channelId, JSONObject.toJSONString(event.getNettyPacket().getData())); } else { log.warn("unknown NettyPacketType!! channelId:{} event:{}", channelId, JSONObject.toJSONString(event)); } } /** * 处理请求:RequestTest1 */ private void requestTest1(ChannelId channelId, String requestId, RequestTestVO param) { log.info("处理客户端【{}】的请求,请求ID:{},请求参数:{}", channelId, requestId, JSONObject.toJSONString(param)); ResponseTestVO response = ResponseTestVO.builder().message("server receive param").date(new Date()).build(); NettyPacket<Object> nettyResponse = NettyPacket.buildResponse(requestId, response); nettyServer.channelWrite(channelId, JSONObject.toJSONString(nettyResponse)); } }
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
 * Spring Boot entry point of the Netty server application.
 */
@EnableAsync // enable asynchronous method execution
@SpringBootApplication
public class NettyServerApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(NettyServerApplication.class, args);
    }
}
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.concurrent.TimeUnit; /** * @author Andon * 2022/7/22 * <p> * Netty客户端 */ @Slf4j @Component @RequiredArgsConstructor public class NettyClient implements CommandLineRunner { private Channel channel; private final EventLoopGroup workGroup = new NioEventLoopGroup(); private final NettyClientInitializer nettyClientInitializer; @Value("${netty.host}") private String host; @Value("${netty.port}") private Integer port; @Override public void run(String... args) { try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup) .channel(NioSocketChannel.class) // 设置TCP长连接,TCP会主动探测空闲连接的有效性 .option(ChannelOption.SO_KEEPALIVE, true) // 禁用Nagle算法,小数据时可以即时传输 .option(ChannelOption.TCP_NODELAY, true) // 发送缓冲区大小 .option(ChannelOption.SO_SNDBUF, 256 * 1024) // 接收缓冲区大小 .option(ChannelOption.SO_RCVBUF, 256 * 1024) // Netty客户端channel初始化 .handler(nettyClientInitializer); // 连接服务器ip、端口 ChannelFuture future = bootstrap.connect(host, port); //客户端断线重连逻辑 future.addListener((ChannelFutureListener) futureListener -> { if (futureListener.isSuccess()) { log.info("连接Netty服务端成功!!"); } else { log.warn("连接Netty服务端失败,准备30s后进行断线重连!!"); futureListener.channel().eventLoop().schedule((Runnable) this::run, 30, TimeUnit.SECONDS); } }); channel = future.channel(); } catch (Exception e) { log.error("连接Netty服务端异常!! 
error:{}", e.getMessage()); } } @PreDestroy private void destroy() { if (channel != null) { channel.close(); } workGroup.shutdownGracefully(); log.warn("Netty连接关闭!!"); } public boolean sendMsg(String msg) { boolean active = channel.isActive(); if (active) { channel.writeAndFlush(msg); } else { log.warn("channel active:{}", false); } return active; } }
import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.CharsetUtil; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * @author Andon * 2022/7/22 * <p> * Netty客户端通道初始化 */ @Component @RequiredArgsConstructor public class NettyClientInitializer extends ChannelInitializer<Channel> { private final NettyClientHandler nettyClientHandler; /** * 初始化channel */ @Override protected void initChannel(Channel channel) { channel.pipeline() // 解码器,对接收到的数据进行长度字段解码,也会对数据进行粘包和拆包处理 .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)) // 编码器,主要是在响应字节数据前面添加字节长度字段 .addLast(new LengthFieldPrepender(2)) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) .addLast(nettyClientHandler); } }
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.andon.common.constant.NettyPacketType; import com.andon.common.dto.NettyPacket; import com.andon.common.event.NettyPacketEvent; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Date; import java.util.concurrent.TimeUnit; /** * @author Andon * 2022/7/22 * <p> * Netty客户端处理器 */ @Slf4j @Component @ChannelHandler.Sharable @RequiredArgsConstructor public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Lazy @Resource private NettyClient nettyClient; private final ApplicationEventPublisher applicationEventPublisher; /** * 建立连接时 */ @Override public void channelActive(ChannelHandlerContext ctx) { log.info("建立Netty连接!!"); ctx.fireChannelActive(); } /** * 关闭连接时 */ @Override public void channelInactive(ChannelHandlerContext ctx) { log.warn("Netty连接关闭!!"); reconnect(ctx); } /** * 心跳处理,每5秒发送一次心跳请求 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.WRITER_IDLE) { log.info("已经5秒没有发送消息给服务端!!"); // 向服务端发送心跳包 NettyPacket<String> nettyRequest = NettyPacket.buildRequest("client heartbeat " + new Date().toString()); nettyRequest.setNettyPacketType(NettyPacketType.HEARTBEAT.getValue()); // 发送心跳消息,并在发送失败时关闭该连接 ctx.writeAndFlush(JSONObject.toJSONString(nettyRequest)); } } else { super.userEventTriggered(ctx, evt); } } /** * 收到服务端消息 */ 
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // 报文解析处理 NettyPacket<Object> nettyPacket = JSONObject.parseObject(msg.toString(), new TypeReference<NettyPacket<Object>>() { }.getType()); // 发布自定义Netty数据包处理事件 applicationEventPublisher.publishEvent(new NettyPacketEvent(ctx.channel().id(), nettyPacket)); } catch (Exception e) { log.error("channelId:【{}】 报文解析失败!! msg:{} error:{}", ctx.channel().id(), msg.toString(), e.getMessage()); NettyPacket<String> nettyResponse = NettyPacket.buildRequest("报文解析失败!!"); ctx.writeAndFlush(JSONObject.toJSONString(nettyResponse)); } } /** * 当连接发生异常时触发 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 当出现异常就关闭连接 ctx.close(); } private void reconnect(ChannelHandlerContext ctx) { log.info("准备30s后断线重连!!"); ctx.channel().eventLoop().schedule(() -> nettyClient.run(), 30, TimeUnit.SECONDS); } }
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.andon.common.constant.NettyPacketType; import com.andon.common.dto.NettyPacket; import com.andon.common.event.NettyPacketEvent; import io.netty.channel.ChannelId; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; /** * @author Andon * 2022/7/27 * <p> * Netty客户端自定义数据包处理监听器 */ @Slf4j @Component @RequiredArgsConstructor public class NettyClientPacketListener implements ApplicationListener<NettyPacketEvent> { @Async @Override public void onApplicationEvent(NettyPacketEvent event) { ChannelId channelId = (ChannelId) event.getSource(); String nettyPacketType = event.getNettyPacket().getNettyPacketType(); if (nettyPacketType.equals(NettyPacketType.HEARTBEAT.getValue())) { log.info("client receive heartbeat!!"); } else if (nettyPacketType.equals(NettyPacketType.REQUEST.getValue())) { // TODO REQUEST log.info("channelId:{} REQUEST!! data:{}", channelId, JSONObject.toJSONString(event.getNettyPacket().getData())); } else if (nettyPacketType.equals(NettyPacketType.RESPONSE.getValue())) { NettyPacket<Object> nettyResponse = JSONObject.parseObject(JSONObject.toJSONString(event.getNettyPacket()), new TypeReference<NettyPacket<Object>>() { }.getType()); NettyPacket.response(event.getNettyPacket().getRequestId(), nettyResponse); } else { log.warn("unknown NettyPacketType!! channelId:{} event:{}", channelId, JSONObject.toJSONString(event)); } } }
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
 * Spring Boot entry point of the Netty client application.
 */
@EnableAsync // enable asynchronous method execution
@SpringBootApplication
public class NettyClientApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(NettyClientApplication.class, args);
    }
}
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.andon.common.dto.NettyPacket; import com.andon.common.dto.RequestTestVO; import com.andon.common.dto.ResponseTestVO; import com.andon.nettyclient.socket.NettyClient; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @author Andon * 2022/7/25 */ @Slf4j @Component @RequiredArgsConstructor public class TaskScheduled implements CommandLineRunner { private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); private final NettyClient nettyClient; /** * 模拟服务交互 */ @Override public void run(String... args) { // 如果任务里面执行的时间大于 period 的时间,下一次的任务会推迟执行。 // 本次任务执行完后下次的任务还需要延迟period时间后再执行 scheduledExecutorService.scheduleWithFixedDelay(() -> { try { System.out.println("====定时任务开始====" + new Date()); // 向Netty服务端发送消息 RequestTestVO param = RequestTestVO.builder().key("hello").value("world").date(new Date()).build(); NettyPacket<RequestTestVO> nettyRequest = NettyPacket.buildRequest(param); boolean success = nettyClient.sendMsg(JSONObject.toJSONString(nettyRequest)); NettyPacket<Object> response = nettyRequest.futureResponse(); if (response != null) { ResponseTestVO responseTestVO = JSONObject.parseObject(JSONObject.toJSONString(response.getData()), new TypeReference<ResponseTestVO>() { }.getType()); log.info("requestId:{} response:{}", nettyRequest.getRequestId(), JSONObject.toJSONString(responseTestVO)); } } catch (Exception e) { log.error("failure!! error:{}", e.getMessage()); } }, 2, 10, TimeUnit.SECONDS); } }
GitHub: link. 欢迎star
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。