赞
踩
在开发中,有时需要对Redis 进行大批量的处理。
比如Redis批量查询多个Hash。如果是在for循环中逐个查询,那性能会很差。
这时,可以使用 Pipeline (管道)。
Pipeline (管道) 可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间,而且 Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。
RedisTemplate的管道操作,使用executePipelined()方法。
org.springframework.data.redis.core.RedisTemplate#executePipelined(org.springframework.data.redis.core.SessionCallback<?>)
- @Override
- public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {
-
- Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
- Assert.notNull(session, "Callback object must not be null");
-
- RedisConnectionFactory factory = getRequiredConnectionFactory();
- // 绑定连接
- RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
- try {
- return execute((RedisCallback<List<Object>>) connection -> {
- //打开管道
- connection.openPipeline();
- boolean pipelinedClosed = false;
- try {
- Object result = executeSession(session);
- //callback的返回值,只能返回null,否则会报错。
- if (result != null) {
- throw new InvalidDataAccessApiUsageException(
- "Callback cannot return a non-null value as it gets overwritten by the pipeline");
- }
- //关闭管道,并获取返回值
- List<Object> closePipeline = connection.closePipeline();
- pipelinedClosed = true;
- return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
- } finally {
- if (!pipelinedClosed) {
- connection.closePipeline();
- }
- }
- });
- } finally {
- RedisConnectionUtils.unbindConnection(factory);
- }
- }
- Object result = executeSession(session);
- if (result != null) {
- throw new InvalidDataAccessApiUsageException(
- "Callback cannot return a non-null value as it gets overwritten by the pipeline");
- }
(2)在executePipelined()中, 使用 get()方法 得到的value会是null。
在 redisTemplate进行管道操作,结果值只能通过 executePipelined()方法的返回值List获取.
- /**
- * Get the value of {@code key}.
- *
- * @param key must not be {@literal null}.
- * 在管道(pipeline)中使用 get()方法 得到的value会是null
- * @return {@literal null} when used in pipeline / transaction.
- */
- @Nullable
- V get(Object key);
- List<Object> list = stringRedisTemplate.executePipelined(new SessionCallback<String>() {
- @Override
- public String execute(@NonNull RedisOperations operations) throws DataAccessException {
- //idList是多个id的集合
- for (String id : idList) {
- //key由前缀加唯一id组成
- String key = KEY_PREFIX + id;
- //在管道中使用 get()方法 得到的value会是null。value通过executePipelined()的返回值List<Object>获取。
- operations.opsForHash().get(key, field);
- }
- //Callback只能返回null, 否则报错:
- // InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline
- return null;
- }
- });
-
- list.forEach(System.out::println);
RedisTemplate操作管道比较方便,但如果要组装key和value的map,就会比较麻烦。
在这种情况下,可以使用Jedis。
比如,Jedis批量查询多个Hash,可以使用 Pipeline (管道)。
源码见: redis.clients.jedis.PipelineBase#hget(java.lang.String, java.lang.String)
hget()方法,跟普通hash的hget()有点类似,不过返回值是 Response。
- public Response<String> hget(String key, String field) {
- this.getClient(key).hget(key, field);
- return this.getResponse(BuilderFactory.STRING);
- }
- public void testPipLine() {
- Map<String, Response<String>> responseMap = new HashMap<>();
- //try-with-resources, 自动关闭资源
- //先连接jedis,再拿到 pipeline
- try (Jedis jedis = getJedis();
- Pipeline pipeline = jedis.pipelined()) {
- for (String id : idList) {
- //前缀加唯一id
- String key = KEY_PREFIX + id;
- //使用pipeline.hget查询hash的数据
- Response<String> response = pipeline.hget(key, field);
- responseMap.put(id, response);
- }
- pipeline.sync();
- } catch (Exception ex) {
- log.error("responses error.", ex);
- }
-
- Map<String, String> map = new HashMap<>();
- //组装map。response.get()在pipeline关闭后才能执行,否则拿到的value都是null
- responseMap.forEach((k,response) -> map.put(k, response.get()));
-
- map.forEach((k,v)-> System.out.println(k+",val:"+v));
-
- }
-
-
- private static Pool<Jedis> jedisPool = null;
-
- /**
- * 连接redis,获取jedisPool
- * @return
- */
- public Jedis getJedis() {
- if (jedisPool == null) {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
- poolConfig.setMaxTotal(maxTotal);
- poolConfig.setMaxIdle(maxIdle);
- poolConfig.setMaxWaitMillis(maxWaitMillis);
- poolConfig.setTestOnBorrow(testOnBorrow);
- //配置可以写在配置中心/文件
- jedisPool = new JedisPool(poolConfig, host, port, timeout, password, database);
- }
- return jedisPool.getResource();
- }
Redis pipelining | Redishttps://www.cnblogs.com/expiator/p/11127719.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。