当前位置:   article > 正文

java将请求写入队列,再用异步消费该队列,高并发实战。_异步线程消费队列

异步线程消费队列

一、背景

   在开发项目过程中,遇到一个问题就是,一个系统与上百个系统对接,有大量的请求和有可能数据是重复发送,但是在数据必须保证数据不能重复消费。

二、设计思路

  刚才开始设计思路,按照往常写代码,编写controller,service,dao层,一气呵成。但是在使用过程出现,大量的请求导致服务器不可以,直接宕机。出现这个问题,那没有办法得优化。

设计思路:

   1、将所有的请求,放到队列里面。

   2、异步消费队列里面的数据。

三、代码实现

1、controller层代码  

  1. @RequestMapping("/api")
  2. @RestController
  3. @Api(value = "账号监控平台系统相关接口", tags = {"账号监控平台系统相关接口"})
  4. @Slf4j
  5. public class UserDepartmentController {
  6. private static LinkedBlockingDeque<List<UserLogin>> deque = new LinkedBlockingDeque<>();
  7. @Autowired
  8. private UserLoginManagementService userLoginManagementService;
  9. @Autowired
  10. private UserLoginService userLoginService;
  11. @Autowired
  12. private Sid sid;
  13. @ApiOperation(value = "推送用户登录日志接口",notes = "推送用户登录日志接口", httpMethod = "POST")
  14. @PostMapping("/userLog/add")
  15. public R addUserLogQueue(@Validated @RequestBody List< @Valid UserLogin> list) {
  16. if (list == null || list.isEmpty()) {
  17. return R.errorMsg("传数据不能为空");
  18. }
  19. try {
  20. List<DataMsg> resultList = new ArrayList<>(); //返回给调用接口集合
  21. List<UserLogin> saveList = new ArrayList<>(); // 保存数据的list
  22. // 判断参数不能为空,
  23. for(UserLogin userLogin : list){
  24. R result = ParameterUtil.userParameter(userLogin);
  25. if(result.getCode().equals(RStatus.fail.getCode())){
  26. DataMsg dataMsg = new DataMsg();
  27. BeanUtils.copyProperties(userLogin,dataMsg);
  28. dataMsg.setMessage(result.getMsg());
  29. log.error("【UserDepartmentController -- addUserLog】传入的登录信息数据有空,入参的参数数据:{},消息的提醒:{}",userLogin.toString(),result.getMsg());
  30. resultList.add(dataMsg);
  31. }else {
  32. userLogin.setUserId(userLogin.getSystemName()+"-"+userLogin.getUserId());
  33. saveList.add(userLogin);
  34. }
  35. }
  36. if(saveList != null && saveList.size() > 0 ){
  37. // String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  38. //log.info("【UserDepartmentController -- addUserLog】传入的登录信息数据,入参的时间:{},入参的数据:{} ",time, saveList.toString());
  39. if(deque.remainingCapacity() > 0){
  40. deque.put(saveList);
  41. }else {
  42. return R.errorMsg("队列已经满了,请稍后重试");
  43. }
  44. SaveUserLogQueueRunnable saveUserLogQueueRunnable = new SaveUserLogQueueRunnable(deque,userLoginService,userLoginManagementService,sid);
  45.   //刚开始使用多线程,但是如果相同的数据发送多次请求,会导致数据会重复。后面修改上面的方式,单线程去处理。
  46. ThreadPoolExecutor cupInstance = ThreadPoolUtils.getCUPInstance();
  47. cupInstance.execute(saveUserLogQueueRunnable);
  48. }
  49. if(resultList != null && resultList.size() >0){
  50. return R.build(500,"失败",resultList);
  51. }else{
  52. return R.ok();
  53. }
  54. }catch (Exception e){
  55. e.printStackTrace();
  56. return R.errorMsg("请求接口异常");
  57. }
  58. }
  59. }
  1. @Slf4j
  2. public class SaveUserLogQueueRunnable implements Runnable {
  3. private UserLoginService userLoginService;
  4. private UserLoginManagementService userLoginManagementService;
  5. private Sid sid;
  6. private LinkedBlockingDeque<List<UserLogin>> deque;
  7. public SaveUserLogQueueRunnable(LinkedBlockingDeque<List<UserLogin>> deque, UserLoginService userLoginService, UserLoginManagementService userLoginManagementService, Sid sid){
  8. this.deque = deque;
  9. this.userLoginService = userLoginService;
  10. this.userLoginManagementService = userLoginManagementService;
  11. this.sid = sid;
  12. }
  13. @Override
  14. public void run() {
  15. try {
  16. //开始处理请求队列中的请求,按照队列的FIFO的规则,先处理先放入到队列中的请求
  17. while (deque != null && deque.size() > 0){
  18. List<UserLogin> take = deque.take(); //取出队列中的请求
  19. addLog(take); //处理请求
  20. }
  21. }catch (Exception e){
  22. log.error("【SaveUserLogQueueRunnable 】处理队列出现错误如下:");
  23. e.printStackTrace();
  24. }
  25. }
  26. public synchronized void addLog( List<UserLogin> list) {
  27. String time1 = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  28. System.out.println("list的大小: "+list.size()+"----时间为: "+ time1);
  29. for (UserLogin userLogin :list){
  30. // 这是自己写业务处理的代码。
  31. }
  32. }
  33. }

 

public class Constants {
    // 消费者,单一线程, 进行处理业务逻辑
    public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();


}

四、测试

代码写完了用jmeter 工具测试,,20个并发。

请求参数:

测试结果:预期结果,数据库里面的数据和入参的数据一致。但是出现有重复的数据。有出现问题,还得解决。

解决办法:使用 newSingleThreadScheduledExecutor 单线程去执行,从队列里面拿一条数据消费一条数据

 

再次测试:还是 jmeter 工具测试,,20个并发。发现不会出现重复的数据。

 

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/976213
推荐阅读
相关标签
  

闽ICP备14008679号