当前位置:   article > 正文

Redis客户端-Lettuce源码详解(一)_lettuceconnectionfactory has been created. use sta

lettuceconnectionfactory has been created. use start() to initialize it

简介

Lettuce是一款基于netty构建,支持同步、异步、reactive模式访问,支持连接复用的高性能redis开源客户端。
考虑到集群模式的redis无论性能还是可靠性都明显优于其余模式、笔者也只使用过集群模式,本文主要分析集群模式下的客户端处理流程。

基于spring-data-redis集成lettuce客户端

1、引入依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2、定义redis配置bean:RedisConfiguration

    @Bean
    public RedisConfiguration redisConfiguration() {
        RedisClusterConfiguration redisClusterConfiguration = 
        new RedisClusterConfiguration(Collections.singleton("redis.server.address:6379"));
        redisClusterConfiguration.setUsername("xxx");
        redisClusterConfiguration.setPassword("xxx");
        return redisClusterConfiguration;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3、定义redis连接工厂bean:RedisConnectionFactory

    @Bean
    public RedisConnectionFactory redisConnectionFactory(RedisConfiguration redisConfiguration) {
        LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisConfiguration);
        // 是否饥饿式初始化,即服务启动时建立redis连接,默认false
        lettuceConnectionFactory.setEagerInitialization(false);
        // 是否复用底层tcp连接,默认true
        lettuceConnectionFactory.setShareNativeConnection(true);
        return lettuceConnectionFactory;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4、定义RedisTemplate

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
  • 1
  • 2
  • 3
  • 4

Lettuce客户端创建和启动流程

LettuceConnectionFactory初始化

LettuceConnectionFactory类继承关系
LettuceConnectionFactory实现了InitializingBean接口,在创建bean阶段、属性字段设置完成后会回调afterPropertiesSet方法:
最终调用org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#start

	@Override
	public void afterPropertiesSet() {
		if (isAutoStartup()) {
			start();
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

LettuceConnectionFactory#start方法主要流程:

主要完成lettuce客户端的构造、并按需初始化tcp连接

	public void start() {
		// 更新状态为启动中
		State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);
		// 幂等处理、状态检查
		if (isCreatedOrStopped(current)) {
			// 构造lettuce客户端
			AbstractRedisClient client = createClient();
			this.client = client;
			// 构造真正的连接工厂
			LettuceConnectionProvider connectionProvider = new ExceptionTranslatingConnectionProvider(
					createConnectionProvider(client, CODEC));
			this.connectionProvider = connectionProvider;
			this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
					createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
			// 如果是集群模式的redis,构造命令执行器
			if (isClusterAware()) {
				this.clusterCommandExecutor = createClusterCommandExecutor((RedisClusterClient) client, connectionProvider);
			}
			// 更新状态为已启动
			this.state.set(State.STARTED);
			// 如果是饥饿式启动且开启连接复用,则初始化tcp连接
			if (getEagerInitialization() && getShareNativeConnection()) {
				initConnection();
			}
		}
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#initConnection方法流程

	public void initConnection() {
		// 幂等处理,如果已存在连接,则重置旧的连接
		resetConnection();
		// 根据redis模式获取连接
		if (isClusterAware()) {
			getSharedClusterConnection();
		} else {
			getSharedConnection();
		}
		getSharedReactiveConnection();
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
	protected StatefulRedisClusterConnection<byte[], byte[]> getSharedClusterConnection() {
		return shareNativeConnection && isClusterAware()
				? (StatefulRedisClusterConnection<byte[], byte[]>) getOrCreateSharedConnection().getConnection()
				: null;
	}
  • 1
  • 2
  • 3
  • 4
  • 5

org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.SharedConnection#getConnection:建立连接并校验连接是否正常

		StatefulConnection<E, E> getConnection() {
			return doInLock(() -> {
				if (this.connection == null) {
					this.connection = getNativeConnection();
				}
				// 校验连接是否正常
				if (getValidateConnection()) {
					validateConnection();
				}
				return this.connection;
			});
		}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

getNativeConnection()最终会调用org.springframework.data.redis.connection.lettuce.ClusterConnectionProvider#getConnectionAsync
关键代码如下

	public <T extends StatefulConnection<?, ?>> CompletableFuture<T> getConnectionAsync(Class<T> connectionType) {
		// 集群topo信息初始化代码
		if (!initialized) {
			// Partitions have to be initialized before asynchronous usage.
			// Needs to happen only once. Initialize eagerly if blocking is not an options.
			lock.lock();
			try {
				if (!initialized) {
					client.getPartitions();
					initialized = true;
				}
			} finally {
				lock.unlock();
			}
		}
		// pubsub连接建立
		if (connectionType.equals(StatefulRedisPubSubConnection.class)
				|| connectionType.equals(StatefulRedisClusterPubSubConnection.class)) {
			return client.connectPubSubAsync(codec).thenApply(connectionType::cast);
		}
		// 建立集群连接
		if (StatefulRedisClusterConnection.class.isAssignableFrom(connectionType)
				|| connectionType.equals(StatefulConnection.class)) {
			return client.connectAsync(codec).thenApply(connection -> {
						getReadFrom().ifPresent(connection::setReadFrom);
						return connectionType.cast(connection);
					});
		}
		String message = String.format("Connection type %s not supported", connectionType);
		return LettuceFutureUtils.failed(new InvalidDataAccessApiUsageException(message));
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

集群连接建立最终会调用lettuce客户端方法:io.lettuce.core.cluster.RedisClusterClient#connectClusterAsync

    private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectClusterAsync(RedisCodec<K, V> codec) {
        // 开启topo自动刷新机制
        topologyRefreshScheduler.activateTopologyRefreshIfNeeded();
        DefaultEndpoint endpoint = new DefaultEndpoint(getClusterClientOptions(), getResources());
        RedisChannelWriter writer = endpoint;
        if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
            writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
        }
        if (CommandListenerWriter.isSupported(getCommandListeners())) {
            writer = new CommandListenerWriter(writer, getCommandListeners());
        }
        ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(writer, getClusterClientOptions(),
                topologyRefreshScheduler);
        // 关注PooledClusterConnectionProvider构造方法执行内容,后面会再提到
        PooledClusterConnectionProvider<K, V> pooledClusterConnectionProvider = new PooledClusterConnectionProvider<>(this,
                clusterWriter, codec, topologyRefreshScheduler);
        clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider);
        StatefulRedisClusterConnectionImpl<K, V> connection = newStatefulRedisClusterConnection(clusterWriter,
                pooledClusterConnectionProvider, codec, getFirstUri().getTimeout());
        connection.setReadFrom(ReadFrom.UPSTREAM);
        connection.setPartitions(partitions);
        // 命令处理handler,构造netty客户端时会用到
        Supplier<CommandHandler> commandHandlerSupplier = () -> new CommandHandler(getClusterClientOptions(), getResources(),
                endpoint);
        Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions,
                TopologyComparators::sortByClientCount);
        // 建立连接,最终调用io.lettuce.core.cluster.RedisClusterClient#connectStatefulAsync
        Mono<StatefulRedisClusterConnectionImpl<K, V>> connectionMono = Mono
                .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
        // 连接重试
        for (int i = 1; i < getConnectionAttempts(); i++) {
            connectionMono = connectionMono
                    .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
        }
        // 注册连接关闭回调
        return connectionMono.doOnNext(
                        c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider))
                .map(it -> (StatefulRedisClusterConnection<K, V>) it).toFuture();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

io.lettuce.core.cluster.RedisClusterClient#connectStatefulAsync方法:

    private <K, V, T extends StatefulRedisClusterConnectionImpl<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection,
            DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier,
            Supplier<CommandHandler> commandHandlerSupplier) {
        // 创建连接构造器
        ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint,
                connectionSettings, socketAddressSupplier, commandHandlerSupplier);
        // 初始化channel
        ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
        return future.thenApply(channelHandler -> (S) connection);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

initializeChannelAsync方法最终调用io.lettuce.core.AbstractRedisClient#initializeChannelAsync0,关键代码如下:

    private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
            SocketAddress redisAddress) {
        // netty客户端启动引导,在前文createConnectionBuilder流程中构造
        Bootstrap redisBootstrap = connectionBuilder.bootstrap();
        // 连接初始化器
        ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress);
        redisBootstrap.handler(initializer);
        clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        // 建链
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
        channelReadyFuture.whenComplete((c, t) -> {
            if (t instanceof CancellationException) {
                connectFuture.cancel(true);
            }
        });
        // 建链响应回调处理,连接初始化、握手结果处理
        connectFuture.addListener(future -> {
            // 省略。。。
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

channel初始化器构造:io.lettuce.core.ConnectionBuilder#build

    public ChannelInitializer<Channel> build(SocketAddress socketAddress) {
        return new PlainChannelInitializer(this::buildHandlers, clientResources);
    }
  • 1
  • 2
  • 3
    protected List<ChannelHandler> buildHandlers() {
        List<ChannelHandler> handlers = new ArrayList<>();
        connection.setOptions(clientOptions);
        // channel连接、断链事件监听handler(tcp级别事件)
        handlers.add(new ChannelGroupListener(channelGroup, clientResources.eventBus()));
        // resp协议编码handler
        handlers.add(new CommandEncoder());
        // 连接建立后握手handler,主要执行AUTH、PING命令
        handlers.add(getHandshakeHandler());
        // 命令处理handler
        handlers.add(commandHandlerSupplier.get());
        // 连接事件发布handler(握手完成后,应用层级别的事件)
        handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));
        // 自动重连处理handler
        if (clientOptions.isAutoReconnect()) {
            handlers.add(createConnectionWatchdog());
        }
        return handlers;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

以上就是LettuceConnectionFactory在bean创建后初始化lettuce集群客户端、饥饿式初始化连接的流程;
懒惰式建链是由第一次命令执行时触发建链,建链调用触发点为org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback, boolean, boolean),实际原理与饥饿式建链一致,此处不再赘述。

总结:

spring应用启动时,会回调LettuceConnectionFactory#afterPropertiesSet方法,实现构造lettuce客户端、并在饥饿式建链时初始化netty客户端、建链,执行resp协议握手流程。

Lettuce客户端redis命令处理流程见—Lettuce源码详解(二)

注:本文基于个人理解编写,如有不当之处欢迎指正

参考内容:

https://github.com/spring-projects/spring-data-redis
https://github.com/redis/lettuce

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/923155
推荐阅读
相关标签
  

闽ICP备14008679号