当前位置:   article > 正文

rabbmitmq延时队列的多种实现_put("x-delayed-type", "direct")

put("x-delayed-type", "direct")

前言:rabbitmq的工作方式:建立一个交换机,并设置交换机名称,再创建一个消息队列,并设置名称,绑定交换机器和消息队列并设置routingkey,当客户端发送消息队列到交换机时,交换机根据消息的routingkey和前面的绑定关系路由到对应的消息队列。

rabbitmq有三种延时队列实现方式,各有利弊,下面一一介绍:

第一种方式:创建具有超时功能且绑定死信交换机的消息队列,步骤如下

1、创建一个交换机和消息队列,并设置消息队列的x-message-ttl:队列消息的超时时间,该参数值是一个毫秒值,指进入改消息队列的消息从进入后超过该值会成为死信

2、设置x-dead-letter-exchange:成为死信的消息会被发送到该参数指定的交换机,并通过参数x-dead-letter-routing-key设置死信的routingkey,然后绑定交换机和消息队列

  1. // 声明交换机Exchange
  2. @Bean("delayExchange")
  3. public DirectExchange delayExchange(){
  4. return new DirectExchange(DELAY_EXCHANGE_NAME);
  5. }
  6. @Bean("delayQueueA")
  7. public Queue delayQueueA(){
  8. Map<String, Object> args = new HashMap<>(2);
  9. // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
  10. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
  11. // x-dead-letter-routing-key 这里声明当前队列的死信路由key
  12. args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
  13. // x-message-ttl 声明队列的TTL
  14. args.put("x-message-ttl", 1000*30);
  15. return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
  16. }
  17. // 声明延时队列A绑定关系
  18. @Bean
  19. public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
  20. @Qualifier("delayExchange") DirectExchange exchange){
  21. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
  22. }

3、创建一个死信交换机

4、创建一个死信队列,接受第三步创建的死信交换机的消息,并绑定死信队列和死信交换机,设置好对应的routingkey。

  1. // 声明死信Exchange
  2. @Bean("deadLetterExchange")
  3. public DirectExchange deadLetterExchange(){
  4. return new DirectExchange(DEAD_LETTER_EXCHANGE);
  5. }
  6. // 声明死信队列A 用于接收延时10s处理的消息
  7. @Bean("deadLetterQueueA")
  8. public Queue deadLetterQueueA(){
  9. return new Queue(DEAD_LETTER_QUEUEA_NAME);
  10. }
  11. // 声明延时队列A绑定关系
  12. @Bean
  13. public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
  14. @Qualifier("delayExchange") DirectExchange exchange){
  15. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
  16. }

5、监听死信队列,即可处理超时的消息队列

  1. @RabbitListener(queues=MqConfig.DEAD_LETTER_QUEUEA_NAME)
  2. public void receiveA(Message message, Channel channel)throws IOException {
  3. String msg = new String(message.getBody());
  4. log.info("收到死信队列A消息:"+msg);
  5. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  6. }

 

图解:

 

 

第二种方式:创建通用延时消息队列

上述实现方式中,ttl延时队列中所有的消息超时时间都是一样的,如果不同消息想设置不一样的超时时间,就需要建立多个不同超时时间的消息队列,比较麻烦,且不利于维护,下面的方式是建立一个超时时间在消息级别上而不是在消息队列级别上,

1、在创建交换机时不设置x-message-ttl参数,死信交换机和死信交换机的routingkey还是需要设置。

  1. /**
  2. * 声明一个队列,并设置死信交换机和死信routingkey
  3. * @return
  4. */
  5. @Bean("delayQueueC")
  6. public Queue delayQueueC(){
  7. Map<String,Object> args = new HashMap<>(3);
  8. args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
  9. args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEC_ROUTING_KEY);
  10. return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
  11. }

2、创建死信交换机、死信队列时和第一种方式一样

3、在生成消息时需要给消息设置expiration参数,指定消息的超时时间

  1. rabbitTemplate.convertAndSend(MqConfig.DELAY_EXCHANGE_NAME,MqConfig.DELAY_QUEUEC_ROUTING_KEY,msg,a->{
  2. a.getMessageProperties().setExpiration(String.valueOf(delayTime));
  3. return a;
  4. });

该种方式可以创建一个承载不同超时时间消息的消息队列,但是这种方式有一个问题,如果消息队列中排在前面的消息没有到超时时间,即使后面的消息到了超时时间,先到超时时间的消息也不会进入死信队列,而是先检查排在最前面的消息队列是否到了超时时间,如果到了超时时间才会继续检查后面的消息。

图解:

 

第三种方式:使用rabbitmq的延时队列插件,实现同一个队列中有多个不同超时时间的消息,并按时间超时顺序出队

1、下载rabbitmq_delayed_message_exchange插件,并放入rabbitmq插件目录(linux:usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins),再启用插件

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange

1、创建一个普通消息队列

  1. @Bean
  2. public Queue delaydQueue(){
  3. return new Queue(DELAYED_QUEUE_NAME);
  4. }

2、创建一个延时交换机,并设置参数x-delayed-type为direct

  1. @Bean
  2. public CustomExchange customExchange(){
  3. Map<String,Object> args = new HashMap<>(1);
  4. args.put("x-delayed-type","direct");
  5. return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);
  6. }

3、绑定消息队列和延时交换机,并设置routingkey

  1. @Bean
  2. public Binding bindingDelayedExchange(@Qualifier("delaydQueue")Queue queue,@Qualifier("customExchange")CustomExchange exchange){
  3. return BindingBuilder.bind(queue).to(exchange).with(DELAYED_EXCHANGE_ROUTING_KEY).noargs();
  4. }

4、生产的消息设置延时delay参数

  1. rabbitTemplate.convertAndSend(MqConfig.DELAYED_EXCHANGE_NAME,MqConfig.DELAYED_EXCHANGE_ROUTING_KEY,msg,a->{
  2. a.getMessageProperties().setDelay(Integer.valueOf(delayTime));
  3. return a;
  4. });

5、创建消费者消费延时队列

  1. @RabbitListener(queues=MqConfig.DELAYED_QUEUE_NAME)
  2. public void receiveD(Message message, Channel channel)throws IOException {
  3. String msg = new String(message.getBody());
  4. log.info("收到延时队列D消息:"+msg);
  5. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  6. }

 

图解:

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/569714
推荐阅读
相关标签
  

闽ICP备14008679号