赞
踩
学习了 Netty 的相关技术后,现将 Netty 与 Spring Boot 进行整合。
导入jar包
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- Netty dependency (all-in-one artifact) --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.34.Final</version> </dependency> </dependencies>
1. 让 Spring Boot 的启动类实现 CommandLineRunner 接口,并重写其 run 方法
@SpringBootApplication public class BootNettyApplication implements CommandLineRunner { @Autowired private NettyServer nettyServer; @Value("${netty.port}") private Integer port; public static void main(String[] args) { SpringApplication.run(BootNettyApplication.class, args); } /** * 此处启动netty服务 * @param args * @throws Exception */ @Override public void run(String... args) throws Exception { InetSocketAddress address = new InetSocketAddress(port); // 启动netty服务器 ChannelFuture channelFuture = nettyServer.startServer(address); // 钩子方法,关闭服务器 Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.closeServer() )); channelFuture.channel().closeFuture().sync(); } }
2.自定义netty服务类
@Component @Slf4j public class NettyServer { private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private Channel channel; public ChannelFuture startServer(InetSocketAddress address) { ChannelFuture channelFuture = null; try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ServerInit()) // 允许的最大连接数 .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE); channelFuture = serverBootstrap.bind(address).sync(); channel = channelFuture.channel(); } catch (Exception e) { log.error("netty启动出错!",e); } finally { if (channelFuture != null && channelFuture.isSuccess()) { log.info("netty正在监听" + address.getHostName() + " 于端口 " + address.getPort() + ", 等待连接"); } else { log.error("netty启动失败!"); } } return channelFuture; } /** * 关闭netty服务 */ public void closeServer() { log.info("关闭netty服务。。。"); if (channel != null) { channel.close(); } bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); log.info("关闭netty服务成功!"); } }
3.netty服务初始化
/**
 * Channel initializer used as the server's child handler: it populates the
 * pipeline of each new {@link SocketChannel}.
 */
public class ServerInit extends ChannelInitializer<SocketChannel> {

    /**
     * Adds a UTF-8 {@link StringDecoder}, a UTF-8 {@link StringEncoder} and a
     * {@link ServerHandler} to the channel's pipeline, in that order.
     *
     * @param ch the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        final Charset utf8 = Charset.forName("utf-8");
        ch.pipeline()
                .addLast(new StringDecoder(utf8))
                .addLast(new StringEncoder(utf8))
                // Heartbeat / idle detection (currently disabled):
                // .addLast(new IdleStateHandler(5, 7, 10, TimeUnit.SECONDS))
                .addLast(new ServerHandler());
    }
}
4.自定义handler实现
@Slf4j public class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 接收到消息执行的回调方法 * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } /** * 心跳监测 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()) { case READER_IDLE: eventType = "读空闲"; break; case WRITER_IDLE: eventType = "些空闲"; break; case ALL_IDLE: eventType = "读写空闲"; break; default: } String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); log.info(date + " " + ctx.channel().remoteAddress() + " " + eventType); } } /** * 客户端连接成功执行方法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); log.info(date + " " + ctx.channel().remoteAddress() + " 客户端连接成功!"); ctx.writeAndFlush("连接成功!"); } /** * 客户端下线 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); log.info(date + " " + ctx.channel().remoteAddress() + " 客户端下线!"); } /** * 客户端连接成功后执行的回调方法 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); log.info(date + " " + ctx.channel().remoteAddress() + " 新的客户端加入!"); } /** * 断开连接 * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String date = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); log.info(date + " " + ctx.channel().remoteAddress() + " 客户端断开连接!"); } /** * 抛出异常 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("netty 服务出错!",cause); ctx.close(); } }
正常启动spring boot即可进行netty通信,可以自定义一个netty的client进行测试:
public class MyClient { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); // pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(ctx.channel().remoteAddress()); System.out.println("client msg:" + msg); ctx.writeAndFlush("from client:" + LocalDateTime.now()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("客户端进行了链接!"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }); } }); ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
good luck!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。