当前位置:   article > 正文

Redis使用Pipeline(管道)批量处理_redistemplate管道pipeline关闭

redistemplate管道pipeline关闭

Redis 批量处理

在开发中,有时需要对Redis 进行大批量的处理。

比如Redis批量查询多个Hash。如果是在for循环中逐个查询,那性能会很差。

这时,可以使用 Pipeline (管道)。

Pipeline (管道)

Pipeline (管道) 可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间,而且 Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。

RedisTemplate的管道操作

RedisTemplate的管道操作,使用executePipelined()方法。
org.springframework.data.redis.core.RedisTemplate#executePipelined(org.springframework.data.redis.core.SessionCallback<?>)

  1. @Override
  2. public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {
  3. Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
  4. Assert.notNull(session, "Callback object must not be null");
  5. RedisConnectionFactory factory = getRequiredConnectionFactory();
  6. // 绑定连接
  7. RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
  8. try {
  9. return execute((RedisCallback<List<Object>>) connection -> {
  10. //打开管道
  11. connection.openPipeline();
  12. boolean pipelinedClosed = false;
  13. try {
  14. Object result = executeSession(session);
  15. //callback的返回值,只能返回null,否则会报错。
  16. if (result != null) {
  17. throw new InvalidDataAccessApiUsageException(
  18. "Callback cannot return a non-null value as it gets overwritten by the pipeline");
  19. }
  20. //关闭管道,并获取返回值
  21. List<Object> closePipeline = connection.closePipeline();
  22. pipelinedClosed = true;
  23. return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
  24. } finally {
  25. if (!pipelinedClosed) {
  26. connection.closePipeline();
  27. }
  28. }
  29. });
  30. } finally {
  31. RedisConnectionUtils.unbindConnection(factory);
  32. }
  33. }
  • 注意:
    (1) executePipelined()的callback参数,实现execute() 方法只能返回null,否则会报错。
    报错: InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
    源码如下:
  1. Object result = executeSession(session);
  2. if (result != null) {
  3. throw new InvalidDataAccessApiUsageException(
  4. "Callback cannot return a non-null value as it gets overwritten by the pipeline");
  5. }

(2)在executePipelined()中, 使用 get()方法 得到的value会是null。
在 redisTemplate进行管道操作,结果值只能通过 executePipelined()方法的返回值List获取.

  1. /**
  2. * Get the value of {@code key}.
  3. *
  4. * @param key must not be {@literal null}.
  5. * 在管道(pipeline)中使用 get()方法 得到的value会是null
  6. * @return {@literal null} when used in pipeline / transaction.
  7. */
  8. @Nullable
  9. V get(Object key);
  • redisTemplate获取管道返回值:
  1. List<Object> list = stringRedisTemplate.executePipelined(new SessionCallback<String>() {
  2. @Override
  3. public String execute(@NonNull RedisOperations operations) throws DataAccessException {
  4. //idList是多个id的集合
  5. for (String id : idList) {
  6. //key由前缀加唯一id组成
  7. String key = KEY_PREFIX + id;
  8. //在管道中使用 get()方法 得到的value会是nullvalue通过executePipelined()的返回值List<Object>获取。
  9. operations.opsForHash().get(key, field);
  10. }
  11. //Callback只能返回null, 否则报错:
  12. // InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
  13. return null;
  14. }
  15. });
  16. list.forEach(System.out::println);

Jedis操作管道

RedisTemplate操作管道比较方便,但如果要组装key和value的map,就会比较麻烦。
在这种情况下,可以使用Jedis。
比如,Jedis批量查询多个Hash,可以使用 Pipeline (管道)。
源码见: redis.clients.jedis.PipelineBase#hget(java.lang.String, java.lang.String)
hget()方法,跟普通hash的hget()有点类似,不过返回值是 Response。

  1. public Response<String> hget(String key, String field) {
  2. this.getClient(key).hget(key, field);
  3. return this.getResponse(BuilderFactory.STRING);
  4. }

示例:

  1. public void testPipLine() {
  2. Map<String, Response<String>> responseMap = new HashMap<>();
  3. //try-with-resources, 自动关闭资源
  4. //先连接jedis,再拿到 pipeline
  5. try (Jedis jedis = getJedis();
  6. Pipeline pipeline = jedis.pipelined()) {
  7. for (String id : idList) {
  8. //前缀加唯一id
  9. String key = KEY_PREFIX + id;
  10. //使用pipeline.hget查询hash的数据
  11. Response<String> response = pipeline.hget(key, field);
  12. responseMap.put(id, response);
  13. }
  14. pipeline.sync();
  15. } catch (Exception ex) {
  16. log.error("responses error.", ex);
  17. }
  18. Map<String, String> map = new HashMap<>();
  19. //组装map。response.get()在pipeline关闭后才能执行,否则拿到的value都是null
  20. responseMap.forEach((k,response) -> map.put(k, response.get()));
  21. map.forEach((k,v)-> System.out.println(k+",val:"+v));
  22. }
  23. private static Pool<Jedis> jedisPool = null;
  24. /**
  25. * 连接redis,获取jedisPool
  26. * @return
  27. */
  28. public Jedis getJedis() {
  29. if (jedisPool == null) {
  30. JedisPoolConfig poolConfig = new JedisPoolConfig();
  31. poolConfig.setMaxTotal(maxTotal);
  32. poolConfig.setMaxIdle(maxIdle);
  33. poolConfig.setMaxWaitMillis(maxWaitMillis);
  34. poolConfig.setTestOnBorrow(testOnBorrow);
  35. //配置可以写在配置中心/文件
  36. jedisPool = new JedisPool(poolConfig, host, port, timeout, password, database);
  37. }
  38. return jedisPool.getResource();
  39. }

参考资料

Redis pipelining | Redishttps://www.cnblogs.com/expiator/p/11127719.html

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/76648
推荐阅读
相关标签
  

闽ICP备14008679号