赞
踩
并发引起的服务器崩溃是非常常见的现象,为了解决这一问题,目前流行使用缓存数据库与消息队列搭配使用。在最近的项目中也是使用到这一手段,本篇文章通过一个案例为大家展示该套方案如何使用。
本案例是一个经典的并发下单的案例。在Redis中存在一条key为Apple,Value为10000的数据,为防止超卖问题的发生使用Redisson分布式锁避免超卖(在Redis解决超卖Demo这篇文章中已经讲过),在一个线程拿到锁并且符合下单条件则直接返回下单成功同时发送消息,使用AMQP监听队列消息,通过线程池创建多个线程作为消费者进行底层DB的更新。
- server:
- port: 9000
- spring:
- application:
- name: redis
- datasource:
- driver-class-name: com.mysql.jdbc.Driver
- url: jdbc:mysql://localhost:3306/user?useSSL=false
- username: root
- password: 123456
-
- redis:
- host: 192.168.136.130
- port: 6379
- password: 123456
- lettuce:
- pool:
- max-active: 10
- max-idle: 10
- min-idle: 1
- time-between-eviction-runs: 10s
-
- rabbitmq:
- host: 192.168.136.130 #MQ地址
- port: 5672 #端口
- virtual-host: / #虚拟主机
- username: demo #用户密码
- password: 123321
- connection-timeout: 1s
- template:
- retry: #重试机制
- enabled: true
- initial-interval: 1000ms
- multiplier: 1
- max-attempts: 3
- publisher-confirm-type: correlated
- publisher-returns: true
-
设置路径为 /testAddsetxAddFinally
-
- @RestController
- @RequestMapping("/")
- public class Test {
- @Autowired
- private RedisTemplate redisTemplate;
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
-
- /*使用setnx锁,同时给锁释放过期时间,自动释放锁
- * */
- @RequestMapping("testAddsetxAddFinally")
- String cherkAndReduceStockAddSetnxAddFinally()
- {
-
- Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", "0000",2, TimeUnit.SECONDS);
- //获取锁失败,停止50ms,递归调用
- if (!lock){
- try {
- Thread.sleep(3000);
- this.cherkAndReduceStockAddSetnxAddFinally();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }else {
- try {
- String stock = redisTemplate.opsForValue().get("Apple").toString();
- if(stock!=null&&stock.length()!=0)
- {
- Integer valueOf = Integer.valueOf(stock);
- if (valueOf>0)
- {
- redisTemplate.opsForValue().set("Apple",String.valueOf(--valueOf));
- //推送MQ
- String queue="demo.queue";
-
- //123456为用户id 1为商品id
- String masg="123456:1";
- rabbitTemplate.convertAndSend(queue,masg);
- return "抢购成功!";
-
-
- }else {
- System.out.println("商品售罄!!!");
- return "商品售罄!!!";
- }
- }
- }finally {
- redisTemplate.delete("lock-stock");
- }
-
- }
- return "";
- }
-
- }
- @Configuration
- public class RedisConfig {
- @Bean
- public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory)
- {
- //缓存序列化配置避免存储乱码
-
- RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
- redisTemplate.setConnectionFactory(factory);
- redisTemplate.setKeySerializer(new StringRedisSerializer());
- redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
- return redisTemplate;
- }
- }
server: port: 9004 spring: application: name: redis datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/user?useSSL=false username: root password: 123456 redis: host: 192.168.136.130 port: 6379 password: 123456 lettuce: pool: max-active: 10 max-idle: 10 min-idle: 1 time-between-eviction-runs: 10s rabbitmq: host: 192.168.136.130 port: 5672 virtual-host: / username: demo password: 123321 connection-timeout: 1s template: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3 publisher-confirm-type: correlated publisher-returns: true
- package cn.itcast.mq.pojo;
-
- import com.baomidou.mybatisplus.annotation.TableField;
- import com.baomidou.mybatisplus.annotation.TableName;
- import lombok.Data;
- import org.springframework.data.relational.core.mapping.Table;
-
- @Data
- @TableName("orderlist")
- public class order {
- //用户id
- @TableField("userId")
- private String userId;
- //商品id
- private String id;
-
- public order(String userId, String id) {
- this.userId = userId;
- this.id = id;
- }
- }
注意对应关系
- package cn.itcast.mq.mapper;
-
- import cn.itcast.mq.pojo.order;
- import com.baomidou.mybatisplus.core.mapper.BaseMapper;
- import org.apache.ibatis.annotations.Mapper;
-
- @Mapper
- public interface orderMapper extends BaseMapper<order> {
- }
- package cn.itcast.mq.service;
-
- import cn.itcast.mq.pojo.order;
- import com.baomidou.mybatisplus.extension.service.IService;
-
- public interface orderService extends IService<order> {
- }
- package cn.itcast.mq.service;
-
- import cn.itcast.mq.mapper.orderMapper;
- import cn.itcast.mq.pojo.order;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-
- import org.springframework.stereotype.Service;
-
- @Service
- public class orderServiceImpl extends ServiceImpl<orderMapper, order> implements orderService{
-
-
- }
使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。
- package cn.itcast.mq.thread;
-
-
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.annotation.EnableAsync;
-
- @Configuration
- @EnableAsync
- public class ThreadPoolConfig {
-
- @Bean("customContainerFactory")
-
- public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
-
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
-
- factory.setConcurrentConsumers(10); //设置线程数
-
- factory.setMaxConcurrentConsumers(10); //最大线程数
-
- configurer.configure(factory, connectionFactory);
-
- return factory;
-
- }
- }
-
- package cn.itcast.mq.listeners;
-
-
- import cn.itcast.mq.pojo.order;
- import cn.itcast.mq.service.orderService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
-
- @Component
- @Slf4j
- public class MqListener {
-
-
- @Autowired
- private orderService orderService;
-
-
- //声明队列 mq的容器工厂
- @RabbitListener(queues="demo.queue",containerFactory = "customContainerFactory")
- public void listenSimpleQueue(String msg)
- {
- //拆分消息
- String[] split = msg.split(":");
- order order = new order(split[0], split[1]);
- System.out.println(order.toString());
- //保存MYSQL
- orderService.save(order);
-
- //测试是否多个消费者
- System.out.println("线程" + Thread.currentThread().getName() + " 执行异步任务" );
- }
-
- }
查看队列
观察Consumer控制台,一万条消息瞬间执行完成!
查看MySQL orderlist表,有一万条数据
查看Redis 数据库并没有出现超卖问题,案例成功!!
解决RabbitMQ消息堆积的方案有三种
通过本次演示的案例,希望大家可以掌握并且多加练习,在日常的开发中缓存数据库和异步队列是必备的手段,同时也是大家找工作时的一个亮点。本文如有不妥之处希望大家指正!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。