赞
踩
Lettuce是一款基于netty构建,支持同步、异步、reactive模式访问,支持连接复用的高性能redis开源客户端。
考虑到集群模式的redis无论性能还是可靠性都明显优于其余模式,且笔者也只使用过集群模式,本文主要分析集群模式下的客户端处理流程。
1、引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
2、定义redis配置bean:RedisConfiguration
/**
 * Declares the Redis connection settings as a cluster configuration seeded with one node address.
 *
 * @return the cluster configuration carrying the seed node and the credentials set below
 */
@Bean
public RedisConfiguration redisConfiguration() {
    RedisClusterConfiguration clusterSettings =
            new RedisClusterConfiguration(Collections.singleton("redis.server.address:6379"));
    // NOTE(review): "xxx" looks like a placeholder for the real credentials - confirm before use.
    clusterSettings.setUsername("xxx");
    clusterSettings.setPassword("xxx");
    return clusterSettings;
}
3、定义redis连接工厂bean:RedisConnectionFactory
/**
 * Creates the Lettuce-backed connection factory from the given Redis configuration.
 *
 * @param redisConfiguration the Redis configuration handed to the factory constructor
 * @return the configured connection factory
 */
@Bean
public RedisConnectionFactory redisConnectionFactory(RedisConfiguration redisConfiguration) {
    LettuceConnectionFactory factory = new LettuceConnectionFactory(redisConfiguration);
    // Reuse the underlying TCP connection; per the article this is already the default (true).
    factory.setShareNativeConnection(true);
    // Eager initialization means connecting to Redis at startup; per the article the default is false.
    factory.setEagerInitialization(false);
    return factory;
}
4、定义RedisTemplate
/**
 * Exposes a String-keyed, String-valued template built on the given connection factory.
 *
 * @param connectionFactory the connection factory the template is constructed with
 * @return a new {@code StringRedisTemplate}
 */
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
    StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
    return template;
}
LettuceConnectionFactory实现了InitializingBean接口,在创建bean阶段、属性字段设置完成后会回调afterPropertiesSet方法:
最终调用org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#start
/**
 * Initialization callback: starts the factory when auto-startup is enabled, otherwise does nothing.
 */
@Override
public void afterPropertiesSet() {
    if (!isAutoStartup()) {
        return;
    }
    start();
}
start方法主要完成lettuce客户端的构造,并按需初始化tcp连接:
public void start() { // 更新状态为启动中 State current = this.state.getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state); // 幂等处理、状态检查 if (isCreatedOrStopped(current)) { // 构造lettuce客户端 AbstractRedisClient client = createClient(); this.client = client; // 构造真正的连接工厂 LettuceConnectionProvider connectionProvider = new ExceptionTranslatingConnectionProvider( createConnectionProvider(client, CODEC)); this.connectionProvider = connectionProvider; this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider( createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC)); // 如果是集群模式的redis,构造命令执行器 if (isClusterAware()) { this.clusterCommandExecutor = createClusterCommandExecutor((RedisClusterClient) client, connectionProvider); } // 更新状态为已启动 this.state.set(State.STARTED); // 如果是饥饿式启动且开启连接复用,则初始化tcp连接 if (getEagerInitialization() && getShareNativeConnection()) { initConnection(); } } }
/**
 * Resets the current connection state and then requests the shared connections again:
 * the cluster or the non-cluster one depending on {@code isClusterAware()}, followed by
 * the shared reactive connection.
 */
public void initConnection() {

    // Start from a clean slate so that calling this method repeatedly is safe.
    resetConnection();

    // Pick the shared connection that matches the Redis deployment mode.
    if (!isClusterAware()) {
        getSharedConnection();
    } else {
        getSharedClusterConnection();
    }

    getSharedReactiveConnection();
}
/**
 * Returns the shared cluster connection.
 *
 * @return the shared connection cast to a cluster connection, or {@code null} when native
 *         connection sharing is disabled or the factory is not cluster aware
 */
protected StatefulRedisClusterConnection<byte[], byte[]> getSharedClusterConnection() {

    // Sharing must be enabled and the factory must be cluster aware; otherwise there is nothing to return.
    if (!shareNativeConnection || !isClusterAware()) {
        return null;
    }

    return (StatefulRedisClusterConnection<byte[], byte[]>) getOrCreateSharedConnection().getConnection();
}
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.SharedConnection#getConnection:建立连接并校验连接是否正常
/**
 * Returns the shared connection, creating it on first use and optionally validating it.
 * The whole sequence runs inside {@code doInLock}.
 *
 * @return the connection held by this instance after optional creation and validation
 */
StatefulConnection<E, E> getConnection() {
    return doInLock(() -> {

        // Obtain the native connection only when none is held yet.
        boolean notYetConnected = this.connection == null;
        if (notYetConnected) {
            this.connection = getNativeConnection();
        }

        // Check that the connection is healthy when validation is switched on.
        boolean validationRequested = getValidateConnection();
        if (validationRequested) {
            validateConnection();
        }

        return this.connection;
    });
}
getNativeConnection()最终会调用org.springframework.data.redis.connection.lettuce.ClusterConnectionProvider#getConnectionAsync
关键代码如下
public <T extends StatefulConnection<?, ?>> CompletableFuture<T> getConnectionAsync(Class<T> connectionType) { // 集群topo信息初始化代码 if (!initialized) { // Partitions have to be initialized before asynchronous usage. // Needs to happen only once. Initialize eagerly if blocking is not an options. lock.lock(); try { if (!initialized) { client.getPartitions(); initialized = true; } } finally { lock.unlock(); } } // pubsub连接建立 if (connectionType.equals(StatefulRedisPubSubConnection.class) || connectionType.equals(StatefulRedisClusterPubSubConnection.class)) { return client.connectPubSubAsync(codec).thenApply(connectionType::cast); } // 建立集群连接 if (StatefulRedisClusterConnection.class.isAssignableFrom(connectionType) || connectionType.equals(StatefulConnection.class)) { return client.connectAsync(codec).thenApply(connection -> { getReadFrom().ifPresent(connection::setReadFrom); return connectionType.cast(connection); }); } String message = String.format("Connection type %s not supported", connectionType); return LettuceFutureUtils.failed(new InvalidDataAccessApiUsageException(message)); }
集群连接建立最终会调用lettuce客户端方法:io.lettuce.core.cluster.RedisClusterClient#connectClusterAsync
private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectClusterAsync(RedisCodec<K, V> codec) { // 开启topo自动刷新机制 topologyRefreshScheduler.activateTopologyRefreshIfNeeded(); DefaultEndpoint endpoint = new DefaultEndpoint(getClusterClientOptions(), getResources()); RedisChannelWriter writer = endpoint; if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } if (CommandListenerWriter.isSupported(getCommandListeners())) { writer = new CommandListenerWriter(writer, getCommandListeners()); } ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(writer, getClusterClientOptions(), topologyRefreshScheduler); // 关注PooledClusterConnectionProvider构造方法执行内容,后面会再提到 PooledClusterConnectionProvider<K, V> pooledClusterConnectionProvider = new PooledClusterConnectionProvider<>(this, clusterWriter, codec, topologyRefreshScheduler); clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider); StatefulRedisClusterConnectionImpl<K, V> connection = newStatefulRedisClusterConnection(clusterWriter, pooledClusterConnectionProvider, codec, getFirstUri().getTimeout()); connection.setReadFrom(ReadFrom.UPSTREAM); connection.setPartitions(partitions); // 命令处理handler,构造netty客户端时会用到 Supplier<CommandHandler> commandHandlerSupplier = () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint); Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, TopologyComparators::sortByClientCount); // 建立连接,最终调用io.lettuce.core.cluster.RedisClusterClient#connectStatefulAsync Mono<StatefulRedisClusterConnectionImpl<K, V>> connectionMono = Mono .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); // 连接重试 for (int i = 1; i < getConnectionAttempts(); i++) { connectionMono = connectionMono .onErrorResume(t -> connect(socketAddressSupplier, endpoint, 
connection, commandHandlerSupplier)); } // 注册连接关闭回调 return connectionMono.doOnNext( c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider)) .map(it -> (StatefulRedisClusterConnection<K, V>) it).toFuture(); }
io.lettuce.core.cluster.RedisClusterClient#connectStatefulAsync方法:
/**
 * Builds the connection builder for the given connection and asynchronously initializes its channel.
 *
 * @param connection the stateful cluster connection being connected
 * @param endpoint the endpoint passed to the connection builder
 * @param connectionSettings the Redis URI settings passed to the connection builder
 * @param socketAddressSupplier supplies the socket address to connect to
 * @param commandHandlerSupplier supplies the command handler for the channel
 * @return a future that yields {@code connection} (cast to {@code S}) once the channel is initialized
 */
private <K, V, T extends StatefulRedisClusterConnectionImpl<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection,
        DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier,
        Supplier<CommandHandler> commandHandlerSupplier) {

    // Create the connection builder.
    ConnectionBuilder builder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint,
            connectionSettings, socketAddressSupplier, commandHandlerSupplier);

    // Initialize the channel.
    ConnectionFuture<RedisChannelHandler<K, V>> channelFuture = initializeChannelAsync(builder);

    // The channel handler itself is not needed by callers; hand back the connection instead.
    return channelFuture.thenApply(handler -> (S) connection);
}
initializeChannelAsync方法最终调用io.lettuce.core.AbstractRedisClient#initializeChannelAsync0,关键代码如下:
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture, SocketAddress redisAddress) { // netty客户端启动引导,在前文createConnectionBuilder流程中构造 Bootstrap redisBootstrap = connectionBuilder.bootstrap(); // 连接初始化器 ChannelInitializer<Channel> initializer = connectionBuilder.build(redisAddress); redisBootstrap.handler(initializer); clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap); // 建链 ChannelFuture connectFuture = redisBootstrap.connect(redisAddress); channelReadyFuture.whenComplete((c, t) -> { if (t instanceof CancellationException) { connectFuture.cancel(true); } }); // 建链响应回调处理,连接初始化、握手结果处理 connectFuture.addListener(future -> { // 省略。。。 }); }
channel初始化器构造:io.lettuce.core.ConnectionBuilder#build
/**
 * Creates the channel initializer, which obtains its handlers from {@code buildHandlers()}.
 *
 * @param socketAddress the target address; not used by the code shown here
 * @return a new {@code PlainChannelInitializer}
 */
public ChannelInitializer<Channel> build(SocketAddress socketAddress) {
    PlainChannelInitializer channelInitializer =
            new PlainChannelInitializer(() -> buildHandlers(), clientResources);
    return channelInitializer;
}
protected List<ChannelHandler> buildHandlers() { List<ChannelHandler> handlers = new ArrayList<>(); connection.setOptions(clientOptions); // channel连接、断链事件监听handler(tcp级别事件) handlers.add(new ChannelGroupListener(channelGroup, clientResources.eventBus())); // resp协议编码handler handlers.add(new CommandEncoder()); // 连接建立后握手handler,主要执行AUTH、PING命令 handlers.add(getHandshakeHandler()); // 命令处理handler handlers.add(commandHandlerSupplier.get()); // 连接事件发布handler(握手完成后,应用层级别的事件) handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus())); // 自动重连处理handler if (clientOptions.isAutoReconnect()) { handlers.add(createConnectionWatchdog()); } return handlers; }
以上就是LettuceConnectionFactory在bean创建后初始化lettuce集群客户端、饥饿式初始化连接的流程;
懒惰式建链则由第一次命令执行触发,触发点为org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback, boolean, boolean),其原理与饥饿式建链一致,此处不再赘述。
总结:spring应用启动时会回调LettuceConnectionFactory#afterPropertiesSet方法,构造lettuce客户端;若开启了饥饿式建链,还会初始化netty客户端、建立连接,并执行resp协议握手流程。
https://github.com/spring-projects/spring-data-redis
https://github.com/redis/lettuce
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。