赞
踩
引入Netty的Maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
配置Netty的host和port
netty:
  host: 0.0.0.0 # 0.0.0.0 表示绑定本机所有网卡地址
  port: 9998
springboot 异步启动netty
配置线程池
# 线程池配置
async:
  executor:
    thread:
      # 核心线程数
      core_pool_size: 5
      # 最大线程数
      max_pool_size: 20
      # 任务队列大小
      queue_capacity: 100
      # 线程池中线程的名称前缀
      name_prefix: async-service-
      # 超出核心线程数的空闲线程的存活时间(秒)
      keep_alive_seconds: 100
线程池 ThreadPoolConfig
package com.hongyu.thread; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * 线程池配置 * * @author JHY * @date 2023/03/09 */ @Configuration @EnableAsync @Slf4j public class ThreadPoolConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name_prefix}") private String namePrefix; @Value("${async.executor.thread.keep_alive_seconds}") private int keepAliveSeconds; @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); /** *第一种:有具体的计算公式算出来的 * 比如:线程数 = N * U * ( 1 + W/C ) * N = cpu数量(可以理解为cpu核数,即同一时刻能并行处理线程的数量) * U = 目标cpu使用率(0-1区间范围内) * W = 等待时间 * C = 计算时间 * W/C = 等待时间和计算时间的比例 *第二种: * 如果是计算密集型的应用则设置N+1线程数 * 如果是I0密集性的应用则设置2N的线程数 * - N是cpu数量 */ // 设置核心线程数 executor.setCorePoolSize(corePoolSize); // 设置最大线程数 executor.setMaxPoolSize(maxPoolSize); // 设置队列容量 executor.setQueueCapacity(queueCapacity); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(keepAliveSeconds); // 设置默认线程名称 executor.setThreadNamePrefix(namePrefix); // 设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); log.info("创建一个线程池 corePoolSize is [" + corePoolSize + "] maxPoolSize is [" + maxPoolSize + "] queueCapacity is [" + queueCapacity + "] keepAliveSeconds is [" + keepAliveSeconds + "] namePrefix is [" + namePrefix + "]."); return executor; } }
启动类
package com.hongyu;

import com.hongyu.netty.NettyAsyncServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Application entry point.
 *
 * @author JHY
 * @date 2023/3/9
 */
@Slf4j
@EnableScheduling
@EnableAsync
@SpringBootApplication
public class App {

    /**
     * Boots the Spring context, then looks up the {@code NettyAsyncServer} bean by name
     * and invokes its {@code start()} method.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(App.class, args);
        context.getBean("NettyAsyncServer", NettyAsyncServer.class).start();
    }
}
编写NettyServer
package com.hongyu.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; /** * Netty服务 * * @author JHY * @date 2023/03/08 */ @Slf4j @Component("NettyAsyncServer") public class NettyAsyncServer { @Value("${netty.host}") private String host = "0.0.0.0"; @Value("${netty.port}") private Integer port = 9998; @Async public void start() { InetSocketAddress socketAddress = new InetSocketAddress(host, port); // 主线程池: 用于接收客户端请求连接,不做任何处理 EventLoopGroup masterGroup = new NioEventLoopGroup(); // 从线程池: 主线程池会把任务交给他,让其做任务 EventLoopGroup subGroup = new NioEventLoopGroup(); try { // 创建服务器启动类 ServerBootstrap server = new ServerBootstrap(); // 设置主从线程组 server.group(masterGroup, subGroup) // 设置双向通道 .channel(NioServerSocketChannel.class) // 添加子处理器,用于处理从线程池的任务 .childHandler(new NettyServerInitializer()) .localAddress(socketAddress); // 启动服务,并且设置端口号,设置成同步方式 ChannelFuture future = server.bind(socketAddress).sync(); log.info("NettyServer启动成功: ws:/{}/ws", socketAddress); // 监听关闭的channel,设置成同步方式 future.channel().closeFuture().sync(); } catch (Exception e) { log.error("NettyServer异常:" + e.getMessage()); } finally { masterGroup.shutdownGracefully(); subGroup.shutdownGracefully(); } } }
NettyServerInitializer
package com.hongyu.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; /** * 网状服务器初始化 * * @author JHY * @date 2023/03/08 */ public class NettyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { // 通过SocketChannel获取对应的管道 ChannelPipeline pipeline = channel.pipeline(); // 通过管道添加Handler // HttpServerCodec由Netty提供的助手类,可以理解为拦截器 // 当请求到服务器需要解码,响应到客户端需要编码 pipeline.addLast("HttpServerCodec", new HttpServerCodec()); // 写大数据流支持 pipeline.addLast("ChunkedWriteHandler", new ChunkedWriteHandler()); // 对httpMessage 聚合处理 pipeline.addLast("HttpObjectAggregator", new HttpObjectAggregator(1024 * 64)); // 增加心跳支持,已秒为单位 pipeline.addLast("IdleStateHandler", new IdleStateHandler(8, 10, 12)); // 自定义空闲状态检测 pipeline.addLast("HeartHandler", new HeartHandler()); // 支持ws,处理一些繁重复杂的事情,处理握手动作{close,ping.pong} pipeline.addLast("WebSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws")); // 添加自定义助手类 pipeline.addLast("CustomHandler", new CustomChatHandler()); } }
心跳HeartHandler
package com.hongyu.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; /** * 心处理程序 * 检测心跳 * * @author JHY * @date 2023/03/08 */ @Slf4j public class HeartHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // 强制类型转换 IdleStateEvent event = (IdleStateEvent) evt; // 读空闲 if (event.state() == IdleState.READER_IDLE) { log.info("客户端[{}]进入读空闲", ctx.channel().id().asShortText()); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("客户端[{}]进入写空闲", ctx.channel().id().asShortText()); } else if (event.state() == IdleState.ALL_IDLE) { log.info("客户端[{}]进入全部空闲", ctx.channel().id().asShortText()); log.info("关闭之前,users数量为:" + CustomChatHandler.users.size()); Channel channel = ctx.channel(); // 资源释放 channel.close(); log.info("关闭之后,users数量为:" + CustomChatHandler.users.size()); } } } }
自定义助手类
package com.hongyu.netty; import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; /** * 自定义聊天处理程序 * * @author JHY * @date 2023/03/08 */ @Slf4j public class CustomChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { /** * 记录和管理客户端的通道 */ public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 1.获取客户端发来的消息 String text = msg.text(); Channel channel = ctx.channel(); String now = LocalDateTime.now().format(formatter); log.info("接收到[{}]的数据:{},接收时间:{}", channel.id().asShortText(), text, now); int textIndexOf = text.indexOf("{"); if (textIndexOf >= 0) { DataContent dataContent = JSON.parseObject(text, DataContent.class); /** * 2.1 当websocket第一次open时 初始化 channel,把当前channel和userid关联起来 * 2.2 判断消息类型,把聊天内容记录到数据库,标签消息的读取状态,未读 * 2.3 读取消息类型 , 针对具体的消息进行读取,修改读取状态,已读 * 2.5 心跳类型的消息 */ String types = dataContent.getTypes(); if (StringUtils.isNotBlank(types)) { Integer meId = dataContent.getMeId(); Integer chatId = dataContent.getChatId(); String content = dataContent.getContent(); Integer msgId = dataContent.getMsgId(); String extended = dataContent.getExtended(); if ("第一次连接".equals(types)) { UserChannelRelation.put(meId, channel); } else if ("单聊".equals(types)) { // 参考接口 一对一聊天 逻辑 // 保存消息进数据库 设为未读 // 需要使用SpringUtil来注入service Channel receiverChannel = 
UserChannelRelation.get(chatId); if (receiverChannel == null) { // 离线用户 } else { Channel findChannel = users.find(receiverChannel.id()); if (findChannel != null) { // 用户在线 receiverChannel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(dataContent))); } else { // 离线用户 } } } else if ("群聊".equals(types)) { } else if ("读取".equals(types)) { // 如果多条消息批处理 } else if ("心跳".equals(types)) { log.info("收到来自[{}]的心跳包,当前时间:{}", channel.id().asLongText(), now); channel.writeAndFlush(new TextWebSocketFrame("{\"types\":\"心跳回复\",\"timestamp\":\"" + now + "\"}")); } } } else if ("PING".equals(text)) { channel.writeAndFlush(new TextWebSocketFrame("PONG")); } // // 将数据刷新到客户端上 // String writeText = String.format("服务器在:%s,接收到的消息内容为:%s", LocalDateTime.now(), msg.text()); // users.writeAndFlush(new TextWebSocketFrame(writeText)); } /** * 处理程序添加 * * @param ctx ctx * @throws Exception 异常 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { users.add(ctx.channel()); log.info("客户端[{}]连接成功,当前{}人在线", ctx.channel().id().asShortText(), users.size()); } /** * 处理程序删除 * * @param ctx ctx * @throws Exception 异常 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { users.remove(ctx.channel()); log.info("客户端[{}]断开连接,当前{}人在线", ctx.channel().id().asShortText(), users.size()); } /** * 异常 * * @param ctx ctx * @param e 异常 * @throws Exception 异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception { log.error("连接发生异常:{}", e.getMessage()); // 发生异常,关闭连接,同时ChannelGroup移除 ctx.channel().close(); users.remove(ctx.channel()); } }
DataContent
package com.hongyu.netty;

import lombok.Data;
import lombok.ToString;

import java.io.Serializable;

/**
 * Message payload exchanged over the WebSocket connection.
 *
 * @author JHY
 * @date 2023/03/08
 */
@Data
@ToString
public class DataContent implements Serializable {
    /**
     * Message type, e.g. private chat, group chat, heartbeat.
     */
    private String types;
    /**
     * Id of the sending user.
     */
    private Integer meId;
    /**
     * Chat id.
     */
    private Integer chatId;
    /**
     * Chat content.
     */
    private String content;
    /**
     * Message id.
     */
    private Integer msgId;
    /**
     * Extension field.
     */
    private String extended;
}
UserChannelRelation
package com.hongyu.netty; import io.netty.channel.Channel; import lombok.Data; import lombok.ToString; import java.io.Serializable; import java.util.HashMap; /** * 用户渠道关系 * * @author JHY * @date 2023/03/08 */ @Data @ToString public class UserChannelRelation implements Serializable { private static HashMap<Integer, Channel> manage = new HashMap<>(); public static void put(Integer meId, Channel channel) { manage.put(meId, channel); } public static Channel get(Integer meId) { return manage.get(meId); } } ```
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。