当前位置:   article > 正文

基于netty实现redis服务(解析redis协议密码验证)_netty redis

netty redis

最近使用redis缓存,但是不允许客户端直接连接redis服务器,需要模拟redis的服务器,实现redis的get\set\del\auth基础功能。

网上该场景的解决方案不多,文章如有问题,请指正以备完善。

使用netty来实现,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议(redis协议)的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

实现目标:客户端以redis协议连接netty redis服务端,netty redis服务端后端连接redis服务器,在客户端与服务器之间添加netty redis服务。客户端对连接对象无感知。

1. 国际惯例,添加pom依赖

	 <dependency>
	     <groupId>io.netty</groupId>
	     <artifactId>netty-all</artifactId>
	 </dependency>
  • 1
  • 2
  • 3
  • 4

2. 配置启动类

@SpringBootApplication
@ServletComponentScan
public class EwsNettyApplication {
    public static void main(String[] args) {
        SpringApplication.run(EwsNettyApplication.class, args);
        System.out.println("******************启动成功*****************");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

启动类添加@ServletComponentScan注解,该注解可以使Listener直接通过@WebListener 注解自动注册。

3. 设置servlet容器监听

@WebListener
public class NettyRunServletContextListener implements ServletContextListener {
    private static final Logger logger = LoggerFactory.getLogger(NettyRunServletContextListener.class);
    @Value("${netty.port}")
    private int port;
    @Value("${netty.url}")
    private String url;

    @Autowired
    private NettyConfig nettyConfig;
    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Servlet容器终止Web应用时调用该方法
     * @param sce servletContextEvent
     */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        System.out.println("====== springboot netty destroy ======");
        nettyConfig.destroy();
        System.out.println("---test contextDestroyed method---");
    }

    /**
     * Servlet容器启动Web应用时调用该方法。
     * @param sce servletContextEvent
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        //处理无法注入
        WebApplicationContextUtils.getRequiredWebApplicationContext(sce.getServletContext())
                .getAutowireCapableBeanFactory().autowireBean(this);
        //初始化策略容器
        initializerStrategy();
        try {
            InetSocketAddress address = new InetSocketAddress(url, port);
            //nettyConfig执行,返回异步通知接口
            ChannelFuture future = nettyConfig.run(address);
            logger.info("====== springboot netty start ======");
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                logger.info("---nettyConfig destroy---");
                nettyConfig.destroy();
            }));
            //channel关闭
            future.channel().closeFuture().syncUninterruptibly();
        } catch (Exception e) {
            logger.error("---springboot netty server start error : ", e.getMessage() + "---");
        }
    }

    /**
     * 初始化策略容器
     */
    public void initializerStrategy() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RedisStrategyTypeHandler.class);
        RedisStrategyContext strategyContext = applicationContext.getBean(RedisStrategyContext.class);
        beans.forEach((name, bean) -> {
            RedisStrategyTypeHandler typeHandler = bean.getClass().getAnnotation(RedisStrategyTypeHandler.class);
            strategyContext.putRedisStrategy(typeHandler.value().getIndex(), (RedisStrategy) bean);
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

Servlet容器启动时调用contextInitialized方法执行netty启动逻辑。Servlet容器终止Web应用时调用contextDestroyed方法关闭netty服务,关闭EventLoopGroup、channel,释放资源。

4. nettyConfig配置

 @Configuration
public class NettyConfig {

    private static final Logger log = LoggerFactory.getLogger(NettyConfig.class);

    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private final EventLoopGroup workerGroup = new NioEventLoopGroup(6);

    private Channel channel;
    @Autowired
    private ServerChannelInitializer serverChannelInitializer;

    /**
     * 启动服务
     */
    public ChannelFuture run(InetSocketAddress address) {

        ChannelFuture future = null;
        try {
            //创建serverBootstrap实例引导和绑定服务器
            ServerBootstrap bootstrap = new ServerBootstrap();
            //指定 EventLoopGroup 以处理客户端事件
            bootstrap.group(bossGroup, workerGroup)
                    //适用于NIO传输的channel类型
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //指定ChannelInitializer,对于所有连接均为调用
                    .childHandler(serverChannelInitializer).childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true);
            //绑定
            future = bootstrap.bind(address).sync();
            channel = future.channel();
        } catch (Exception e) {
            log.error("Netty start error:", e);
        } finally {
            if (future != null && future.isSuccess()) {
                log.info("Netty server listening " + address.getHostName() + " on port " + address.getPort()
                        + " and ready for connections...");
            } else {
                log.error("Netty server start up Error!");
            }
        }

        return future;
    }

    public void destroy() {
        log.info("Shutdown Netty Server...");
        if (channel != null) {
            channel.close();
        }
        //关闭EventLoopGroup、channel,释放资源
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        log.info("Shutdown Netty Server Success!");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

netty服务端与客户端的引导基本类似,服务端使用ServerBootstrap以及NioServerSocketChannel的channel类型来引导,这点与客户端不同。着重注意netty的数据处理组件——ChannelHandler(即代码中serverChannelInitializer)。

5. 配置channelHandler初始化器

 /**
 * channel初始化器
 * 将nettyServerHandler实列添加到channel的channelPipeline中
 * @author phli
 */
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Autowired
    private NettyServerHandler nettyServerHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        // 解码编码
        socketChannel.pipeline().addLast(new RedisCommandDecoder());
        socketChannel.pipeline().addLast(new RedisReplyEncoder());
        socketChannel.pipeline().addLast(nettyServerHandler);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。ChannelPipeline以及ChannelHandlerContext的概念本文不做涉及,可自行百度。将redis协议的编码解码逻辑添加到channelPipeline内。

6. redis解码

 /**
 * redis命令解码
 * @author phli
 */
public class RedisCommandDecoder extends ReplayingDecoder<Void> {
    /** Decoded command and arguments */
    private byte[][] cmds;

    /** Current argument */
    private int arg;

    /** Decode in block-io style, rather than nio. */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (cmds == null) {
            if (in.readByte() == '*') {
                doDecodeNumOfArgs(in);
            }
        } else {
            doDecodeArgs(in);
        }

        if (isComplete()) {
            doSendCmdToHandler(out);
            doCleanUp();
        }
    }

    /** Decode number of arguments */
    private void doDecodeNumOfArgs(ByteBuf in) {
        // Ignore negative case
        int numOfArgs = readInt(in);
        System.out.println("RedisCommandDecoder NumOfArgs: " + numOfArgs);
        cmds = new byte[numOfArgs][];

        checkpoint();
    }

    /** Decode arguments */
    private void doDecodeArgs(ByteBuf in) {
        for (int i = arg; i < cmds.length; i++) {
            if (in.readByte() == '$') {
                int lenOfBulkStr = readInt(in);
                System.out.println("RedisCommandDecoder LenOfBulkStr[" + i + "]: " + lenOfBulkStr);

                cmds[i] = new byte[lenOfBulkStr];
                in.readBytes(cmds[i]);

                // Skip CRLF(\r\n)
                in.skipBytes(2);

                arg++;
                checkpoint();
            } else {
                throw new IllegalStateException("Invalid argument");
            }
        }
    }

    /**
     * cmds != null means header decode complete
     * arg > 0 means arguments decode has begun
     * arg == cmds.length means complete!
     */
    private boolean isComplete() {
        return (cmds != null)
                && (arg > 0)
                && (arg == cmds.length);
    }

    /** Send decoded command to next handler */
    private void doSendCmdToHandler(List<Object> out) {
        System.out.println("RedisCommandDecoder: Send command to next handler");
        if (cmds.length == 2) {
            out.add(new RedisCommand(new String(cmds[0]), cmds[1]));
        } else if (cmds.length == 3) {
            out.add(new RedisCommand(new String(cmds[0]), cmds[1], cmds[2]));
        } else {
            throw new IllegalStateException("Unknown command");
        }
    }

    /** Clean up state info */
    private void doCleanUp() {
        this.cmds = null;
        this.arg = 0;
    }

    private int readInt(ByteBuf in) {
        int integer = 0;
        char c;
        while ((c = (char) in.readByte()) != '\r') {
            integer = (integer * 10) + (c - '0');
        }

        if (in.readByte() != '\n') {
            throw new IllegalStateException("Invalid number");
        }
        return integer;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101

7. redis解码

/**
 * redis响应编码
 * @author phli
 */
public class RedisReplyEncoder extends MessageToByteEncoder<RedisReply> {
    @Override
    protected void encode(ChannelHandlerContext ctx, RedisReply msg, ByteBuf out) throws Exception {
        System.out.println("RedisReplyEncoder: " + msg);
        msg.write(out);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

8. netty服务逻辑处理类

@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private RedisStrategyContext redisStrategyContext;
    @Autowired
    private ChannelCache channelCache;
    private static final String REDIS_AUTH = "auth";

    /**
     * 当从Channel读取数据时被调用,具体逻辑处理
     * 校验channel是否已存在,验证appKey
     * appKey校验通过将channel添加到channelGroup
     * 将appKey与channelId存放到redis-2db内
     * 校验不通过返回信息,关闭channel
     * @param ctx channelHandlerContext
     * @param message 消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object message) {
        RedisCommand msg = (RedisCommand) message;
        if (channelCache.getChannel(ctx.channel()) == null) {
            if (REDIS_AUTH.equalsIgnoreCase(msg.getName())) {
                //策略模式appKey
                RedisStrategy redisStrategy = redisStrategyContext.getRedisStrategy(RedisStrategyType.getIndexByMessage(msg.getName().toUpperCase()));
                redisStrategy.redisHandler(ctx, msg, null);
            } else {
                ctx.writeAndFlush(new BulkReply("no Auth".getBytes()));
                ctx.close();
            }
            return;
        }
        //获取channelID
        String channelId = ctx.channel().id().asLongText();
        //根据channelID获取该channel对应的appKey
        String appKey = channelCache.getChannelIdUid().get(channelId);
        //分析message命令类型,根据策略模式完成判断
        RedisStrategy redisStrategy = redisStrategyContext.getRedisStrategy(RedisStrategyType.getIndexByMessage(msg.getName().toUpperCase()));
        redisStrategy.redisHandler(ctx, msg, appKey);
    }

    /**
     * 建立连接时,返回消息
     * @param ctx channelHandlerContext
     * @throws Exception 异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }

    /**
     * 异常触发
     * @param ctx channelHandlerContext
     * @param cause 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("exceptionCaught:" + cause.getMessage());
        ctx.close();
    }

    /**
     * channel没有连接到远程节点
     * @param ctx channelHandlerContext
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("====== channelInactive ======");
        channelCache.removeChannel(ctx.channel());
        ctx.close();
        System.out.println("====== Channel close ======");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

该类进行具体逻辑处理,各个方法均有注释。不同redis协议命令的不同处理使用策略模式进行处理,具体的实现方式这里不做展开,仅以redis的get命令为例:

/**
 * redis-get处理逻辑
 * @author phli
 */
@Component
@RedisStrategyTypeHandler(RedisStrategyType.GET)
public class RedisGetImpl implements RedisStrategy {
    @Autowired
    private RedisUtils redisUtils;
    @Override
    public void redisHandler(ChannelHandlerContext ctx, RedisCommand msg, String appKey) {
        String value = redisUtils.redisGet(EwsCommonConstants.REDIS_DB_1, StringUtils.concatStr(appKey,
                EwsCommonConstants.SystemParam.CON_SIGN, new String(msg.getArg1())));
        if (value != null){
            //冲刷数据消息到远程节点
            ctx.writeAndFlush(new BulkReply(value.getBytes()));
        } else {
            //冲刷数据消息到远程节点
            ctx.writeAndFlush(BulkReply.NIL_REPLY);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

以上为get命令处理逻辑。因客户端连接netty时做验证校验,所以也需要实现redis auth命令处理逻辑,可将校验字段置于客户端redis配置password,没有验证校验的可忽略。

9. 用到的实体类

RedisCommand

 /**
 * redis命令
 * @author phli
 */
public class RedisCommand {
    /**命令名称*/
    private final String name;
    /**参数*/
    private byte[] arg1;
    private byte[] arg2;

    public RedisCommand(String name, byte[] arg1) {
        this.name = name;
        this.arg1 = arg1;
    }

    public RedisCommand(String name, byte[] arg1, byte[] arg2) {
        this.name = name;
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    public String getName() {
        return name;
    }

    public byte[] getArg1() {
        return arg1;
    }

    public byte[] getArg2() {
        return arg2;
    }

    @Override
    public String toString() {
        return "Command{" +
                "name='" + name + '\'' +
                ", arg1=" + Arrays.toString(arg1) +
                ", arg2=" + Arrays.toString(arg2) +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

BulkReply

/**
 * 字符响应
 * @author phli
 */
public class BulkReply implements RedisReply<byte[]>{
    public static final BulkReply NIL_REPLY = new BulkReply();

    private static final char MARKER = '$';

    private final byte[] data;

    private final int len;

    public BulkReply() {
        this.data = null;
        this.len = -1;
    }

    public BulkReply(byte[] data) {
        this.data = data;
        this.len = data.length;
    }

    @Override
    public byte[] data() {
        return this.data;
    }

    @Override
    public void write(ByteBuf out) throws IOException {
        // 1.Write header
        out.writeByte(MARKER);
        out.writeBytes(String.valueOf(len).getBytes());
        out.writeBytes(CRLF);

        // 2.Write data
        if (len > 0) {
            out.writeBytes(data);
            out.writeBytes(CRLF);
        }
    }

    @Override
    public String toString() {
        return "BulkReply{" +
                "bytes=" + Arrays.toString(data) +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

IntegerReply

/**
 * 数值响应
 * @author phli
 */
public class IntegerReply implements RedisReply<Integer>{
    private static final char MARKER = ':';

    private final int data;

    public IntegerReply(int data) {
        this.data = data;
    }

    @Override
    public Integer data() {
        return this.data;
    }

    @Override
    public void write(ByteBuf out) throws IOException {
        out.writeByte(MARKER);
        out.writeBytes(String.valueOf(data).getBytes());
        out.writeBytes(CRLF);
    }

    @Override
    public String toString() {
        return "IntegerReply{" +
                "data=" + data +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

RedisReply接口

/**
 * redis响应接口
 * @author phli
 */
public interface RedisReply<T> {
    byte[] CRLF = new byte[] { '\r', '\n' };

    T data();

    void write(ByteBuf out) throws IOException;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

10. 以上,整理完毕。

补充说明:强烈建议查看netty的相关知识,一些主流技术框架底层通讯框架都是使用netty:springCloudGateway、dubbo、RocketMQ等等。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/159953?site
推荐阅读
相关标签
  

闽ICP备14008679号