当前位置:   article > 正文

Rabbitmq 实现消息延迟发送_rabbitmq延迟发送消息 参数

rabbitmq延迟发送消息 参数

RabbitMq 实现消息延迟发送

实现消息延迟发送的两种方案:
1、定义两个常规的队列 A,B 。A 队列在初始化时添加参数:x-dead-letter-exchange ,值为B队列 的交换机名称如下:

    // A业务队列
    @Bean
    public Queue AQueue(){
        Map<String,Object> map = new HashMap<>(1);
        map.put("x-dead-letter-exchange",MqConstants.B_EXCHANGE_NAME);
        //该参数x-dead-letter-routing-key可以修改该死信的路由key,不设置则使用原消息的路由key
        map.put("x-dead-letter-routing-key",MqConstants.B_QUEUE);
       return new Queue(MqConstants.A_QUEUE,true,false,false,map);
    }
    // A业务队列绑定
    @Bean
    Binding bindingExchange() {
        return BindingBuilder.bind(AQueue()).to(ADirectExchange()).with(MqConstants.A_QUEUE);
    }
    //A业务交换机
    @Bean
    DirectExchange ADirectExchange() {
        return new DirectExchange(MqConstants.A_EXCHANGE_NAME);
    }

    // B队列(死信队列)
    @Bean()
    public Queue BQueue(){
        return new Queue(MqConstants.B_QUEUE);
    }
    // B队列绑定
    @Bean
    Binding bindingBExchangeDead() {
        return BindingBuilder.bind(BQueue()).to(BDirectExchange()).withQueueName();
    }
    //B交换机
    @Bean
    DirectExchange BDirectExchange() {
        return new DirectExchange(MqConstants.B_EXCHANGE_NAME);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

这种方法的思路是,给消息绑定一个过期时间,推送到常规队列A 中,当到了这条消息的过期时间,则队列A把这消息推送到绑定的队列B 交换机中,此时队列B 可称为 死信队列。而消息成为死信的条件如下:

消息什么时候变为死信(dead-letter)

1、消息被否定接收,消费者使用basic.reject 或者 basic.nack并且requeue 重回队列属性设为false。
2、消息在队列里得时间超过了该消息设置的过期时间(TTL)。
3、消息队列到达了它的最大长度,之后再收到的消息。

死信队列的原理

当一个消息再队列里变为死信时,它会被重新publish到另一个exchange交换机上,这个exchange就为DLX。因此我们只需要在声明正常的业务队列时添加一个可选的"x-dead-letter-exchange"参数,值为死信交换机,死信就会被rabbitmq重新publish到配置的这个交换机上,我们接着监听这个交换机就可以了。

所以上文的例子中,我们需要确保 过期时间的消息在A队列中不能被消费,也不手动拒绝。如果被消费了 那就起不到延迟的效果,如果手动拒绝了,过期时间的消息立即就会被投递到 死信队列中,如果一旦投递到死信队列(B)中,B队列的消费者立马就会消费了这条消息,也起不到消息延迟消费的效果。所以以上的例子中我们不去消息A队列里面的消息 任由它呆在队列里,直到过期了。自动推送到另外的队列B,我们只需要监听B队列的消息即可 ,消息生产者把带有过期时间的消息推向A 队列代码如下:

A 队列的消息生产者如下:

    public ResponseData sendDel() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        MessageDTO messageDTO = new MessageDTO();
        messageDTO.setTitle("hello rabbitmq 我是帅哥!!");
        messageDTO.setContext(format.format(new Date()));
        CorrelationData correlationData = new CorrelationData();
        String messageStr = JSONObject.toJSONString(messageDTO);
        byte[] bytes = messageStr.getBytes();
        ReturnedMessage returnedMessage = new ReturnedMessage(new Message(bytes), 1, "1", MqConstants.A_EXCHANGE_NAME, "");
        correlationData.setReturned(returnedMessage);

        long time = new Date().getTime();
        // long exp = time + (2 * 60 * 1000);
        // 延迟2分钟
        long exp = 120000;
        System.out.println("当前运行时间:");
        System.out.println(format.format(new Date()));
        String ex = String.valueOf(exp);
        rabbitTemplate.convertAndSend(MqConstants.A_EXCHANGE_NAME, MqConstants.A_QUEUE, messageDTO, message -> {
            //设置消息的过期时间,是以毫秒为单位的
            message.getMessageProperties().setExpiration(ex);
            return message;
        },correlationData);
    return ResponseData.success();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

B队列的消费者如下:

    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConstants.B_EXCHANGE_NAME,durable = "true",type = "direct"),
            value = @Queue(value = MqConstants.B_QUEUE,durable = "true"),
            key = MqConstants.B_QUEUE
    ))
    public void listenerDead(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long tagId) throws IOException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        byte[] body = message.getBody();
        String s = new String(body);
        MessageDTO messageDTO = JSONObject.parseObject(s, MessageDTO.class);
        log.info("死信队列执行中,消费消息id:{}",messageDTO.getTitle());
        log.info("当前时间:{}",format.format(new Date()));
        // 消息确认,当前消息
        //channel.basicAck(tagId,false);

        // 消息拒绝 当前消息,并会重新添加到队列中
        //channel.basicNack(tagId,false,true);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

启动项目测试结果如下:
在这里插入图片描述
从上图中我们可以看到,确实是实现了消息的延迟发送。
以上方案存在一个问题:根据消息投递的顺序 会出现消息延迟消息消费的顺序不一致演示如下:
修改 发送时间为动态的,第一个消息延迟时间30秒,第二条消息 延迟时间 20秒 结果如下:
在这里插入图片描述
可以看得出,结果 并不是 延迟20m 的消息优先被消费,而是等到 第一条延迟了 30秒的消息过期了,才发现也有一条延迟了20m 的消息没有被消费,此时立刻把这两条消息,推送到私信队列B, 所以消费者消费的时间是一致的。

以上是第一种实现消息延迟发送的方案,下面使用插件时间消息的延迟发送案例,首先需要安装插件,安装步骤如下:
https://blog.csdn.net/XZB119211/article/details/126944959

插件发送延迟消息

第一步 定义交换机、队列、等如下:

    // 插件延迟队列
    @Bean
    public Queue deadQueueForEx(){
        return new Queue(MqConstants.E_QUEUE,true,false,false);
    }
    // 插件延迟队列绑定
    @Bean
    Binding bindingExchangeDeadForEx(Queue deadQueueForEx) {
        return BindingBuilder.bind(deadQueueForEx()).to(deadExchangeForEx()).with(MqConstants.E_QUEUE).noargs();
    }
    //插件延迟队列交换机
    @Bean
    public CustomExchange deadExchangeForEx(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(MqConstants.E_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消息发送者如下:

// 插件时间延迟队列
public ResponseData sendDelForEx(long expTime) {
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    MessageDTO messageDTO = new MessageDTO();
    messageDTO.setTitle("hello rabbitmq 我是帅哥!!");
    messageDTO.setContext(format.format(new Date()));
    CorrelationData correlationData = new CorrelationData();
    String messageStr = JSONObject.toJSONString(messageDTO);
    byte[] bytes = messageStr.getBytes();
    ReturnedMessage returnedMessage = new ReturnedMessage(new Message(bytes), 1, "1", MqConstants.E_EXCHANGE_NAME, "");
    correlationData.setReturned(returnedMessage);
    System.out.println("当前运行时间:");
    System.out.println(format.format(new Date()));
    String ex = String.valueOf(expTime);
    rabbitTemplate.convertAndSend(MqConstants.E_EXCHANGE_NAME, MqConstants.E_QUEUE, messageDTO, message -> {
        //设置消息的过期时间,是以毫秒为单位的
        message.getMessageProperties().setHeader("x-delay",ex);
        return message;
    },correlationData);
    return ResponseData.success();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

消息消费者如下:

    @RabbitListener(queues = MqConstants.E_QUEUE)
    public void listenerDeadForEx(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long tagId) throws IOException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        byte[] body = message.getBody();
        String s = new String(body);
        MessageDTO messageDTO = JSONObject.parseObject(s, MessageDTO.class);
        log.info("延迟插件队列执行中,消费消息id:{}",messageDTO.getTitle());
        log.info("当前时间:{}",format.format(new Date()));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

测试第一条消息延迟 30ms ,第二条消息延迟 20ms
在这里插入图片描述
从上图消息 日志中可以看得出来 消息消费时确实是正常了,但是多打印处理 其他日志:
在这里插入图片描述
这部分日志是:监听消息发送时消息未到达队列的回调方法打印的,其代码如下:

    @Bean(name = "rabbitTemplate")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消息从生产者发送至 rabbitmq broker 成功的回调 (保证信息到达 broker)
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            // ack=true:消息成功发送到Exchange
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                ReturnedMessage returned = correlationData.getReturned();
                Message message = returned.getMessage();
                byte[] body = message.getBody();
                String s = new String(body);
                MessageDTO messageDTO=JSONObject.parseObject(s, MessageDTO.class);;
                log.info("ConfirmCallback:     " + "相关数据:" + messageDTO );
                log.info("ConfirmCallback:     " + "确认是否到达交换机:" + ack);
                log.info("ConfirmCallback:     " + "原因:" + cause);
            }
        });
        // 设置信息从交换机发送至 queue 失败的回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                byte[] body = message.getBody();
                String s = new String(body);
                MessageDTO messageDTO=JSONObject.parseObject(s, MessageDTO.class);;
                log.info("ReturnCallback:     " + "消息:" + messageDTO);
                log.info("ReturnCallback:     " + "回应码:" + replyCode);
                log.info("ReturnCallback:     " + "回应信息:" + replyText);
                log.info("ReturnCallback:     " + "交换机:" + exchange);
                log.info("ReturnCallback:     " + "路由键:" + routingKey);
            }
        });
        // 为 true 时,消息通过交换器无法匹配到队列时会返回给生产者,为 false 时,匹配不到会直接丢弃
        rabbitTemplate.setMandatory(true);
        // 设置发送时的转换
         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

结合上面的代码我们可知,使用插件实现延迟消息时候,并不是把消息立马就推送到队列中,而只是将消息推送到交换机中,待过期时间到了才推送掉消息队列中,所以 发送消息 setReturnCallback 才会爆出 消息未到达队列的 日志,但也不影响最终的延迟消息执行。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/778064
推荐阅读
相关标签
  

闽ICP备14008679号