当前位置:   article > 正文

springboot整合rabbitmq实现延时队列之TTL方式_spring.rabbitmq ttl

spring.rabbitmq ttl

很多时候我们想定时去做某件事情的时候我们会首先想到定时任务,quartz是个不错的选择,但是也有缺点,假如配置在项目中,集群部署会有重复执行的问题,如果持久化在mysql中,解决了集群的问题,但是过于依赖mysql,耦合严重,当然还有日志量庞大、执行时间精度、过于耗费系统资源等等问题。所以这时候使用消息队列中间件的的延时队列就是一个很好得解决方案,我们设置要触发消费的时间和必要的参数入队mq,到时监听queue的消费者自然拿到消息然后去走业务流程,这里介绍的是基于rabbitmq中间件实现的TTL版的延时队列。

什么是TTL
先简单介绍下rabbitmq执行的流程,它和我之前写到的spring boot整合activeMQ不太一样,除了队列(queue)之外还引入了交换机(exchange)的概念。
rabbitmq的交换机有4种模式,我不详细介绍,简单说下大体执行流程:
在这里插入图片描述
①:生产者将消息(msg)和路由键(routekey)发送指定的交换机(exchange)上
②:交换机(exchange)根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
③:队列(queue)再把消息发送给监听它的消费者(customer)
那么延时队列TTL又是什么呢?这里引入了一个死信(死亡信息)的概念,有死信必定有死亡时间,也就是我们希望延时多久的时间:
在这里插入图片描述
①:生产者将消息(msg)和路由键(routekey)发送指定的死信交换机(delayexchange)上
②:死信交换机(delayexchange)根据路由键(routekey1)找到绑定自己的死信队列(delayqueue)并把消息给它
③:消息(msg)到期死亡变成死信转发给死信接收交换机(delayexchange)
④:死信接收交换机(receiveexchange)根据路由键(routekey2)找到绑定自己的死信接收队列(receivequeue)并把消息给它
⑤:死信接收队列(receivequeue)再把消息发送给监听它的消费者(customer)
ps:延时队列也叫死信队列。基于TTL模式的延时队列会涉及到2个交换机、2个路由键、2个队列…emmmmm比较麻烦
流程介绍完了,看下具体代码吧!

  1. 首先pom因为依赖
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
  • 1
  • 2
  • 3
  • 4
  1. 配置文件配置rabbitmq的信息
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 编写rabbitmq配置类,声明几个bean
@Configuration
public class RabbitUserConfig {
    /**
     * 死信交换机
     * @return
     */
    @Bean
    public DirectExchange delayExchange(){
        return new DirectExchange("delay_exchange");
    }

    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue delayQueue(){
        Map<String,Object> map = new HashMap<>(16);
        map.put("x-dead-letter-exchange","receive_exchange");
        map.put("x-dead-letter-routing-key", "receive_key");
        return new Queue("delay_queue",true,false,false,map);
    }

    /**
     * 给死信队列绑定交换机
     * @return
     */
    @Bean
    public Binding delayBinding(Queue delayQueue,DirectExchange delayExchange){
        return BindingBuilder.bind(cfgDelayQueue).to(cfgDelayExchange).with("delay_key");
    }

    /**
     * 死信接收交换机
     * @return
     */
    @Bean
    public DirectExchange receiveExchange(){
        return new DirectExchange("receive_exchange");
    }

    /**
     * 死信接收队列
     * @return
     */
    @Bean
    public Queue receiveQueue(){
        return new Queue("receive_queue");
    }

    /**
     * 死信交换机绑定消费队列
     * @return
     */
    @Bean
    public Binding receiveBinding(Queue receiveQueue,DirectExchange receiveExchange){
        return BindingBuilder.bind(cfgReceiveQueue).to(cfgReceiveExchange).with("receive_key");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  1. 编写rabbitmq生产者:
/**
 * rabbitMq生产者类
 * @author zhanghang
 * @date 2018/12/13
 */
@Component
@Slf4j
public class RabbitProduct{

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendDelayMessage(List<Integer> list) {
    	 //这里的消息可以是任意对象,无需额外配置,直接传即可
         log.info("===============延时队列生产消息====================");
         log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), list.toString());
         this.rabbitTemplate.convertAndSend(
                 "delay_exchange",
                 "delay_key",
                 list,
                 message -> {
                 	 //注意这里时间要是字符串形式
                     message.getMessageProperties().setExpiration("60000");
                     return message;
                 }
         );
     	 log.info("{}ms后执行", 60000);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  1. 编写rabbitmq消费者
/**
 * activeMq消费者类
 * @author zhanghang
 * @date 2017/12/19
 */
@Component
@Slf4j
public class RabbitConsumer {
    @Autowired
    private CcqCustomerCfgService ccqCustomerCfgService;

    /**
     * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
     * 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完
     * 解决方案:手动ACK,或者try-catch 然后在 catch 里面将错误的消息转移到其它的系列中去
     * spring.rabbitmq.listener.simple.acknowledge-mode = manual
     * @param list 监听的内容
     */
    @RabbitListener(queues = "receive_queue")
    public void cfgUserReceiveDealy(List<Integer> list, Message message, Channel channel) throws IOException {
        log.info("===============接收队列接收消息====================");
        log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), list.toString());
        //通知 MQ 消息已被接收,可以ACK(从队列中删除)了
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        try {
            dosomething.....
        } catch (Exception e) {
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());
            /**
             * basicRecover方法是进行补发操作,
             * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
             * 设置为false是只补发给当前的consumer
             */
            channel.basicRecover(false);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  1. 编写测试类:
/**
 * @author zhanghang
 * @date 2019/1/3 17:57
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RabbitProduct rabbitProduct;
    
    @GetMapping("/sendMessage")
    public void sendMessage(){
    	List<Integer> list = new ArrayList<>();
    	list.add(1);
    	list.add(2);
        rabbitProduct.sendDelayMessage(list);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

至此就完成了,但是基于TTL的延时队列存在一个问题,就是同一个队列里的消息延时时间最好一致,比如说队列里的延时时间都是1小时,千万不能队列里的消息延时时间乱七八糟多久的都有,这样的话先入队的消息如果延时时间过长会堵着后入队延时时间小的消息,导致后面的消息到时也无法变成死信转发出去,很坑!!!
举个栗子:延时队列里先后进入A,B,C三条消息,存活时间是3h,2h,1h,结果到了1小时C不会死,到了2hB不会死,到了3小时A死了,同时B,C也死了,意味着3h后A,B,C才能消费,很坑!!!
我本来使用时候以为会像redis的存活时间一样,内部维护一个定时器去扫描死亡时间然后变成死信转发,结果不是。。。

至于怎么解决这个问题,一个队列里可以放不同死亡时间的消息,还能够异步死亡转发,请看下回分解:
springboot整合rabbitmq实现延时队列之rabbitmq_delayed_message_exchange插件方式

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/572413
推荐阅读
相关标签
  

闽ICP备14008679号