当前位置:   article > 正文

Redis与RabbitMQ配合使用多线程(多消费者)处理消息_多线程 处理 rabbitmq消息

多线程 处理 rabbitmq消息

引言

并发引起的服务器崩溃是非常常见的现象,为了解决这一问题,目前流行使用缓存数据库与消息队列搭配使用。在最近的项目中也是使用到这一手段,本篇文章通过一个案例为大家展示该套方案如何使用。

案例描述与流程

本案例是一个经典的并发下单的案例。在Redis中存在一条key为Apple,Value为10000的数据,为防止超卖问题的发生使用Redisson分布式锁避免超卖(在Redis解决超卖Demo这篇文章中已经讲过),在一个线程拿到锁并且符合下单条件则直接返回下单成功同时发送消息,使用AMQP监听队列消息,通过线程池创建多个线程作为消费者进行底层DB的更新。

环境准备

创建模块名为Redis

yml配置文件的编写
  1. server:
  2. port: 9000
  3. spring:
  4. application:
  5. name: redis
  6. datasource:
  7. driver-class-name: com.mysql.jdbc.Driver
  8. url: jdbc:mysql://localhost:3306/user?useSSL=false
  9. username: root
  10. password: 123456
  11. redis:
  12. host: 192.168.136.130
  13. port: 6379
  14. password: 123456
  15. lettuce:
  16. pool:
  17. max-active: 10
  18. max-idle: 10
  19. min-idle: 1
  20. time-between-eviction-runs: 10s
  21. rabbitmq:
  22. host: 192.168.136.130 #MQ地址
  23. port: 5672 #端口
  24. virtual-host: / #虚拟主机
  25. username: demo #用户密码
  26. password: 123321
  27. connection-timeout: 1s
  28. template:
  29. retry: #重试机制
  30. enabled: true
  31. initial-interval: 1000ms
  32. multiplier: 1
  33. max-attempts: 3
  34. publisher-confirm-type: correlated
  35. publisher-returns: true
Controller Test类的编写

 设置路径为  /testAddsetxAddFinally

  1. @RestController
  2. @RequestMapping("/")
  3. public class Test {
  4. @Autowired
  5. private RedisTemplate redisTemplate;
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. /*使用setnx锁,同时给锁释放过期时间,自动释放锁
  9. * */
  10. @RequestMapping("testAddsetxAddFinally")
  11. String cherkAndReduceStockAddSetnxAddFinally()
  12. {
  13. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", "0000",2, TimeUnit.SECONDS);
  14. //获取锁失败,停止50ms,递归调用
  15. if (!lock){
  16. try {
  17. Thread.sleep(3000);
  18. this.cherkAndReduceStockAddSetnxAddFinally();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }else {
  23. try {
  24. String stock = redisTemplate.opsForValue().get("Apple").toString();
  25. if(stock!=null&&stock.length()!=0)
  26. {
  27. Integer valueOf = Integer.valueOf(stock);
  28. if (valueOf>0)
  29. {
  30. redisTemplate.opsForValue().set("Apple",String.valueOf(--valueOf));
  31. //推送MQ
  32. String queue="demo.queue";
  33. //123456为用户id 1为商品id
  34. String masg="123456:1";
  35. rabbitTemplate.convertAndSend(queue,masg);
  36. return "抢购成功!";
  37. }else {
  38. System.out.println("商品售罄!!!");
  39. return "商品售罄!!!";
  40. }
  41. }
  42. }finally {
  43. redisTemplate.delete("lock-stock");
  44. }
  45. }
  46. return "";
  47. }
  48. }
 编写RedisConfig类序列化存储
  1. @Configuration
  2. public class RedisConfig {
  3. @Bean
  4. public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory)
  5. {
  6. //缓存序列化配置避免存储乱码
  7. RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
  8. redisTemplate.setConnectionFactory(factory);
  9. redisTemplate.setKeySerializer(new StringRedisSerializer());
  10. redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  11. return redisTemplate;
  12. }
  13. }

 

创建Consumer模块

 yml文件
  1. server:
  2. port: 9004
  3. spring:
  4. application:
  5. name: redis
  6. datasource:
  7. driver-class-name: com.mysql.jdbc.Driver
  8. url: jdbc:mysql://localhost:3306/user?useSSL=false
  9. username: root
  10. password: 123456
  11. redis:
  12. host: 192.168.136.130
  13. port: 6379
  14. password: 123456
  15. lettuce:
  16. pool:
  17. max-active: 10
  18. max-idle: 10
  19. min-idle: 1
  20. time-between-eviction-runs: 10s
  21. rabbitmq:
  22. host: 192.168.136.130
  23. port: 5672
  24. virtual-host: /
  25. username: demo
  26. password: 123321
  27. connection-timeout: 1s
  28. template:
  29. retry:
  30. enabled: true
  31. initial-interval: 1000ms
  32. multiplier: 1
  33. max-attempts: 3
  34. publisher-confirm-type: correlated
  35. publisher-returns: true
编写order实体类
  1. package cn.itcast.mq.pojo;
  2. import com.baomidou.mybatisplus.annotation.TableField;
  3. import com.baomidou.mybatisplus.annotation.TableName;
  4. import lombok.Data;
  5. import org.springframework.data.relational.core.mapping.Table;
  6. @Data
  7. @TableName("orderlist")
  8. public class order {
  9. //用户id
  10. @TableField("userId")
  11. private String userId;
  12. //商品id
  13. private String id;
  14. public order(String userId, String id) {
  15. this.userId = userId;
  16. this.id = id;
  17. }
  18. }

注意对应关系

 编写orderMapper
  1. package cn.itcast.mq.mapper;
  2. import cn.itcast.mq.pojo.order;
  3. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  4. import org.apache.ibatis.annotations.Mapper;
  5. @Mapper
  6. public interface orderMapper extends BaseMapper<order> {
  7. }
编写orderService
  1. package cn.itcast.mq.service;
  2. import cn.itcast.mq.pojo.order;
  3. import com.baomidou.mybatisplus.extension.service.IService;
  4. public interface orderService extends IService<order> {
  5. }
编写Iml实现类
  1. package cn.itcast.mq.service;
  2. import cn.itcast.mq.mapper.orderMapper;
  3. import cn.itcast.mq.pojo.order;
  4. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class orderServiceImpl extends ServiceImpl<orderMapper, order> implements orderService{
  8. }
构建Listerner线程池,构建容器工厂

使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。

  1. package cn.itcast.mq.thread;
  2. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.scheduling.annotation.EnableAsync;
  8. @Configuration
  9. @EnableAsync
  10. public class ThreadPoolConfig {
  11. @Bean("customContainerFactory")
  12. public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
  13. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  14. factory.setConcurrentConsumers(10); //设置线程数
  15. factory.setMaxConcurrentConsumers(10); //最大线程数
  16. configurer.configure(factory, connectionFactory);
  17. return factory;
  18. }
  19. }
编写MQListener监听队列
  1. package cn.itcast.mq.listeners;
  2. import cn.itcast.mq.pojo.order;
  3. import cn.itcast.mq.service.orderService;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.scheduling.annotation.Async;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. @Slf4j
  11. public class MqListener {
  12. @Autowired
  13. private orderService orderService;
  14. //声明队列 mq的容器工厂
  15. @RabbitListener(queues="demo.queue",containerFactory = "customContainerFactory")
  16. public void listenSimpleQueue(String msg)
  17. {
  18. //拆分消息
  19. String[] split = msg.split(":");
  20. order order = new order(split[0], split[1]);
  21. System.out.println(order.toString());
  22. //保存MYSQL
  23. orderService.save(order);
  24. //测试是否多个消费者
  25. System.out.println("线程" + Thread.currentThread().getName() + " 执行异步任务" );
  26. }
  27. }

RabbitMQ的准备

创建demo.queue队列

 创建demo用户并且配置虚拟主机

 进行测试

启动Redis和Consumer服务

 使用JMeter压测12000个用户

 开始压测

查看队列

观察Consumer控制台,一万条消息瞬间执行完成!

 查看MySQL orderlist表,有一万条数据

 查看Redis 数据库并没有出现超卖问题,案例成功!!

 附加

解决RabbitMQ消息堆积的方案有三种

  • 增加更多消费者,提高消息速度。(本案例采用这一种)
  • 在消费者中开启线程池加快消息处理速度。
  • 扩大队列容积,提高堆积上限,采用惰性队列。

 总结

通过本次演示的案例,希望大家可以掌握并且多加练习,在日常的开发中缓存数据库和异步队列是必备的手段,同时也是大家找工作时的一个亮点。本文如有不妥之处希望大家指正!!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/1009072
推荐阅读
相关标签
  

闽ICP备14008679号