当前位置:   article > 正文

自定义注解解决消息幂等性问题

自定义注解解决消息幂等性问题

1.什么是幂等性问题

        用户的一个请求由于网络波动未完成,而用户那边并不知道,于是反复点击刷新,这时候后端就会接收到大量相同的请求,这时候就需要进行幂等处理,使得多个相同请求跟一个请求所得结果相同。而对于get和delete请求,具有天生的幂等性,故只用考虑put和post请求即可。还有一种情况则是消息队列的重复消费问题,其原因是一样的。

2.如何解决

        利用token的验证机制,当第一个请求过来时,向redis中写入一个key-value,其中key需要保证相同请求一定相同,不同请求则一定不同,来保证请求是相同的,当然,多个相同的请求也可能是由于订阅模式造成的,所以还需要根据value来判断是否是同一用户发起的请求。

3代码实现

定义注解

  1. @Target({ElementType.TYPE,ElementType.METHOD})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface Repeat {
  5. //消息的最长消费时间,超过该时间则判断消费失败
  6. long time() default 60000L;
  7. }

定义切面

        其中RepeatServiceAdapter是一个接口类,可以自定义对什么类型的消息进行幂等处理,这里只给出处理http请求的重复请求问题,后续添加处理器只需要添加实现类即可

  1. @Aspect
  2. @Component
  3. public class RepeatAspect {
  4. @Resource
  5. private List<RepeatServiceAdapter> repeatServiceAdapters;//自动注入接口的所有实现类
  6. @Pointcut("@annotation(com.cg.annotation.Repeat)")
  7. public void pointcut() {
  8. }
  9. @Around("pointcut()")
  10. public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
  11. //获取代理的方法
  12. MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
  13. Method method = methodSignature.getMethod();
  14. for (RepeatServiceAdapter repeatServiceAdapter : repeatServiceAdapters) {
  15. //判断是否符合对应的设配器
  16. if (repeatServiceAdapter.support(method) || repeatServiceAdapter.support(method.getClass())) {
  17. return repeatServiceAdapter.resolve(pjp);
  18. }
  19. }
  20. //没有适配的则直接放行
  21. return pjp.proceed();
  22. }
  23. }

定义处理逻辑的适配器

  1. /**
  2. * 解决重复请求的问题
  3. */
  4. public interface RepeatServiceAdapter {
  5. /**
  6. * 判断是否符合该适配器
  7. * @param clazz
  8. * @return
  9. */
  10. boolean support(Class<?> clazz);
  11. boolean support(Method method);
  12. /**
  13. * 处理重复请求
  14. * @param pjp 切面类
  15. * @return 处理结果
  16. * @throws Throwable
  17. */
  18. Object resolve(ProceedingJoinPoint pjp) throws Throwable;
  19. }
  1. /**
  2. * 处理http重复请求问题
  3. */
  4. @Component
  5. @Slf4j
  6. public class RequestRepeatAdapter implements RepeatServiceAdapter {
  7. @Autowired
  8. private RedisTemplate<String, String> redisTemplate;
  9. @Override
  10. public boolean support(Class<?> clazz) {
  11. return clazz.isAnnotationPresent(PostMapping.class) || clazz.isAnnotationPresent(PutMapping.class);
  12. }
  13. @Override
  14. public boolean support(Method method) {
  15. return method.isAnnotationPresent(PostMapping.class) || method.isAnnotationPresent(PutMapping.class);
  16. }
  17. @Override
  18. public Object resolve(ProceedingJoinPoint pjp) throws Throwable {
  19. MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
  20. //获取方法上的注释
  21. Method method = methodSignature.getMethod();
  22. //获取request
  23. ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
  24. //获取请求头中的repeat属性
  25. HttpServletRequest request = attributes.getRequest();
  26. //repeat是前端访问时需要携带的key值
  27. String repeat = request.getHeader("repeat");
  28. //获取请求路径中的userId,也可以从token中解析
  29. String userId = request.getParameter("userId");
  30. //判断是否存在该请求头的uuid以及对应的userId
  31. return getObject(pjp, method, repeat, userId);
  32. }
  33. private Object getObject(ProceedingJoinPoint pjp, Method method, String repeat, String userId) throws Throwable {
  34. if (Boolean.TRUE.equals(redisTemplate.hasKey(repeat)) && userId.equals(redisTemplate.opsForValue().get(repeat))) {
  35. //判断该请求为重复请求,执行拦截
  36. log.info("重复请求...");
  37. return ResultInfo.error(CodeEnum.REPEAT_REQUEST);
  38. } else {
  39. //不是重复请求,则放行
  40. //获取最长重复等待时间并保存uuid
  41. Repeat annotation = method.getAnnotation(Repeat.class);
  42. long time = annotation.time();
  43. //双重检查锁保证线程安全
  44. synchronized (this.getClass()) {
  45. if (Boolean.TRUE.equals(redisTemplate.hasKey(repeat)) && userId.equals(redisTemplate.opsForValue().get(repeat))) {
  46. //判断该请求为重复请求,执行拦截
  47. log.info("重复请求...");
  48. return ResultInfo.error(CodeEnum.REPEAT_REQUEST);
  49. }
  50. redisTemplate.opsForValue().set(repeat, userId, time, TimeUnit.MILLISECONDS);
  51. }
  52. log.info("执行请求...");
  53. Object proceed = pjp.proceed();
  54. //执行结束后删除uuid
  55. redisTemplate.delete(repeat);
  56. return proceed;
  57. }
  58. }
  59. }

4.结果验证

controller方法

  1. @Repeat
  2. @PostMapping("login")
  3. public ResultInfo<?> insert() throws InterruptedException {
  4. testMapper.insert();
  5. //模拟网络波动
  6. Thread.sleep(1000);
  7. return ResultInfo.success(CodeEnum.SUCCESS);
  8. }

多线程模拟重复请求

  1. public class Client {
  2. @Test
  3. public void test() throws Exception{
  4. for (int i = 0; i < 10; i++) {
  5. new Thread(()->{
  6. try {
  7. String url = "http://localhost:8010/login?userId=1"; // 要发送POST请求的URL
  8. URL obj = new URL(url);
  9. HttpURLConnection con = (HttpURLConnection) obj.openConnection();
  10. con.setRequestMethod("POST");
  11. con.setRequestProperty("repeat", "b5ed28e9-e7c2-4a05-8b4d-aaee58984e24"); // 在HTTP头中添加名为"repeat"的字段
  12. BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
  13. String inputLine;
  14. StringBuffer response = new StringBuffer();
  15. while ((inputLine = in.readLine()) != null) {
  16. response.append(inputLine);
  17. }
  18. in.close();
  19. String json = response.toString(); // JSON数据
  20. System.out.println(json);
  21. } catch (Exception e) {
  22. throw new RuntimeException(e);
  23. }
  24. }).start();
  25. }
  26. Thread.sleep(100000L);
  27. }
  28. }

结果

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/504686
推荐阅读
相关标签
  

闽ICP备14008679号