当前位置:   article > 正文

redission收发命令流程分析_redisson 发送原始报文

redisson 发送原始报文

一、示例,我们从最简单的GET命令开始。

RBucket<Object> t = redissonClient.getBucket("syncTradeUid_idOff");
int idOff = (int)t.get();

二、springboot的Redission自动配置

  1. @Order(value = 4001)
  2. @ConditionalOnProperty("redisson.password")
  3. @Configuration
  4. @EnableConfigurationProperties({RedissonProperties.class})
  5. public class RedissonAutoConfiguration {
  6. public RedissonAutoConfiguration() {
  7. System.out.println("==========================redis 初始化成功=======================");
  8. }
  9. @Autowired
  10. private RedissonProperties redissonProperties;
  11. @Bean(name = "redissonClient")
  12. @ConditionalOnProperty(name="redisson.address")
  13. RedissonClient redissonSingle() {
  14. Config config = new Config();
  15. config.setCodec(new FastJsonCodec());
  16. SingleServerConfig serverConfig = config.useSingleServer()
  17. .setAddress(redissonProperties.getAddress())
  18. .setTimeout(redissonProperties.getTimeout())
  19. .setConnectionPoolSize(redissonProperties.getConnectionPoolSize())
  20. .setConnectionMinimumIdleSize(redissonProperties.getConnectionMinimumIdleSize());
  21. if(!StrUtil.isEmpty(redissonProperties.getPassword())) {
  22. serverConfig.setPassword(redissonProperties.getPassword());
  23. }
  24. return Redisson.create(config);
  25. }
  26. /**
  27. * 哨兵模式自动装配
  28. * @return
  29. */
  30. @Bean(name = "redissonClient")
  31. @ConditionalOnProperty(name="redisson.masterName")
  32. RedissonClient redissonSentinel() {
  33. Config config = new Config();
  34. config.setCodec(new FastJsonCodec());
  35. SentinelServersConfig serverConfig = config.useSentinelServers().addSentinelAddress(redissonProperties.getSentinelAddresses())
  36. .setMasterName(redissonProperties.getMasterName())
  37. .setTimeout(redissonProperties.getTimeout())
  38. .setMasterConnectionPoolSize(redissonProperties.getMasterConnectionPoolSize())
  39. .setSlaveConnectionPoolSize(redissonProperties.getSlaveConnectionPoolSize())
  40. .setReadMode(ReadMode.SLAVE);
  41. if(!StrUtil.isEmpty(redissonProperties.getPassword())) {
  42. serverConfig.setPassword(redissonProperties.getPassword());
  43. }
  44. return Redisson.create(config);
  45. }
  46. }

application.properites

  1. #单机
  2. redisson.address = redis://127.0.0.1:6379
  3. redisson.password =
  4. #哨兵
  5. #redisson.masterName=BF-20190319DBXF
  6. #redisson.schema=redis://
  7. #redisson.sentinelAddresses=redis://127.0.0.1:26379,redis://127.0.0.1:26479,redis://127.0.0.1:26579
  8. #redisson.password=

三、REDISSION自动配置初始化流程

1.从Redisson.create(config)创建redission对象开始。Redission继承于

RedissonClient

2. 创建连接管理器对象

  1. org.redisson.config.ConfigSupport
  2. public static ConnectionManager createConnectionManager(Config configCopy) {
  3. UUID id = UUID.randomUUID();
  4. if (configCopy.getMasterSlaveServersConfig() != null) {
  5. validate(configCopy.getMasterSlaveServersConfig());
  6. return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
  7. } else if (configCopy.getSingleServerConfig() != null) {
  8. validate(configCopy.getSingleServerConfig());
  9. return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
  10. } else if (configCopy.getSentinelServersConfig() != null) {
  11. validate(configCopy.getSentinelServersConfig());
  12. return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
  13. } else if (configCopy.getClusterServersConfig() != null) {
  14. validate(configCopy.getClusterServersConfig());
  15. return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
  16. } else if (configCopy.getReplicatedServersConfig() != null) {
  17. validate(configCopy.getReplicatedServersConfig());
  18. return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
  19. } else if (configCopy.getConnectionManager() != null) {
  20. return configCopy.getConnectionManager();
  21. }else {
  22. throw new IllegalArgumentException("server(s) address(es) not defined!");
  23. }
  24. }

3.先看下MasterSlaveEntry->setupMasterEntry,这里会创建RedisClient,以及连接REDIS服务器。

  1. org.redisson.connection.MasterSlaveEntry
  2. public RFuture<RedisClient> setupMasterEntry(RedisURI address) {
  3. RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
  4. return setupMasterEntry(client);
  5. }

4.创建RedisClient,这里面单机也是使用主从管理器,即是只有主没有从。统一起来。

  1. org.redisson.connection.MasterSlaveConnectionManager
  2. @Override
  3. public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
  4. RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
  5. return RedisClient.create(redisConfig);
  6. }

5.在RedisClient会创建NEETY的bootstrap,channel,handler.

  1. org.redisson.client.RedisClient
  2. private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
  3. Bootstrap bootstrap = new Bootstrap()
  4. .resolver(config.getResolverGroup())
  5. .channel(config.getSocketChannelClass())
  6. .group(config.getGroup());
  7. bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
  8. bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
  9. bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
  10. bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
  11. config.getNettyHook().afterBoostrapInitialization(bootstrap);
  12. return bootstrap;
  13. }

 6.我们再看下RedisChannelInitializer,有添加哪些inBounder,outBounder

  1. org.redisson.client.handler.RedisChannelInitializer
  2. @Override
  3. protected void initChannel(Channel ch) throws Exception {
  4. initSsl(config, ch);
  5. if (type == Type.PLAIN) {
  6. ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
  7. } else {
  8. ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
  9. }
  10. ch.pipeline().addLast(
  11. connectionWatchdog,
  12. CommandEncoder.INSTANCE,
  13. CommandBatchEncoder.INSTANCE,
  14. new CommandsQueue());
  15. if (pingConnectionHandler != null) {
  16. ch.pipeline().addLast(pingConnectionHandler);
  17. }
  18. if (type == Type.PLAIN) {
  19. ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));
  20. } else {
  21. ch.pipeline().addLast(new CommandPubSubDecoder(config));
  22. }
  23. ch.pipeline().addLast(new ErrorsLoggingHandler());
  24. config.getNettyHook().afterChannelInitialization(ch);
  25. }

 

 7.创建好RedisClient后,开始连接REDIS服务器。这里首先异步解析地址,解析成功后,在添加到写连接池时会创建和添加连接,在创建连接时会去连接REDIS服务器。

  1. org.redisson.connection. MasterSlaveEntry
  2. private RFuture<RedisClient> setupMasterEntry(RedisClient client) {
  3. RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
  4. result.onComplete((res, e) -> {
  5. if (e != null) {
  6. client.shutdownAsync();
  7. }
  8. });
  9. RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
  10. addrFuture.onComplete((res, e) -> {
  11. if (e != null) {
  12. result.tryFailure(e);
  13. return;
  14. }
  15. masterEntry = new ClientConnectionsEntry(
  16. client,
  17. config.getMasterConnectionMinimumIdleSize(),
  18. config.getMasterConnectionPoolSize(),
  19. config.getSubscriptionConnectionMinimumIdleSize(),
  20. config.getSubscriptionConnectionPoolSize(),
  21. connectionManager,
  22. NodeType.MASTER);
  23. int counter = 1;
  24. if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
  25. counter++;
  26. }
  27. CountableListener<RedisClient> listener = new CountableListener<>(result, client, counter);
  28. RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
  29. writeFuture.onComplete(listener);
  30. });
  31. return result;
  32. }

8.查看连接REDIS服务器过程

  1. org.redisson.connection.pool.ConnectionPool.
  2. private void initConnections(ClientConnectionsEntry entry, RPromise<Void> initPromise, boolean checkFreezed) {
  3. int minimumIdleSize = getMinimumIdleSize(entry);
  4. if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {
  5. initPromise.trySuccess(null);
  6. return;
  7. }
  8. AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
  9. int startAmount = Math.min(10, minimumIdleSize);
  10. AtomicInteger requests = new AtomicInteger(startAmount);
  11. for (int i = 0; i < startAmount; i++) {
  12. createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
  13. }
  14. }

在这里可以看到会初始化10个客户端连接到连接池。

9.从连接池去申请创建连接

  1. ConnectionPool
  2. private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry, RPromise<Void> initPromise,
  3. int minimumIdleSize, AtomicInteger initializedConnections) {
  4. acquireConnection(entry, new Runnable() {
  5. @Override
  6. public void run() {
  7. RPromise<T> promise = new RedissonPromise<T>();
  8. createConnection(entry, promise);
  9. promise.onComplete((conn, e) -> {
  10. });
  11. }
  12. });
  13. }

 10.最终创建连接是在RedisClient.connectAsync这个异步连接方法中。

  1. public RFuture<RedisConnection> connectAsync() {
  2. final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();
  3. RFuture<InetSocketAddress> addrFuture = resolveAddr();
  4. addrFuture.onComplete((res, e) -> {
  5. if (e != null) {
  6. f.tryFailure(e);
  7. return;
  8. }
  9. ChannelFuture channelFuture = bootstrap.connect(res);
  10. });
  11. return f;
  12. }

11.在连接成功后,RedisConnectionHandler.channelRegistered方法中创建连接对象。

  1. @Override
  2. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  3. if (connection == null) {
  4. connection = createConnection(ctx);
  5. }
  6. super.channelRegistered(ctx);
  7. }

12.在这里对channel赋值,保存。这里每个channel里面会有一个RedisConnection的属性。

  1. RedisConnection
  2. public void updateChannel(Channel channel) {
  3. this.channel = channel;
  4. channel.attr(CONNECTION).set(this);
  5. }

13.在连接成功后发送PING心跳命令

  1. BaseConnectionHandler
  2. @Override
  3. public void channelActive(final ChannelHandlerContext ctx) {
  4. List<RFuture<Object>> futures = new ArrayList<RFuture<Object>>();
  5. RedisClientConfig config = redisClient.getConfig();
  6. if (config.getPassword() != null) {
  7. RFuture<Object> future;
  8. if (config.getUsername() != null) {
  9. future = connection.async(RedisCommands.AUTH, config.getUsername(), config.getPassword());
  10. } else {
  11. future = connection.async(RedisCommands.AUTH, config.getPassword());
  12. }
  13. futures.add(future);
  14. }futures.add(future);
  15. }
  16. if (config.getPingConnectionInterval() > 0) {
  17. RFuture<Object> future = connection.async(RedisCommands.PING);
  18. futures.add(future);
  19. }
  20. final AtomicBoolean retry = new AtomicBoolean();
  21. final AtomicInteger commandsCounter = new AtomicInteger(futures.size());
  22. for (RFuture<Object> future : futures) {
  23. future.onComplete((res, e) -> {
  24. if (e != null) {
  25. if (e instanceof RedisLoadingException) {
  26. if (retry.compareAndSet(false, true)) {
  27. ctx.executor().schedule(() -> {
  28. channelActive(ctx);
  29. }, 1, TimeUnit.SECONDS);
  30. }
  31. return;
  32. }
  33. connection.closeAsync();
  34. connectionPromise.tryFailure(e);
  35. return;
  36. }
  37. if (commandsCounter.decrementAndGet() == 0) {
  38. ctx.fireChannelActive();
  39. connectionPromise.trySuccess(connection);
  40. }
  41. });
  42. }
  43. }

四、发送命令过程。

1.首先从RedissionBucket的set方法

,这里面的 commandExecutor来源于connectionManager中的命令执行器。

2.然后进行入到RedisExecutor中的execute方法,去异步执行命令。这里首先从连接池获取连接,然后在异步连接成功后,发送命令。

  1. public void execute() {
  2. codec = getCodec(codec);
  3. RFuture<RedisConnection> connectionFuture = getConnection();
  4. connectionFuture.onComplete((connection, e) -> {
  5. sendCommand(attemptPromise, connection);
  6. writeFuture.addListener(new ChannelFutureListener() {
  7. @Override
  8. public void operationComplete(ChannelFuture future) throws Exception {
  9. checkWriteFuture(writeFuture, attemptPromise, connection);
  10. }
  11. });
  12. releaseConnection(attemptPromise, connectionFuture);
  13. });
  14. }

3.获取连接是从连接池中获取。根据读写模式从连接管理器中选择可用连接返回。

  1. RedisExecutor
  2. protected RFuture<RedisConnection> getConnection() {
  3. if (readOnlyMode) {
  4. connectionFuture = connectionManager.connectionReadOp(source, command);
  5. } else {
  6. connectionFuture = connectionManager.connectionWriteOp(source, command);
  7. }
  8. return connectionFuture;
  9. }

 3.接着调用RedisConnection的send向channel写入数据。

  1. RedisConnection
  2. public <T, R> ChannelFuture send(CommandData<T, R> data) {
  3. return channel.writeAndFlush(data);
  4. }

4.netty的inBoundHandler中有一个CommandsQueue,为一个命令同步队列,同一时刻一个连接只有一个命令在执行,执行完后,再执行下一个命令。

  1. org.redisson.client.handler.CommandsQueue
  2. private void sendData(Channel ch) {
  3. QueueCommandHolder command = queue.peek();
  4. if (command != null && command.trySend()) {
  5. QueueCommand data = command.getCommand();
  6. List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
  7. if (!pubSubOps.isEmpty()) {
  8. for (CommandData<Object, Object> cd : pubSubOps) {
  9. for (Object channel : cd.getParams()) {
  10. ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
  11. }
  12. }
  13. } else {
  14. ch.attr(CURRENT_COMMAND).set(data);
  15. }
  16. command.getChannelPromise().addListener(listener);
  17. ch.writeAndFlush(data, command.getChannelPromise());
  18. }
  19. }

 

 

五、接收数据回调过程。

1.接收inhandler, 在收到数据后,从attr中的current_command属性中取出数据。

  1. CommandDecoder
  2. @Override
  3. protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  4. QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();
  5. if (state() == null) {
  6. state(new State());
  7. }
  8. if (data == null) {
  9. while (in.writerIndex() > in.readerIndex()) {
  10. int endIndex = skipCommand(in);
  11. try {
  12. decode(ctx, in, data);
  13. } catch (Exception e) {
  14. in.readerIndex(endIndex);
  15. throw e;
  16. }
  17. }
  18. } else {
  19. int endIndex = 0;
  20. if (!(data instanceof CommandsData)) {
  21. endIndex = skipCommand(in);
  22. }
  23. try {
  24. decode(ctx, in, data);
  25. } catch (Exception e) {
  26. if (!(data instanceof CommandsData)) {
  27. in.readerIndex(endIndex);
  28. }
  29. throw e;
  30. }
  31. }
  32. }

 

2.根据相应的PROMISE设置回调数据。

  1. CommandDecoder
  2. protected void completeResponse(CommandData<Object, Object> data, Object result) {
  3. if (data != null) {
  4. data.getPromise().trySuccess(result);
  5. }
  6. }

3.在等待异步PROMISE结果。

  1. CommandAsyncService
  2. @Override
  3. public <V> V get(RFuture<V> future) {
  4. try {
  5. future.await();
  6. } catch (InterruptedException e) {
  7. Thread.currentThread().interrupt();
  8. }
  9. if (future.isSuccess()) {
  10. return future.getNow();
  11. }
  12. throw convertException(future);
  13. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/710746
推荐阅读
相关标签
  

闽ICP备14008679号