当前位置:   article > 正文

大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列_quartz rabbitmq

quartz rabbitmq

QuartZ定时任务+RabbitMQ消息队列

一 、QuartZ定时任务解决订单系统遗留问题

  1. 情景分析:
    1. 在电商项目中 , 订单生成后 , 数据库商品数量-1 , 但是用户迟迟不进行支付操作 , 这样不仅导致没有生成效益, 而且导致后来的用户无法购买的问题 。 这种情况的订单成为恶意订单
  2. 解决方案:
    1. 早期: 虚拟商品数量 , 数量减到0时 ,仍然可以减。 但是这种办法显然没有根本的解决问题
    2. 现在引入定时任务的概念 ,超时未支付 , 则订单失效, 商品回库 。 如:12306一般为15分钟 , 天猫是1天 。
  3. 解决:
    1. 可以使用java中自带的api–Timer , 但是操作比较繁琐 。
    2. 电商项目中引入第三方插件:石英钟quartZ
    3. 定时任务逻辑: 设置24小时支付超时
    4. 如何判断一个订单是否超时?
      1. 订单中有status(订单状态 1未付款 2已付款 3未发货4已发货5交易成功 6交易失败)字段和createTime(订单生成时间)字段 , 检索订单表中创建时间距离现在大于24 , 并且status为1 的数据 , update status为6。
    5. 如果判断超时 , 后续如何处理?
      1. update status为6(订单失效) , 对应的商品数量回退归还库存
  4. 石英钟插件
    1. 核心组件
      1. JobDetail+job
        1. 继承自石英钟 的父类,启动容器后,一旦加载了JobDetail的实例,其中JobDetail下的多个job逻辑需要编写代码(是整个使用石英钟过程唯一编写代码的位置);加载这个实例后,石英钟会为这个任务组注册一个调度器(scheduler)
      2. 调度器(Scheduler)
        1. 负责调度对应一个JobDetail的时间出发器,trigger
      3. 触发器(Trigger)
        1. 管理出发当前一个石英钟逻辑的JobDetail的组件
        2. 时间的计算表达式;何时触发任务执行是由触发器计算管理的;
        3. 分为两种触发器:
          1. simple触发器:只能简单的完成circle时间逻辑;每隔一段时间进行任务触发;
          2. cron触发器:功能比较强大,可以完成simple的任务,还可以控制时间精度;每周五上午5点触发一次,每个月第一个周末的上午五点;
    2. 石英钟调度过程
      1. 创建JobDetail实例–>注册调度器–>调度触发器–>计算时间触发逻辑–>触发任务代码–>执行job代码
  5. 将quartz引入电商项目中

    1. 添加依赖

      <dependency>
          <groupId>org.quartz-scheduler</groupId>
          <artifactId>quartz</artifactId>
          <version>2.2.1</version>
      </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    2. 添加配置文件applicationContext-scheduler.xml , 与spring框架整合

          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
              xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
              xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
              http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
              http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
              http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd">
      
              <!-- 定义任务bean -->
              <bean name="paymentOrderJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
                  <!-- 指定具体的job类 -->
                  <property name="jobClass" value="com.jt.store.order.job.PaymentOrderJob" />
                  <!-- 指定job的名称 -->
                  <property name="name" value="paymentOrder" />
                  <!-- 指定job的分组 -->
                  <property name="group" value="Order" />
                  <!-- 必须设置为true,如果为false,当没有活动的触发器与之关联时会在调度器中删除该任务  -->
                  <property name="durability" value="true"/>
                  <!-- 指定spring容器的key,如果不设定在job中的jobmap中是获取不到spring容器的 -->
                  <property name="applicationContextJobDataKey" value="applicationContext"/>
              </bean>
      
              <!-- 定义触发器 -->
              <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
                  <property name="jobDetail" ref="paymentOrderJobDetail" />
                  <!-- 每一分钟执行一次 -->
                  <property name="cronExpression" value="0 0/1 * * * ?" />
              </bean>
      
              <!-- 定义调度器 -->
              <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
                  <property name="triggers">
                      <list>
                          <ref bean="cronTrigger" />
                      </list>
                  </property>
              </bean>
      
          </beans>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
    3. 自定义一个job执行类 , 需要继承石英钟的父类 , 编写完成之后需要修改配置文件中的自定义job类的全类名

      public class PayMentOrderJob extends QuartzJobBean{
      
          @Override
          protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
              //通过加载文件获取spring的环境信息
              ApplicationContext applicationContext=
              (ApplicationContext) context.getJobDetail().getJobDataMap().get("applicationContext");
              //获取对应mapper中的bean,来执行对应的sql语句
              //传递的参数就是过期时间;需要你在代码中计算过期时间吧?
              //执行自己需要执行的逻辑
      applicationContext.getBean(OrderMapper.class).paymentOrderScan(new DateTime().minusDays(1).toDate());
              //System.out.println("hahahahahahah");
      
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

二、 RabbitMQ消息队列

  1. 电商项目中引入niginx 、 redis 、 amoeba 之后 , 电商项目 的架构性能得到提升 , 其中:
    1. nginx 解决了日常中的高并发
    2. redis 使用内存缓存数据 , 分担了数据库的一部分压力
    3. amoeba 实现数据库读写分离 , 保护了数据库这最后一道关卡
  2. 但是在超负荷请求的情况下(双十一) , 请求量陡增 , 以上三个技术无法处理。
    1. 例如:淘宝1000W/s,处理日常请求不在话下;双十一3000W/s;2000w/s请求,被拒绝了;
  3. 引入队列的概念:也是一种缓存,可以在队列中等待被处理;游戏登录:服务器爆满,您是第3984位等待者
  4. java中其实自带了一套消息队列的api— JMS , 但是性能极低
  5. 后来第三方消息队列插件出现, 于是sun公司提出一套AMQP标准 , 只要满足AMQP原则 , 就是一套性能超高的消息队列 , RabbitMQ就是符合AMQP标准的消息队列
  6. RabbitMQ
    1. 基于erlang语言: 是一种支持高并发的语言
    2. RabbitMQ的六种工作模式:
      1. simple简单模式
        1. 消息产生着(p)将消息放入队列
        2. 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)
      2. work工作模式(资源的竞争)
        1. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
        2. 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
      3. publish/subscribe发布订阅(共享资源)
        1. X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
        2. 相关场景:邮件群发,群聊天,广播(广告)
      4. routing路由模式
        1. 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
        2. 根据业务功能定义路由字符串
        3. 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;
      5. topic 主题模式(路由模式的一种)
        1. 星号井号代表通配符
        2. 星号代表多个单词,井号代表一个单词
        3. 路由功能添加模糊匹配
        4. 消息产生者产生消息,把消息交给交换机
        5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
      6. RPC 先不做解释
    3. 这里注意三个别名
      1. 从publish routing topic(都叫发布订阅)
      2. publish:fanout
      3. routing:direct
      4. topic:topic
  7. rabbitmq的安装与使用

    1. rabbitmq使用erlang语言 开发 ,所以需要erlang 的运行环境 , 先安装erlang 在虚拟机中执行wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm 下载
    2. 创建一个rabbitmq的管理目录 , 将下载的erlang资源移入管理目录
    3. 安装 rpm -Uvh erlang-solutions-1.0.1.noarch.rpm U:表示安装的时候所有的包自动更新
    4. yum -y install erlang这里的过程非常缓慢,需要下载将60个rpm包安装,一旦现在失败,在执行一次将失败的包重新下载安装即可
    5. 安装rabbitmq
    6. 上传rpm
    7. 上传rabbitmq-server-3.6.1-1.noarch.rpm文件到rabbitmq管理目录
    8. 执行安装
    9. 安装rpm之后很多文件都是散列放置的 ,默认只允许localhost用户访问 需要配置开启用户远程访问 执行cp /usr/share/doc/rabbitmq-server-3.6.1/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config(默认的安装目录为/usr/share/doc/rabbitmq-server-3.6.1)
    10. 修改配置文件vim /etc/rabbitmq/rabbitmq.config P64行

      %% {loopback_users, []},
      修改1:去掉前面的两个%%,
      修改2:去掉最后面的逗号,保存。
      
      • 1
      • 2
      • 3

    11. 开启后台管理插件(rabbitmq web界面管理工具) ,执行rabbitmq-plugins enable rabbitmq_management
    12. 开启15672端口和5672端口

      /sbin/iptables -I INPUT -p tcp --dport 15672 –j ACCEPT  #控制台端口 浏览器访问端口
      /sbin/iptables -I INPUT -p tcp --dport 5672 –j ACCEPT         #程序访问端口
      
      • 1
      • 2
    13. 保存防火墙修改操作

      /etc/rc.d/init.d/iptables save
      /etc/init.d/iptables status
      
      • 1
      • 2
    14. 开启rabbitmq服务

      service rabbitmq-server start 开启
      service rabbitmq-server stop 关闭
      service rabbitmq-server restart  重启
      
      • 1
      • 2
      • 3
    15. 设置开机启动
      chkconfig rabbitmq-server on
    16. 使用浏览器访问rabbitmq web控制台 ip:15672
    17. 默认的用户名和密码都是guest
  8. rabbitmq web控制台的使用:
    1. 不能直接使用guest访问账号管理rabbitmq , 需要自定义一个具有对应更能权限的独立账号
    2. 创建账号
    3. 点击添加user可以看到在上面的列表中有了丢应的user但是目前没有绑定的vh
    4. 虚拟机相当于redis中的分库分区(select0-15)。添加VH,这里的vh名称必须在程序访问时保持一致,有/
    5. 绑定用户,点击虚拟机
  9. rabbitmq示例代码

    定义rabbitmq工具类
    public class ConnectionUtils {
    
        public static Connection getConnettion() throws IOException{
            ConnectionFactory factory= new ConnectionFactory();
            //需要连接信息;
            factory.setHost("106.75.74.254");
            factory.setPort(5672);
            factory.setVirtualHost("/jt");
            factory.setUsername("jtadmin");
            factory.setPassword("123456");
            Connection cn = factory.newConnection();
            return cn;
    
        }
    
    }
    
    simple模式
    public class RabbitMQTest {
    
        @Test
        public void testSend() throws Exception{
            /*
             * 1 创建连接工厂
             * 2 从工厂穿件connection
             * 3 从连接获取channel
             * 4 从channel声明绑定queue
             * 5 生产者生产消息放入队列
             * 6 释放资源
             */
        ConnectionFactory factory= new ConnectionFactory();
        //需要连接信息;
        factory.setHost("106.75.74.254");
        factory.setPort(5672);
        factory.setVirtualHost("/jt");
        factory.setUsername("jtadmin");
        factory.setPassword("123456");
        Connection cn = factory.newConnection();
        Channel chan = cn.createChannel();
    
        String QUEUE_NAME="simple";
        chan.queueDeclare(QUEUE_NAME, false, false, false, null);
        //第二个参数是否持久化,第三个参数是否独占queue,第四个参数不使用达到一定时间是否自动删除,第五个参数
        //其他参数;
        String msg="hello simple mode11111111";//模拟传送到mq中的消息
        //把消息发送到queue
        chan.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        //第一个参数与路由有关
        chan.close();
        cn.close();
        }
    
    
        @Test//消息消费端
        public void testRec() throws Exception{
            /*
             * 1 创建连接工厂
             * 2 从工厂穿件connection
             * 3 从连接获取channel
             * 4 从channel声明绑定queue
             *5 消费者消费消息
             * 6 释放资源
             */
            ConnectionFactory factory= new ConnectionFactory();
            //需要连接信息;
            factory.setHost("106.75.74.254");
            factory.setPort(5672);
            factory.setVirtualHost("/jt");
            factory.setUsername("jtadmin");
            factory.setPassword("123456");
            Connection cn = factory.newConnection();
            Channel chan = cn.createChannel();
            String QUEUE_NAME="simple";
            chan.queueDeclare(QUEUE_NAME, false, false, false, null);
            //创建一个consumer
            QueueingConsumer consumer=new QueueingConsumer(chan);
            chan.basicConsume(QUEUE_NAME, true,consumer);
            while(true){
                Delivery delivery=consumer.nextDelivery();
                String msg=new String(delivery.getBody());
                System.out.println(msg);
            }
        }
    }
    
    work模式
    public class WorkTest {
        @Test
        public void testSend() throws Exception{
            Connection cn=ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            chan.queueDeclare("work111", false, false, false, null);
            //for循环发送消息
            for(int i=0;i<100;i++){
                String msg="work"+i;
                chan.basicPublish("", "work111", null, msg.getBytes());
                System.out.println("发送消息第"+i+"条");
            }
            chan.close();
            cn.close();
        }
    
        @Test
        public void testRec1() throws Exception{
            Connection cn=ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            chan.queueDeclare("work111", false, false, false, null);
            chan.basicQos(1);
            QueueingConsumer consumer=new QueueingConsumer(chan);
            chan.basicConsume("work111", false,consumer);
            //获取消息
            while(true){
                QueueingConsumer.Delivery delivery=consumer.nextDelivery();
                String msg=new String(delivery.getBody());
                System.out.println("接收到消息:"+msg);
                //模拟机器性能
                Thread.sleep(10);
                chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
        @Test
        public void testRec2() throws Exception{
            Connection cn=ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            chan.queueDeclare("work111", false, false, false, null);
            chan.basicQos(1);
            QueueingConsumer consumer=new QueueingConsumer(chan);
            chan.basicConsume("work111", false,consumer);
            //获取消息
            while(true){
                QueueingConsumer.Delivery delivery=consumer.nextDelivery();
                String msg=new String(delivery.getBody());
                System.out.println("接收到消息:"+msg);
                //模拟机器性能
                Thread.sleep(500);
                chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    publish模式
    public class PublishTest {
        @Test
        public void testSend() throws Exception{
        Connection cn=ConnectionUtils.getConnettion();
        Channel chan=cn.createChannel();
        //声明交换机exchange
        chan.exchangeDeclare("jt_publish", "fanout");//交换机的名称,和交换机的种类
        for(int i=0;i<100;i++){
            String msg="publish num"+i;
            chan.basicPublish("jt_publish", "", null, msg.getBytes());
            System.out.println("发送了第"+i+"条");
            }
        }
        @Test
        public void testRec1() throws Exception{
            Connection cn= ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            //声明队列
            chan.queueDeclare("publish01", false, false, false, null);
            //绑定交换机
            chan.exchangeDeclare("jt_publish", "fanout");
            //绑定队列到交换机
            chan.queueBind("publish01", "jt_publish", "");
            //同一时刻服务器只发送一条消息给消费者消费
            chan.basicQos(1);
            //定义队列的消费者
            QueueingConsumer cons=new QueueingConsumer(chan);
            chan.basicConsume("publish01", false,cons);
            //获取消息
            while(true){
                QueueingConsumer.Delivery deli=cons.nextDelivery();
                String msg=new String(deli.getBody());
                System.out.println("接收到消息:"+msg);
                chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);
            }
        }
    
        @Test
        public void testRec2() throws Exception{
            Connection cn= ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            //声明队列
            chan.queueDeclare("publish02", false, false, false, null);
            //绑定交换机
            chan.exchangeDeclare("jt_publish", "fanout");
            //绑定队列到交换机
            chan.queueBind("publish02", "jt_publish", "");
            //同一时刻服务器只发送一条消息给消费者消费
            chan.basicQos(1);
            //定义队列的消费者
            QueueingConsumer cons=new QueueingConsumer(chan);
            chan.basicConsume("publish02", false,cons);
            //获取消息
            while(true){
                QueueingConsumer.Delivery deli=cons.nextDelivery();
                String msg=new String(deli.getBody());
                System.out.println("接收到消息:"+msg);
                chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    routing
    public class RoutingTest {
        @Test
        public void testSend() throws Exception{
        Connection cn=ConnectionUtils.getConnettion();
        Channel chan=cn.createChannel();
        //声明交换机exchange
        chan.exchangeDeclare("jt_routing", "topic");//交换机的名称,和交换机的种类
        //生成消息
        String msg="哈哈哈哈";
        String routingKey="item.add";
        chan.basicPublish("jt_routing", routingKey, null, msg.getBytes());
        System.out.println("发送完成");
        chan.close();
        cn.close();
        }
    
        @Test
        public void testRec1() throws Exception{
            Connection cn=ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            chan.exchangeDeclare("jt_routing", "direct");
            chan.queueDeclare("routing01", false, false, false, null);
            chan.queueBind("routing01", "jt_routing", "item.add");
            //生成consumer
            QueueingConsumer cons=new QueueingConsumer(chan);
            chan.basicConsume("routing01", false,cons);
            while(true){
                QueueingConsumer.Delivery deli=cons.nextDelivery();
                String msg=new String(deli.getBody());
                System.out.println("一号接受者接收到消息:"+msg);
                chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);
            }
        }
    
        @Test
        public void testRec2() throws Exception{
            Connection cn=ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            chan.exchangeDeclare("jt_routing", "direct");
            chan.queueDeclare("routing02", false, false, false, null);
            chan.queueBind("routing02", "jt_routing", "item1.update");
            //生成consumer
            QueueingConsumer cons=new QueueingConsumer(chan);
            chan.basicConsume("routing02", false,cons);
            while(true){
                QueueingConsumer.Delivery deli=cons.nextDelivery();
                String msg=new String(deli.getBody());
                System.out.println("二号接受者接收到消息:"+msg);
                chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    topic模式
    public class RoutingTest {
        @Test
        public void testSend() throws Exception{
        Connection cn=ConnectionUtils.getConnettion();
        Channel chan=cn.createChannel();
        //声明交换机exchange
        chan.exchangeDeclare("jt_routing", "topic");//交换机的名称,和交换机的种类
        //生成消息
        String msg="哈哈哈哈";
        String routingKey="item.#";
        chan.basicPublish("jt_routing", routingKey, null, msg.getBytes());
        System.out.println("发送完成");
        chan.close();
        cn.close();
        }
    
        @Test
        public void testRec1() throws Exception{
            Connection cn=ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            chan.exchangeDeclare("jt_routing", "direct");
            chan.queueDeclare("routing01", false, false, false, null);
            chan.queueBind("routing01", "jt_routing", "item.add");
            //生成consumer
            QueueingConsumer cons=new QueueingConsumer(chan);
            chan.basicConsume("routing01", false,cons);
            while(true){
                QueueingConsumer.Delivery deli=cons.nextDelivery();
                String msg=new String(deli.getBody());
                System.out.println("一号接受者接收到消息:"+msg);
                chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);
            }
        }
    
        @Test
        public void testRec2() throws Exception{
            Connection cn=ConnectionUtils.getConnettion();
            Channel chan=cn.createChannel();
            chan.exchangeDeclare("jt_routing", "direct");
            chan.queueDeclare("routing02", false, false, false, null);
            chan.queueBind("routing02", "jt_routing", "item1.update");
            //生成consumer
            QueueingConsumer cons=new QueueingConsumer(chan);
            chan.basicConsume("routing02", false,cons);
            while(true){
                QueueingConsumer.Delivery deli=cons.nextDelivery();
                String msg=new String(deli.getBody());
                System.out.println("二号接受者接收到消息:"+msg);
                chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
  10. 秒杀业务场景:

    业务场景分析:
     
    并发量很高单例rabbitmq的并发量在百万级以上
    有个商品在每天的8点开抢,每天的量5万台iphone4s
     
    可以引入rabbitmq
    在抢单的请求发起后,可以将前台的抢单信息放到rabbitmq;
    simple 队列
    100万个人抢,写入队列中的消息可以是什么?
    电话号码
    ticket
    username
    生产者,前台的controller或者service调用将请求中的信息保存到队列中;
    消费者,监听队列,一旦有数据,获取数据
    调用SSO查询用户信息,把前5万个消息获取到,后面的放入rabbitmq的垃圾桶;
    并发更高的时候需要使用rabbitmq的分布式(峰会的题目之一)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
  11. 将rabbit引入电商项目中:

    1. 在执行商品添加时 , 将商品存入数据库后 , 还需要将商品信息放入缓存中 。 其中数据库操作为主要操作 , 而缓存操作为不主要操作 , 这时为了提升添加商品模块的性能 , 将缓存操作这类不重要的操作抽取出来 , 放入消息队列中 , 稍后执行 。 将写redis操作独立出去,利用路由模式,在manage后台系统的生产者将消息放入队列
    2. 示例代码

      1. 生产者端

        1. 添加rabbitmq与spring整合 的配置文件

          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
              xsi:schemaLocation="http://www.springframework.org/schema/rabbit
              http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
              http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
          
              <!-- 定义RabbitMQ的连接工厂 -->
              <rabbit:connection-factory id="connectionFactory"
                  host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
                  virtual-host="${rabbit.vhost}" />
          
              <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
              <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                  exchange="itemDirectExchange" />
          
              <!-- MQ的管理,包括队列、交换器等 -->
              <rabbit:admin connection-factory="connectionFactory" />
          
              <!-- 定义交换器,自动声明,durable持久化 -->
              <rabbit:direct-exchange name="itemDirectExchange" auto-declare="true" durable="true">
              </rabbit:direct-exchange>
          
          </beans>
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
        2. 添加rabbitmq的配置文件rabbitmq.properties

          rabbit.ip=106.75.74.254
          rabbit.port=5672
          rabbit.username=jtadmin
          rabbit.password=123456
          rabbit.vhost=/jt
          
          • 1
          • 2
          • 3
          • 4
          • 5
        3. 在spring核心配置文件中配置读取rabbitmq配置文件的配置

              <!-- 使用spring自带的占位符替换功能,可以实现注解方式获取属性文件中的配置值 -->
              <bean
                  class="com.jt.common.spring.exetend.ExtendedPropertyPlaceholderConfigurer">
                  <!-- 允许JVM参数覆盖 -->
                  <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
                  <!-- 忽略没有找到的资源文件 -->
                  <property name="ignoreResourceNotFound" value="true" />
                  <!-- 配置资源文件 -->
                  <property name="locations">
                      <list>
                          <value>classpath:jdbc.properties</value>
                          <value>classpath:env.properties</value>
                          <value>classpath:redis.properties</value>
                          <value>classpath:rabbitmq.properties</value>
                      </list>
                  </property>
              </bean>
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
        4. 在service层把需要的操作放入消息队列 (需要先注入rabbitmq模板类) 把商品ID放入到指定的item.add消息队列中 , 监听item.add消息队列消费时会做出特定的动作

          @Service
          public class ItemService extends BaseService<Item>{
              @Autowired
              private ItemMapper itemMapper;
              @Autowired
              private ItemDescMapper itemDescMapper;
              @Autowired
              private RedisService redisService;
              private static final ObjectMapper MAPPER=new ObjectMapper();
              @Autowired
              private RabbitTemplate rabbitmq;
              //商品列表查询
              public List<Item> queryItemList(){
                  List<Item> itemList=itemMapper.queryItemList();
                  return itemList;
              }
          
              public void saveItem(Item item,String desc) throws Exception {
                  //insert 和insertSelective
                  //insert方法在插入对象时,把对象的所有字段都插入
                  //insertSelective插入非NULL值的所有字段
                  //初始化数据没有做 status created updated
                  item.setStatus(1);
                  item.setCreated(new Date());
                  item.setUpdated(item.getCreated());
                  itemMapper.insert(item);
                  //准备插入表格的对象
                  ItemDesc itemDesc=new ItemDesc();
                  //当前的itemDesc的关联ID怎么获取?
                  itemDesc.setItemId(item.getId());
                  itemDesc.setCreated(new Date());
                  itemDesc.setUpdated(itemDesc.getCreated());
                  itemDesc.setItemDesc(desc);
                  //插入对象到表格
                  itemDescMapper.insert(itemDesc);
                  /*//添加新增商品的缓存写入
                  String key="ITEM_"+item.getId();
                  String json=MAPPER.writeValueAsString(item);
                  redisService.set(key, json);*/
                  //写消息到消息队列
                  rabbitmq.convertAndSend("item.add", item.getId());
          
              }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
      2. 消费者端

        1. 引入rabbitmq与spring整合的消费者配置文件

          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
              xsi:schemaLocation="http://www.springframework.org/schema/rabbit
              http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
              http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
          
              <!-- 定义RabbitMQ的连接工厂 -->
              <rabbit:connection-factory id="connectionFactory"
                  host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
                  virtual-host="${rabbit.vhost}" />
          
              <!-- MQ的管理,包括队列、交换器等 -->
              <rabbit:admin connection-factory="connectionFactory" />
          
              <!-- 定义消息队列 -->
              <rabbit:queue name="jt-web.itemQueue" auto-declare="true"/>
          
              <!-- 定义交换机,并且完成队列和交换机的绑定 -->
              <rabbit:direct-exchange name="itemDirectExchange" auto-declare="true">
                  <rabbit:bindings>
                      <!-- 前台系统只接收商品更新的消息,key路由key -->
                      <rabbit:binding queue="jt-web.itemQueue" key="item.add"/>
                  </rabbit:bindings>
              </rabbit:direct-exchange>
          
              <!-- 定义监听 -->
              <rabbit:listener-container connection-factory="connectionFactory">
                  <!-- 监听一个队列,当队列中有消息,就会自动触发类.方法,传递消息就作为方法的参数,根据方法声明的参数强转 -->
                  <rabbit:listener ref="rabbitItemService" method="addItem" queue-names="jt-web.itemQueue"/>
              </rabbit:listener-container>
          
          </beans>
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
        2. 引入rabbitmq的配置文件rabbitmq.properties

          rabbit.ip=106.75.74.254
          rabbit.port=5672
          rabbit.username=jtadmin
          rabbit.password=123456
          rabbit.vhost=/jt
          
          • 1
          • 2
          • 3
          • 4
          • 5
        3. 编写消费者rabbitmq

          @Service
          public class RabbitItemService {
              @Autowired
              private HttpClientService client;
              @Autowired
              private RedisService redis;
              public void addItem(Long itemId) throws Exception{
                  //把消息itemId获取过来,到后台拿数据
                  //写redis
                  String url="http://manage.jt.com/item/"+itemId;
                  String jsonItem = client.doGet(url,"utf-8");
                  redis.set("ITEM_"+itemId, jsonItem);
              }
          }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14

问题:

  1. thread和threadLocal
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/642535
推荐阅读
相关标签
  

闽ICP备14008679号