当前位置:   article > 正文

异步编程实战之lettuce

异步编程实战之lettuce

一, 添加所需依赖

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.6.5</version>
  5. <relativePath/> <!-- lookup parent from repository -->
  6. </parent>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.commons</groupId>
  13. <artifactId>commons-pool2</artifactId>
  14. </dependency>

二, redis配置参数

  1. # redis
  2. spring.redis.database=0
  3. spring.redis.host=r-uf63gkx4qs0ygz3rag.redis.rds.aliyuncs.com
  4. spring.redis.port=6379
  5. spring.redis.password=ywjAYKDz28RFCJg9iRW4
  6. # 连接超时时间
  7. spring.redis.timeout=5000
  8. # 连接池最大连接数(使用负值表示没有限制)
  9. spring.redis.lettuce.pool.max-active=5000
  10. # 连接池中的最小空闲连接
  11. spring.redis.lettuce.pool.min-idle=1
  12. # 连接池中的最大空闲连接
  13. spring.redis.lettuce.pool.max-idle=100
  14. # 连接池最大阻塞等待时间(使用负值表示没有限制)
  15. spring.redis.lettuce.pool.max-wait=1000
  16. #在关闭客户端连接之前等待任务处理完成的最长时间,在这之后,无论任务是否执行完成,都会被执行器关闭,默认100ms
  17. spring.redis.lettuce.shutdown-timeout=1000
  18. #是否缓存空值
  19. spring.cache.redis.cache-null-values=false

三, redis连接池设置

  1. package com.shuinfo.ods_poc.config;
  2. import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.cache.annotation.EnableCaching;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.data.redis.connection.RedisPassword;
  8. import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
  9. import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
  10. import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
  11. import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
  12. import org.springframework.data.redis.core.ReactiveRedisTemplate;
  13. import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
  14. import org.springframework.data.redis.serializer.RedisSerializationContext;
  15. import org.springframework.data.redis.serializer.RedisSerializer;
  16. import org.springframework.data.redis.serializer.StringRedisSerializer;
  17. import java.time.Duration;
  18. @Configuration
  19. @EnableCaching
  20. public class RedisConfig {
  21. @Value("${spring.redis.database}")
  22. private int database;
  23. @Value("${spring.redis.host}")
  24. private String host;
  25. @Value("${spring.redis.password}")
  26. private String password;
  27. @Value("${spring.redis.port}")
  28. private int port;
  29. @Value("${spring.redis.timeout}")
  30. private long timeout;
  31. @Value("${spring.redis.lettuce.shutdown-timeout}")
  32. private long shutDownTimeout;
  33. @Value("${spring.redis.lettuce.pool.max-idle}")
  34. private int maxIdle;
  35. @Value("${spring.redis.lettuce.pool.min-idle}")
  36. private int minIdle;
  37. @Value("${spring.redis.lettuce.pool.max-active}")
  38. private int maxActive;
  39. @Value("${spring.redis.lettuce.pool.max-wait}")
  40. private long maxWait;
  41. Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
  42. @Bean
  43. public LettuceConnectionFactory lettuceConnectionFactory() {
  44. GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
  45. genericObjectPoolConfig.setMaxIdle(maxIdle);
  46. genericObjectPoolConfig.setMinIdle(minIdle);
  47. genericObjectPoolConfig.setMaxTotal(maxActive);
  48. genericObjectPoolConfig.setMaxWait(Duration.ofMillis(maxWait));
  49. //每隔多少毫秒,空闲线程驱逐器关闭多余的空闲连接,且保持最少空闲连接可用,这个值最好设置大一点,否者影响性能
  50. genericObjectPoolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(1000));
  51. RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
  52. redisStandaloneConfiguration.setDatabase(database);
  53. redisStandaloneConfiguration.setHostName(host);
  54. redisStandaloneConfiguration.setPort(port);
  55. redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
  56. LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
  57. .commandTimeout(Duration.ofMillis(timeout))
  58. .shutdownTimeout(Duration.ofMillis(shutDownTimeout))
  59. .poolConfig(genericObjectPoolConfig)
  60. .build();
  61. LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfig);
  62. // factory.setShareNativeConnection(true);
  63. // factory.setValidateConnection(false);
  64. return factory;
  65. }
  66. @Bean
  67. public RedisSerializationContext redisSerializationContext() {
  68. RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
  69. builder.key(StringRedisSerializer.UTF_8);
  70. builder.value(RedisSerializer.json());
  71. builder.hashKey(StringRedisSerializer.UTF_8);
  72. builder.hashValue(StringRedisSerializer.UTF_8);
  73. return builder.build();
  74. }
  75. @Bean
  76. public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
  77. RedisSerializationContext serializationContext = redisSerializationContext();
  78. return new ReactiveRedisTemplate(lettuceConnectionFactory,serializationContext);
  79. }
  80. }

四, 封装工具类

  1. package com.shuinfo.ods_poc.utils;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.data.redis.core.ReactiveRedisTemplate;
  5. import org.springframework.stereotype.Component;
  6. import reactor.core.publisher.Flux;
  7. import reactor.core.publisher.Mono;
  8. import java.time.Duration;
  9. @Component
  10. @Slf4j
  11. public class RedisUtils {
  12. private static final Duration EXPIRED_DAY = Duration.ofDays(7);
  13. private static final Duration EXPIRED_HOURS = Duration.ofHours(30);
  14. private static final Duration EXPIRED_MINUTES = Duration.ofMinutes(1);
  15. @Autowired
  16. private ReactiveRedisTemplate reactiveRedisTemplate;
  17. public <T> void put(String key,T value){
  18. reactiveRedisTemplate.opsForValue()
  19. .set(key,value,EXPIRED_HOURS).subscribe((b->{}));
  20. }
  21. public <T> Mono<T> get(String key){
  22. return reactiveRedisTemplate.opsForValue()
  23. .get(key);
  24. }
  25. public <T> void putList(String key, T value){
  26. reactiveRedisTemplate.opsForSet().add(key,value).subscribe();
  27. }
  28. public <T> Flux<T> getList(String key){
  29. return reactiveRedisTemplate.opsForSet().members(key);
  30. }
  31. public void remove(String key){
  32. reactiveRedisTemplate.opsForSet().delete(key).subscribe();
  33. }
  34. public <T> void putZSet(String key, T value,String score){
  35. reactiveRedisTemplate.opsForZSet().add(key,value,Double.valueOf(score))
  36. .doOnError(e->{
  37. log.error(e.toString());
  38. }).subscribe();
  39. }
  40. public <T> Flux<T> getZSet(String key,Integer page,Integer size){
  41. if(page == null || size == null){
  42. return reactiveRedisTemplate.opsForZSet().rangeByScore(key,
  43. org.springframework.data.domain.Range.unbounded());
  44. }
  45. return reactiveRedisTemplate.opsForZSet().rangeByScore(key,
  46. org.springframework.data.domain.Range.unbounded(),
  47. org.springframework.data.redis.connection
  48. .RedisZSetCommands.Limit.limit()
  49. .offset((page - 1) * size).count(size)
  50. );
  51. }
  52. }

五, 使用

  1. //存缓存
  2. redisUtils.putList(RedisConstant.FIND_ORDER_BY_UID_AND_OTYPE
  3. + req.getMemberCode()+req.getBusinessTypeCode(),req);
  4. redisUtils.putList(RedisConstant.FIND_ORDER_BY_SHOP_POS_INFO
  5. + req.getShopCode()+req.getOrderTime()+req.getPosRefId()
  6. + req.getReceiptNumber(),req);

六, 分布式锁(RedisTemplate)

  1. package com.shuinfo.ods_poc.utils;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.data.redis.connection.RedisStringCommands;
  4. import org.springframework.data.redis.connection.ReturnType;
  5. import org.springframework.data.redis.core.RedisCallback;
  6. import org.springframework.data.redis.core.RedisTemplate;
  7. import org.springframework.data.redis.core.types.Expiration;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.Resource;
  10. import java.nio.charset.Charset;
  11. import java.util.concurrent.TimeUnit;
  12. @Component
  13. @Slf4j
  14. public class RedisLockUtils {
  15. @Resource
  16. private RedisTemplate redisTemplate;
  17. private static final Long DEFAULT_EXPIRE_TIME = 1L;
  18. private static final TimeUnit DEFAULT_UNIT = TimeUnit.SECONDS;
  19. private static final String UNLOCK_LUA;
  20. private static final String CHARCODE = "UTF-8";
  21. /**
  22. * 释放锁脚本,原子操作
  23. */
  24. static {
  25. StringBuilder sb = new StringBuilder();
  26. sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
  27. sb.append("then ");
  28. sb.append(" return redis.call(\"del\",KEYS[1]) ");
  29. sb.append("else ");
  30. sb.append(" return 0 ");
  31. sb.append("end ");
  32. UNLOCK_LUA = sb.toString();
  33. }
  34. /**
  35. * 获取分布式锁,原子操作
  36. * @param lockKey
  37. * @param requestId 唯一ID, 可以使用UUID.randomUUID().toString();
  38. * @param expire
  39. * @param timeUnit
  40. * @return
  41. */
  42. public boolean tryLock(String lockKey, String requestId, long expire, TimeUnit timeUnit) {
  43. try{
  44. RedisCallback<Boolean> callback = (connection) ->
  45. connection.set(lockKey.getBytes(Charset.forName(CHARCODE)),
  46. requestId.getBytes(Charset.forName(CHARCODE)),
  47. Expiration.seconds(timeUnit.toSeconds(expire)),
  48. RedisStringCommands.SetOption.SET_IF_ABSENT);
  49. return (Boolean)redisTemplate.execute(callback);
  50. } catch (Exception e) {
  51. log.error("redis lock error.", e);
  52. }
  53. return false;
  54. }
  55. public boolean tryLock(String lockKey, String requestId) {
  56. return tryLock( lockKey, requestId,DEFAULT_EXPIRE_TIME,DEFAULT_UNIT);
  57. }
  58. /**
  59. * 释放锁
  60. * @param lockKey
  61. * @param requestId 唯一ID
  62. * @return
  63. */
  64. public boolean releaseLock(String lockKey, String requestId) {
  65. RedisCallback<Boolean> callback = (connection) ->
  66. connection.eval(UNLOCK_LUA.getBytes(), ReturnType.BOOLEAN ,
  67. 1, lockKey.getBytes(Charset.forName(CHARCODE)),
  68. requestId.getBytes(Charset.forName(CHARCODE)));
  69. return (Boolean)redisTemplate.execute(callback);
  70. }
  71. /**
  72. * 获取Redis锁的value值
  73. * @param lockKey
  74. * @return
  75. */
  76. public String get(String lockKey) {
  77. try {
  78. RedisCallback<String> callback = (connection) ->
  79. new String(connection.get(lockKey.getBytes()), Charset.forName(CHARCODE));
  80. return (String)redisTemplate.execute(callback);
  81. } catch (Exception e) {
  82. log.error("get redis occurred an exception", e);
  83. }
  84. return null;
  85. }
  86. }

可参照阿里文档:

通过客户端程序连接Redis_云数据库 Redis 版(Redis)-阿里云帮助中心

注: redis的参数还需要关注redis服务端的参数,有些默认的参数可能实际并不适合

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/202744
推荐阅读
相关标签
  

闽ICP备14008679号