赞
踩
最近使用redis缓存,但是不允许客户端直接连接redis服务器,因此需要模拟一个redis服务器,实现redis的get/set/del/auth基础功能。
网上该场景的解决方案不多,文章如有问题,请指正以备完善。
使用netty来实现,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议(redis协议)的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
实现目标:客户端以redis协议连接netty redis服务端,netty redis服务端后端连接redis服务器,在客户端与服务器之间添加netty redis服务。客户端对连接对象无感知。
1. 国际惯例,添加pom依赖
<!-- Netty all-in-one artifact. No <version> element is declared here; presumably the version is supplied by a parent POM or BOM (for example Spring Boot dependency management). TODO confirm. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
2. 配置启动类
/**
 * Spring Boot entry point of the application.
 *
 * <p>The class is annotated with {@code @ServletComponentScan} in addition to
 * {@code @SpringBootApplication}.
 */
@SpringBootApplication
@ServletComponentScan
public class EwsNettyApplication {

    /**
     * Runs the Spring application and prints a banner once {@code SpringApplication.run} returns.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> primarySource = EwsNettyApplication.class;
        SpringApplication.run(primarySource, args);

        final String banner = "******************启动成功*****************";
        System.out.println(banner);
    }
}
启动类添加@ServletComponentScan注解,该注解可以使Listener直接通过@WebListener 注解自动注册。
3. 设置servlet容器监听
@WebListener public class NettyRunServletContextListener implements ServletContextListener { private static final Logger logger = LoggerFactory.getLogger(NettyRunServletContextListener.class); @Value("${netty.port}") private int port; @Value("${netty.url}") private String url; @Autowired private NettyConfig nettyConfig; @Autowired private ApplicationContext applicationContext; /** * Servlet容器终止Web应用时调用该方法 * @param sce servletContextEvent */ @Override public void contextDestroyed(ServletContextEvent sce) { System.out.println("====== springboot netty destroy ======"); nettyConfig.destroy(); System.out.println("---test contextDestroyed method---"); } /** * Servlet容器启动Web应用时调用该方法。 * @param sce servletContextEvent */ @Override public void contextInitialized(ServletContextEvent sce) { //处理无法注入 WebApplicationContextUtils.getRequiredWebApplicationContext(sce.getServletContext()) .getAutowireCapableBeanFactory().autowireBean(this); //初始化策略容器 initializerStrategy(); try { InetSocketAddress address = new InetSocketAddress(url, port); //nettyConfig执行,返回异步通知接口 ChannelFuture future = nettyConfig.run(address); logger.info("====== springboot netty start ======"); Runtime.getRuntime().addShutdownHook(new Thread(() -> { logger.info("---nettyConfig destroy---"); nettyConfig.destroy(); })); //channel关闭 future.channel().closeFuture().syncUninterruptibly(); } catch (Exception e) { logger.error("---springboot netty server start error : ", e.getMessage() + "---"); } } /** * 初始化策略容器 */ public void initializerStrategy() { Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RedisStrategyTypeHandler.class); RedisStrategyContext strategyContext = applicationContext.getBean(RedisStrategyContext.class); beans.forEach((name, bean) -> { RedisStrategyTypeHandler typeHandler = bean.getClass().getAnnotation(RedisStrategyTypeHandler.class); strategyContext.putRedisStrategy(typeHandler.value().getIndex(), (RedisStrategy) bean); }); }
Servlet容器启动时调用contextInitialized方法执行netty启动逻辑。Servlet容器终止Web应用时调用contextDestroyed方法关闭netty服务,关闭EventLoopGroup、channel,释放资源。
4. nettyConfig配置
@Configuration public class NettyConfig { private static final Logger log = LoggerFactory.getLogger(NettyConfig.class); private final EventLoopGroup bossGroup = new NioEventLoopGroup(1); private final EventLoopGroup workerGroup = new NioEventLoopGroup(6); private Channel channel; @Autowired private ServerChannelInitializer serverChannelInitializer; /** * 启动服务 */ public ChannelFuture run(InetSocketAddress address) { ChannelFuture future = null; try { //创建serverBootstrap实例引导和绑定服务器 ServerBootstrap bootstrap = new ServerBootstrap(); //指定 EventLoopGroup 以处理客户端事件 bootstrap.group(bossGroup, workerGroup) //适用于NIO传输的channel类型 .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) //指定ChannelInitializer,对于所有连接均为调用 .childHandler(serverChannelInitializer).childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true); //绑定 future = bootstrap.bind(address).sync(); channel = future.channel(); } catch (Exception e) { log.error("Netty start error:", e); } finally { if (future != null && future.isSuccess()) { log.info("Netty server listening " + address.getHostName() + " on port " + address.getPort() + " and ready for connections..."); } else { log.error("Netty server start up Error!"); } } return future; } public void destroy() { log.info("Shutdown Netty Server..."); if (channel != null) { channel.close(); } //关闭EventLoopGroup、channel,释放资源 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.info("Shutdown Netty Server Success!"); } }
netty服务端与客户端的引导基本类似,服务端使用ServerBootstrap以及NioServerSocketChannel的channel类型来引导,这点与客户端不同。着重注意netty的数据处理组件——ChannelHandler(即代码中serverChannelInitializer)。
5. 配置channelHandler初始化器
/** * channel初始化器 * 将nettyServerHandler实列添加到channel的channelPipeline中 * @author phli */ @Component public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Autowired private NettyServerHandler nettyServerHandler; @Override protected void initChannel(SocketChannel socketChannel) { // 解码编码 socketChannel.pipeline().addLast(new RedisCommandDecoder()); socketChannel.pipeline().addLast(new RedisReplyEncoder()); socketChannel.pipeline().addLast(nettyServerHandler); } }
每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。ChannelPipeline以及ChannelHandlerContext的概念本文不做涉及,可自行百度。将redis协议的编码解码逻辑添加到channelPipeline内。
6. redis解码
/** * redis命令解码 * @author phli */ public class RedisCommandDecoder extends ReplayingDecoder<Void> { /** Decoded command and arguments */ private byte[][] cmds; /** Current argument */ private int arg; /** Decode in block-io style, rather than nio. */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (cmds == null) { if (in.readByte() == '*') { doDecodeNumOfArgs(in); } } else { doDecodeArgs(in); } if (isComplete()) { doSendCmdToHandler(out); doCleanUp(); } } /** Decode number of arguments */ private void doDecodeNumOfArgs(ByteBuf in) { // Ignore negative case int numOfArgs = readInt(in); System.out.println("RedisCommandDecoder NumOfArgs: " + numOfArgs); cmds = new byte[numOfArgs][]; checkpoint(); } /** Decode arguments */ private void doDecodeArgs(ByteBuf in) { for (int i = arg; i < cmds.length; i++) { if (in.readByte() == '$') { int lenOfBulkStr = readInt(in); System.out.println("RedisCommandDecoder LenOfBulkStr[" + i + "]: " + lenOfBulkStr); cmds[i] = new byte[lenOfBulkStr]; in.readBytes(cmds[i]); // Skip CRLF(\r\n) in.skipBytes(2); arg++; checkpoint(); } else { throw new IllegalStateException("Invalid argument"); } } } /** * cmds != null means header decode complete * arg > 0 means arguments decode has begun * arg == cmds.length means complete! 
*/ private boolean isComplete() { return (cmds != null) && (arg > 0) && (arg == cmds.length); } /** Send decoded command to next handler */ private void doSendCmdToHandler(List<Object> out) { System.out.println("RedisCommandDecoder: Send command to next handler"); if (cmds.length == 2) { out.add(new RedisCommand(new String(cmds[0]), cmds[1])); } else if (cmds.length == 3) { out.add(new RedisCommand(new String(cmds[0]), cmds[1], cmds[2])); } else { throw new IllegalStateException("Unknown command"); } } /** Clean up state info */ private void doCleanUp() { this.cmds = null; this.arg = 0; } private int readInt(ByteBuf in) { int integer = 0; char c; while ((c = (char) in.readByte()) != '\r') { integer = (integer * 10) + (c - '0'); } if (in.readByte() != '\n') { throw new IllegalStateException("Invalid number"); } return integer; } }
7. redis编码
/**
 * Encoder for Redis replies: lets each {@code RedisReply} serialize itself into the
 * outbound buffer.
 *
 * @author phli
 */
public class RedisReplyEncoder extends MessageToByteEncoder<RedisReply> {

    /**
     * Prints the reply to standard output, then writes it into {@code out}.
     *
     * @param ctx channel handler context (unused)
     * @param msg reply to encode
     * @param out buffer receiving the encoded bytes
     * @throws Exception if the reply fails to write itself
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, RedisReply msg, ByteBuf out) throws Exception {
        final String trace = "RedisReplyEncoder: " + msg;
        System.out.println(trace);
        msg.write(out);
    }
}
8. netty服务逻辑处理类
@Component @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Autowired private RedisStrategyContext redisStrategyContext; @Autowired private ChannelCache channelCache; private static final String REDIS_AUTH = "auth"; /** * 当从Channel读取数据时被调用,具体逻辑处理 * 校验channel是否已存在,验证appKey * appKey校验通过将channel添加到channelGroup * 将appKey与channelId存放到redis-2db内 * 校验不通过返回信息,关闭channel * @param ctx channelHandlerContext * @param message 消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object message) { RedisCommand msg = (RedisCommand) message; if (channelCache.getChannel(ctx.channel()) == null) { if (REDIS_AUTH.equalsIgnoreCase(msg.getName())) { //策略模式appKey RedisStrategy redisStrategy = redisStrategyContext.getRedisStrategy(RedisStrategyType.getIndexByMessage(msg.getName().toUpperCase())); redisStrategy.redisHandler(ctx, msg, null); } else { ctx.writeAndFlush(new BulkReply("no Auth".getBytes())); ctx.close(); } return; } //获取channelID String channelId = ctx.channel().id().asLongText(); //根据channelID获取该channel对应的appKey String appKey = channelCache.getChannelIdUid().get(channelId); //分析message命令类型,根据策略模式完成判断 RedisStrategy redisStrategy = redisStrategyContext.getRedisStrategy(RedisStrategyType.getIndexByMessage(msg.getName().toUpperCase())); redisStrategy.redisHandler(ctx, msg, appKey); } /** * 建立连接时,返回消息 * @param ctx channelHandlerContext * @throws Exception 异常 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress()); super.channelActive(ctx); } /** * 异常触发 * @param ctx channelHandlerContext * @param cause 异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { System.out.println("exceptionCaught:" + cause.getMessage()); ctx.close(); } /** * channel没有连接到远程节点 * @param ctx channelHandlerContext */ @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("====== 
channelInactive ======"); channelCache.removeChannel(ctx.channel()); ctx.close(); System.out.println("====== Channel close ======"); } }
该类进行具体逻辑处理,各个方法均有注释。不同redis协议命令的不同处理使用策略模式进行处理,具体的实现方式这里不做展开,仅以redis的get命令为例:
/** * redis-get处理逻辑 * @author phli */ @Component @RedisStrategyTypeHandler(RedisStrategyType.GET) public class RedisGetImpl implements RedisStrategy { @Autowired private RedisUtils redisUtils; @Override public void redisHandler(ChannelHandlerContext ctx, RedisCommand msg, String appKey) { String value = redisUtils.redisGet(EwsCommonConstants.REDIS_DB_1, StringUtils.concatStr(appKey, EwsCommonConstants.SystemParam.CON_SIGN, new String(msg.getArg1()))); if (value != null){ //冲刷数据消息到远程节点 ctx.writeAndFlush(new BulkReply(value.getBytes())); } else { //冲刷数据消息到远程节点 ctx.writeAndFlush(BulkReply.NIL_REPLY); } } }
以上为get命令处理逻辑。因客户端连接netty时做验证校验,所以也需要实现redis auth命令处理逻辑,可将校验字段置于客户端redis配置password,没有验证校验的可忽略。
9. 用到的实体类
RedisCommand
/**
 * A decoded Redis command: a name plus one or two raw byte-array arguments.
 *
 * <p>The argument arrays are stored and returned as-is (no defensive copies).
 *
 * @author phli
 */
public class RedisCommand {

    /** Command name. */
    private final String name;

    /** First argument. */
    private final byte[] arg1;

    /** Second argument; {@code null} for single-argument commands. */
    private final byte[] arg2;

    /**
     * Creates a command with a single argument; the second argument is {@code null}.
     *
     * @param name command name
     * @param arg1 first argument
     */
    public RedisCommand(String name, byte[] arg1) {
        this(name, arg1, null);
    }

    /**
     * Creates a command with two arguments.
     *
     * @param name command name
     * @param arg1 first argument
     * @param arg2 second argument
     */
    public RedisCommand(String name, byte[] arg1, byte[] arg2) {
        this.name = name;
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    /** @return the command name */
    public String getName() {
        return name;
    }

    /** @return the first argument */
    public byte[] getArg1() {
        return arg1;
    }

    /** @return the second argument, or {@code null} if the command has only one */
    public byte[] getArg2() {
        return arg2;
    }

    @Override
    public String toString() {
        return "Command{"
                + "name='" + name + '\''
                + ", arg1=" + Arrays.toString(arg1)
                + ", arg2=" + Arrays.toString(arg2)
                + '}';
    }
}
BulkReply
/** * 字符响应 * @author phli */ public class BulkReply implements RedisReply<byte[]>{ public static final BulkReply NIL_REPLY = new BulkReply(); private static final char MARKER = '$'; private final byte[] data; private final int len; public BulkReply() { this.data = null; this.len = -1; } public BulkReply(byte[] data) { this.data = data; this.len = data.length; } @Override public byte[] data() { return this.data; } @Override public void write(ByteBuf out) throws IOException { // 1.Write header out.writeByte(MARKER); out.writeBytes(String.valueOf(len).getBytes()); out.writeBytes(CRLF); // 2.Write data if (len > 0) { out.writeBytes(data); out.writeBytes(CRLF); } } @Override public String toString() { return "BulkReply{" + "bytes=" + Arrays.toString(data) + '}'; } }
IntegerReply
/**
 * Redis integer reply: {@code :<number>\r\n}.
 *
 * @author phli
 */
public class IntegerReply implements RedisReply<Integer> {

    private static final char MARKER = ':';

    private final int data;

    /**
     * @param data the integer value to send
     */
    public IntegerReply(int data) {
        this.data = data;
    }

    @Override
    public Integer data() {
        return this.data;
    }

    /**
     * Writes the marker, the decimal representation of the value and CRLF.
     *
     * @param out buffer receiving the encoded reply
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void write(ByteBuf out) throws IOException {
        out.writeByte(MARKER);
        // Encode the digits with an explicit charset rather than the platform default.
        out.writeBytes(String.valueOf(data).getBytes(java.nio.charset.StandardCharsets.US_ASCII));
        out.writeBytes(CRLF);
    }

    @Override
    public String toString() {
        return "IntegerReply{" + "data=" + data + '}';
    }
}
RedisReply接口
/**
 * Contract for a Redis reply that can serialize itself into a buffer.
 *
 * @param <T> type of the value carried by the reply
 * @author phli
 */
public interface RedisReply<T> {

    /** Line terminator bytes: carriage return followed by line feed. */
    byte[] CRLF = {'\r', '\n'};

    /**
     * @return the value carried by this reply
     */
    T data();

    /**
     * Writes this reply into the given buffer.
     *
     * @param out buffer receiving the encoded reply
     * @throws IOException if the reply cannot be written
     */
    void write(ByteBuf out) throws IOException;
}
10. 以上,整理完毕。
补充说明:强烈建议查看netty的相关知识,一些主流技术框架底层通讯框架都是使用netty:springCloudGateway、dubbo、RocketMQ等等。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。