当前位置:   article > 正文

简单注解实现集群同步锁(spring+redis+注解)_redis boundvalueops 同步锁

redis boundvalueops 同步锁

       互联网面试的时候,是不是面试官常问一个问题如何保证集群环境下数据操作并发问题,常用的synchronized肯定是无法满足了,或许你可以借助for update对数据加锁。本文的最终解决方式你只要在方法上加一个@P4jSyn注解就能保证集群环境下同synchronized的效果,且锁的key可以任意指定。本注解还支持了锁的超时机制。

本文需要对redis、spring和spring-data-redis有一定的了解。当然你可以借助本文的思路对通过注解对方法返回数据进行缓存,类似com.google.code.simple-spring-memcached的@ReadThroughSingleCache。


第一步: 介绍两个自定义注解P4jSyn、P4jSynKey

P4jSyn:必选项,标记在方法上,表示需要对该方法加集群同步锁;

P4jSynKey:可选项,加在方法参数上,表示以方法某个参数作为锁的key,用来保证更多的坑,P4jSynKey并不是强制要添加的,当没有P4jSynKey标记的情况下只会以P4jSyn的synKey作为锁key

  1. package com.yaoguoyin.redis.lock;
  2. import java.lang.annotation.ElementType;
  3. import java.lang.annotation.Inherited;
  4. import java.lang.annotation.Retention;
  5. import java.lang.annotation.RetentionPolicy;
  6. import java.lang.annotation.Target;
  7. /**
  8. * <b>同步锁:</b><br/>
  9. * 主要作用是在服务器集群环境下保证方法的synchronize;<br/>
  10. * 标记在方法上,使该方法的执行具有互斥性,并不保证并发执行方法的先后顺序;<br/>
  11. * 如果原有“A任务”获取锁后任务执行时间超过最大允许持锁时间,且锁被“B任务”获取到,在“B任务”成功货物锁会并不会终止“A任务”的执行;<br/>
  12. * <br/>
  13. * <b>注意:</b><br/>
  14. * 使用过程中需要注意keepMills、toWait、sleepMills、maxSleepMills等参数的场景使用;<br/>
  15. * 需要安装redis,并使用spring和spring-data-redis等,借助redis NX等方法实现。
  16. *
  17. * @see com.yaoguoyin.redis.lock.P4jSynKey
  18. * @see com.yaoguoyin.redis.lock.RedisLockAspect
  19. *
  20. * @author partner4java
  21. *
  22. */
  23. @Target({ ElementType.METHOD })
  24. @Retention(RetentionPolicy.RUNTIME)
  25. @Inherited
  26. public @interface P4jSyn {
  27. /**
  28. * 锁的key<br/>
  29. * 如果想增加坑的个数添加非固定锁,可以在参数上添加@P4jSynKey注解,但是本参数是必写选项<br/>
  30. * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey<br/>
  31. *
  32. */
  33. String synKey();
  34. /**
  35. * 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/>
  36. * 单位毫秒,默认20秒<br/>
  37. * 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的<br/>
  38. * 但是没有比较强的业务要求下,不建议设置为0
  39. */
  40. long keepMills() default 20 * 1000;
  41. /**
  42. * 当获取锁失败,是继续等待还是放弃<br/>
  43. * 默认为继续等待
  44. */
  45. boolean toWait() default true;
  46. /**
  47. * 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/>
  48. * 默认为10毫秒
  49. *
  50. * @return
  51. */
  52. long sleepMills() default 10;
  53. /**
  54. * 锁获取超时时间:<br/>
  55. * 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出
  56. * {@link java.util.concurrent.TimeoutException.TimeoutException}
  57. * ,可捕获此异常做相应业务处理;<br/>
  58. * 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去;
  59. *
  60. * @return
  61. */
  62. long maxSleepMills() default 60 * 1000;
  63. }
  1. package com.yaoguoyin.redis.lock;
  2. import java.lang.annotation.ElementType;
  3. import java.lang.annotation.Inherited;
  4. import java.lang.annotation.Retention;
  5. import java.lang.annotation.RetentionPolicy;
  6. import java.lang.annotation.Target;
  7. /**
  8. * <b>同步锁 key</b><br/>
  9. * 加在方法的参数上,指定的参数会作为锁的key的一部分
  10. *
  11. * @author partner4java
  12. *
  13. */
  14. @Target({ ElementType.PARAMETER })
  15. @Retention(RetentionPolicy.RUNTIME)
  16. @Inherited
  17. public @interface P4jSynKey {
  18. /**
  19. * key的拼接顺序
  20. *
  21. * @return
  22. */
  23. int index() default 0;
  24. }
这里就不再对两个注解进行使用上的解释了,因为注释已经说明的很详细了。
使用示例:

  1. package com.yaoguoyin.redis.lock;
  2. import org.springframework.stereotype.Component;
  3. @Component
  4. public class SysTest {
  5. private static int i = 0;
  6. @P4jSyn(synKey = "12345")
  7. public void add(@P4jSynKey(index = 1) String key, @P4jSynKey(index = 0) int key1) {
  8. i++;
  9. System.out.println("i=-===========" + i);
  10. }
  11. }

第二步:切面编程
在不影响原有代码的前提下,保证执行同步,目前最直接的方式就是使用切面编程

  1. package com.yaoguoyin.redis.lock;
  2. import java.lang.annotation.Annotation;
  3. import java.lang.reflect.Method;
  4. import java.util.SortedMap;
  5. import java.util.TreeMap;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.concurrent.TimeoutException;
  8. import org.aspectj.lang.ProceedingJoinPoint;
  9. import org.aspectj.lang.annotation.Around;
  10. import org.aspectj.lang.annotation.Aspect;
  11. import org.aspectj.lang.reflect.MethodSignature;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.beans.factory.annotation.Qualifier;
  14. import org.springframework.data.redis.core.BoundValueOperations;
  15. import org.springframework.data.redis.core.RedisTemplate;
  16. /**
  17. * 锁的切面编程<br/>
  18. * 针对添加@RedisLock 注解的方法进行加锁
  19. *
  20. * @see com.yaoguoyin.redis.lock.P4jSyn
  21. *
  22. * @author partner4java
  23. *
  24. */
  25. @Aspect
  26. public class RedisLockAspect {
  27. @Autowired
  28. @Qualifier("redisTemplate")
  29. private RedisTemplate<String, Long> redisTemplate;
  30. @Around("execution(* com.yaoguoyin..*(..)) && @annotation(com.yaoguoyin.redis.lock.P4jSyn)")
  31. public Object lock(ProceedingJoinPoint pjp) throws Throwable {
  32. P4jSyn lockInfo = getLockInfo(pjp);
  33. if (lockInfo == null) {
  34. throw new IllegalArgumentException("配置参数错误");
  35. }
  36. String synKey = getSynKey(pjp, lockInfo.synKey());
  37. if (synKey == null || "".equals(synKey)) {
  38. throw new IllegalArgumentException("配置参数synKey错误");
  39. }
  40. boolean lock = false;
  41. Object obj = null;
  42. try {
  43. // 超时时间
  44. long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills();
  45. while (!lock) {
  46. long keepMills = System.currentTimeMillis() + lockInfo.keepMills();
  47. lock = setIfAbsent(synKey, keepMills);
  48. // 得到锁,没有人加过相同的锁
  49. if (lock) {
  50. obj = pjp.proceed();
  51. }
  52. // 锁设置了没有超时时间
  53. else if (lockInfo.keepMills() <= 0) {
  54. // 继续等待获取锁
  55. if (lockInfo.toWait()) {
  56. // 如果超过最大等待时间抛出异常
  57. if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) {
  58. throw new TimeoutException("获取锁资源等待超时");
  59. }
  60. TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
  61. } else {
  62. break;
  63. }
  64. }
  65. // 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁
  66. else if (System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills))) {
  67. lock = true;
  68. obj = pjp.proceed();
  69. }
  70. // 没有得到任何锁
  71. else {
  72. // 继续等待获取锁
  73. if (lockInfo.toWait()) {
  74. // 如果超过最大等待时间抛出异常
  75. if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) {
  76. throw new TimeoutException("获取锁资源等待超时");
  77. }
  78. TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
  79. }
  80. // 放弃等待
  81. else {
  82. break;
  83. }
  84. }
  85. }
  86. } catch (Exception e) {
  87. e.printStackTrace();
  88. throw e;
  89. } finally {
  90. // 如果获取到了锁,释放锁
  91. if (lock) {
  92. releaseLock(synKey);
  93. }
  94. }
  95. return obj;
  96. }
  97. /**
  98. * 获取包括方法参数上的key<br/>
  99. * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey
  100. *
  101. */
  102. private String getSynKey(ProceedingJoinPoint pjp, String synKey) {
  103. try {
  104. synKey = "RedisSyn+" + synKey;
  105. Object[] args = pjp.getArgs();
  106. if (args != null && args.length > 0) {
  107. MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
  108. Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations();
  109. SortedMap<Integer, String> keys = new TreeMap<Integer, String>();
  110. for (int ix = 0; ix < paramAnnotationArrays.length; ix++) {
  111. P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]);
  112. if (p4jSynKey != null) {
  113. Object arg = args[ix];
  114. if (arg != null) {
  115. keys.put(p4jSynKey.index(), arg.toString());
  116. }
  117. }
  118. }
  119. if (keys != null && keys.size() > 0) {
  120. for (String key : keys.values()) {
  121. synKey = synKey + key;
  122. }
  123. }
  124. }
  125. return synKey;
  126. } catch (Exception e) {
  127. e.printStackTrace();
  128. }
  129. return null;
  130. }
  131. @SuppressWarnings("unchecked")
  132. private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) {
  133. if (annotations != null && annotations.length > 0) {
  134. for (final Annotation annotation : annotations) {
  135. if (annotationClass.equals(annotation.annotationType())) {
  136. return (T) annotation;
  137. }
  138. }
  139. }
  140. return null;
  141. }
  142. /**
  143. * 获取RedisLock注解信息
  144. */
  145. private P4jSyn getLockInfo(ProceedingJoinPoint pjp) {
  146. try {
  147. MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
  148. Method method = methodSignature.getMethod();
  149. P4jSyn lockInfo = method.getAnnotation(P4jSyn.class);
  150. return lockInfo;
  151. } catch (Exception e) {
  152. e.printStackTrace();
  153. }
  154. return null;
  155. }
  156. public BoundValueOperations<String, Long> getOperations(String key) {
  157. return redisTemplate.boundValueOps(key);
  158. }
  159. /**
  160. * Set {@code value} for {@code key}, only if {@code key} does not exist.
  161. * <p>
  162. * See http://redis.io/commands/setnx
  163. *
  164. * @param key
  165. * must not be {@literal null}.
  166. * @param value
  167. * must not be {@literal null}.
  168. * @return
  169. */
  170. public boolean setIfAbsent(String key, Long value) {
  171. return getOperations(key).setIfAbsent(value);
  172. }
  173. public long getLock(String key) {
  174. Long time = getOperations(key).get();
  175. if (time == null) {
  176. return 0;
  177. }
  178. return time;
  179. }
  180. public long getSet(String key, Long value) {
  181. Long time = getOperations(key).getAndSet(value);
  182. if (time == null) {
  183. return 0;
  184. }
  185. return time;
  186. }
  187. public void releaseLock(String key) {
  188. redisTemplate.delete(key);
  189. }
  190. }

RedisLockAspect会对添加注解的方法进行特殊处理,具体可看lock方法。

大致思路就是:1、首选借助redis本身支持对应的setIfAbsent方法,该方法的特点是如果redis中已有该数据不保存返回false,不存该数据保存返回true;2、如果setIfAbsent返回true标识拿到同步锁,可进行操作,操作后并释放锁;3、如果没有通过setIfAbsent拿到数据,判断是否对锁设置了超时机制,没有设置判断是否需要继续等待;4、判断是否锁已经过期,需要对(System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills)))进行细细的揣摩一下,getSet可能会改变了其他人拥有锁的超时时间,但是几乎可以忽略;5、没有得到任何锁,判断继续等待还是退出。

第三步:spring的基本配置

  1. #*****************jedis连接参数设置*********************#
  2. #redis服务器ip #
  3. redis.hostName=127.0.0.1
  4. #redis服务器端口号#
  5. redis.port=6379
  6. #redis服务器外部访问密码
  7. redis.password=XXXXXXXXXX
  8. #************************jedis池参数设置*******************#
  9. #jedis的最大分配对象#
  10. jedis.pool.maxActive=1000
  11. jedis.pool.minIdle=100
  12. #jedis最大保存idel状态对象数 #
  13. jedis.pool.maxIdle=1000
  14. #jedis池没有对象返回时,最大等待时间 #
  15. jedis.pool.maxWait=5000
  16. #jedis调用borrowObject方法时,是否进行有效检查#
  17. jedis.pool.testOnBorrow=true
  18. #jedis调用returnObject方法时,是否进行有效检查 #
  19. jedis.pool.testOnReturn=true
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  3. xmlns:context="http://www.springframework.org/schema/context" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
  4. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:redis="http://www.springframework.org/schema/redis" xmlns:cache="http://www.springframework.org/schema/cache"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
  7. http://www.springframework.org/schema/context
  8. http://www.springframework.org/schema/context/spring-context-4.2.xsd
  9. http://www.springframework.org/schema/aop
  10. http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
  11. http://www.springframework.org/schema/redis
  12. http://www.springframework.org/schema/redis/spring-redis.xsd
  13. http://www.springframework.org/schema/cache
  14. http://www.springframework.org/schema/cache/spring-cache.xsd">
  15. <!-- 开启注解 -->
  16. <aop:aspectj-autoproxy />
  17. <bean class="com.yaoguoyin.redis.lock.RedisLockAspect" />
  18. <!-- 扫描注解包范围 -->
  19. <context:component-scan base-package="com.yaoguoyin" />
  20. <!-- 引入redis配置 -->
  21. <context:property-placeholder location="classpath:config.properties" />
  22. <!-- 连接池 -->
  23. <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
  24. <property name="minIdle" value="${jedis.pool.minIdle}" />
  25. <property name="maxIdle" value="${jedis.pool.maxIdle}" />
  26. <property name="maxWaitMillis" value="${jedis.pool.maxWait}" />
  27. </bean>
  28. <!-- p:password="${redis.pass}" -->
  29. <bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:host-name="${redis.hostName}" p:port="${redis.port}"
  30. p:password="${redis.password}" p:pool-config-ref="poolConfig" />
  31. <!-- 类似于jdbcTemplate -->
  32. <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" p:connection-factory-ref="redisConnectionFactory" />
  33. </beans>
redis的安装本文就不再说明。

测试

  1. package com.yaoguoyin.redis;
  2. import org.junit.runner.RunWith;
  3. import org.springframework.test.context.ContextConfiguration;
  4. import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
  5. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  6. @RunWith(SpringJUnit4ClassRunner.class)
  7. @ContextConfiguration(locations = { "classpath:META-INF/spring/redis.xml" })
  8. public class BaseTest extends AbstractJUnit4SpringContextTests {
  9. }
  1. package com.yaoguoyin.redis.lock;
  2. import java.util.concurrent.TimeUnit;
  3. import org.junit.Test;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import com.yaoguoyin.redis.BaseTest;
  6. public class RedisTest extends BaseTest {
  7. @Autowired
  8. private SysTest sysTest;
  9. @Test
  10. public void testHello() throws InterruptedException {
  11. for (int i = 0; i < 100; i++) {
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. try {
  16. TimeUnit.SECONDS.sleep(1);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. sysTest.add("xxxxx", 111111);
  21. }
  22. }).start();
  23. }
  24. TimeUnit.SECONDS.sleep(20);
  25. }
  26. @Test
  27. public void testHello2() throws InterruptedException{
  28. sysTest.add("xxxxx", 111111);
  29. TimeUnit.SECONDS.sleep(10);
  30. }
  31. }

你可以对

void com.yaoguoyin.redis.lock.SysTest.add(@P4jSynKey(index=1) String key, @P4jSynKey(index=0) int key1)
去除注解@P4jSyn进行测试对比。

demo整体下载地址(示例使用了maven):http://download.csdn.net/detail/partner4java/9604967

ps:本demo的执行性能取决于redis和java交互距离;成千山万单锁并发建议不要使用这种形式,直接通过redis等解决,本demo只解决小并发不想耦合代码的形式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/212626?site
推荐阅读
相关标签
  

闽ICP备14008679号