当前位置:   article > 正文

【RabbitMQ】——死信_rabbitmq 死信

rabbitmq 死信

一、概念

  先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到 broker或者直接到queue里了,consumer 从 queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未古什时白动失效。

二、来源

  1. 消息TTL过期(TTL 存活时长)
  2. 队列达到最大长度(队列满了,无法再添加数据到mq中)
  3. 消息被拒绝(basic.reject或 basic.nack)并且requeue=false。

三、实现

  1. 设置一个普通交换机(normal_exchange)和一个普通个队列(normal_queue)。
  2. 设置一个死信交换机(dead_exchange)和一个死信个队列(dead_queue)。
  3. 当普通队列的消息成为死信消息时候,该消息就会转发到死信队列,进行消费。
  4. 核心实现在消费者01中,消费者01 创建了普通交换机和普通队列并进行绑定,创建死信交换机和死信队列并进行绑定,在普通队列中绑定死信交换机和死信队列。
Map<String, Object> arguments = new HashMap<>();
//设置 死信交换机
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
//设置 死信routingKey
arguments.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD);
/**
 * 声明普通队列
 */
channel.queueDeclare(QUEUE_NORMAL, false, false, false, arguments);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在这里插入图片描述

1. TTL过期

过期时间单位ms ,消息过期时间有两种方式,一种是通过 生产者,另一种是通过消费者。
通过生产者方式:

  AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); 
   channel.basicPublish(EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, properties, message.getBytes("UTF-8"));
  • 1
  • 2

通过消费者方式:

  Map<String, Object> arguments = new HashMap<>();
 //第一种,消息过期类型 过期时间,单位毫秒 (建议从生产者端 设置过期时间,比较灵活)
 arguments.put("x-message-ttl", 10 * 1000);
 channel.queueDeclare(QUEUE_NORMAL, false, false, false, arguments);
  • 1
  • 2
  • 3
  • 4
<1> 生产者01
package com.rabbitmqDemo.rabbitmq.eight;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer01 {
    private static final String EXCHANGE_NORMAL = "normal_exchange";
    private static final String ROUTING_KEY_NORMAL = "normal";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 死信消息 设置TTL时间
         * 单位毫秒
         */
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();

            /**
             * 发送消息
             * param1 发送到哪个交换机
             * param2 routingKey
             * param3 其他参数信息
             * param4 发送的消息体
             */
            channel.basicPublish(EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, properties, message.getBytes("UTF-8"));
            System.out.println("message send end : " + message);

        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
<2> 消费者01
package com.rabbitmqDemo.rabbitmq.eight;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 死信队列实战
 */
public class Consumer01 {

    private static final String EXCHANGE_NORMAL = "normal_exchange";
    private static final String EXCHANGE_DEAD = "dead_exchange";

    private static final String QUEUE_NORMAL = "normal_queue";
    private static final String QUEUE_DEAD = "dead_queue";

    private static final String ROUTING_KEY_NORMAL = "normal";
    private static final String ROUTING_KEY_DEAD = "dead";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 声明正常交换机和死信交换机
         */
        channel.exchangeDeclare(EXCHANGE_NORMAL, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);

        /**
         * 普通队列参数
         */
        Map<String, Object> arguments = new HashMap<>();
        //第一种,消息过期类型 过期时间,单位毫秒 (建议从生产者端 设置过期时间,比较灵活)
        arguments.put("x-message-ttl", 10 * 1000);
        //设置 死信交换机
        arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
        //设置 死信routingKey
        arguments.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD);
        /**
         * 声明普通队列
         */
        channel.queueDeclare(QUEUE_NORMAL, false, false, false, arguments);
        /**
         * 声明死信队列
         */
        channel.queueDeclare(QUEUE_DEAD, false, false, false, null);

        /**
         * 绑定普通交换机队列 和 死信交换机队列
         * param1 队列名称
         * param2 交换机名称
         * param3 routingkey
         */
        channel.queueBind(QUEUE_NORMAL, EXCHANGE_NORMAL, ROUTING_KEY_NORMAL);
        channel.queueBind(QUEUE_DEAD, EXCHANGE_DEAD, ROUTING_KEY_DEAD);

        //声明 普通队列 消费者成功消费的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer01-message:" + msg);
        };
        //声明 普通队列 取消消息时的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("Consumer01-消息消费被中断-" + consumerTag);
        };

        /**
         * 普通 消费者消费消息
         * param1 队列名称
         * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
         * param3 消费者成功消费的回调
         * param4 消费者取消消费回调
         */
        System.out.println("Consumer01等待接收消息......");
        channel.basicConsume(QUEUE_NORMAL, false, deliverCallback, cancelCallback);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
<3> 消费者02
package com.rabbitmqDemo.rabbitmq.eight;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Consumer02 {
   
    private static final String EXCHANGE_DEAD = "dead_exchange";
    private static final String QUEUE_DEAD = "dead_queue";
    private static final String ROUTING_KEY_DEAD = "dead";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
      
        channel.queueBind(QUEUE_DEAD, EXCHANGE_DEAD, ROUTING_KEY_DEAD);

        //声明 死信队列 消费者成功消费的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02-message:" + new String(message.getBody(), "UTF-8"));
        };
        //声明 死信队列 取消消息时的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("Consumer02-消息消费被中断-" + consumerTag);
        };

        /**
         * 死信 消费者消费消息
         * param1 队列名称
         * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
         * param3 消费者成功消费的回调
         * param4 消费者取消消费回调
         */
        System.out.println("Consumer02等待接收消息......");
        channel.basicConsume(QUEUE_DEAD, true, deliverCallback, cancelCallback);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
<4> 运行步骤

第一步:先运行消费者01 创建普通交换机和死信交换机,然后结束消费者01进程,创造无法消费的条件。
第二步:运行生产者01,此时消息会通过普通交换机发送到普通队列,TTL过期之后会通过死信交换机转移到死信队列。
第三步:运行消费者02,会消费死信队列里的消息

<5> 运行结果

在这里插入图片描述

2. 队列达到最大长度

在上一个案例的基础上进行修改,将设置TTL的代码改为 设置队列最大长度即可

/**
 * 普通队列参数
 */
Map<String, Object> arguments = new HashMap<>();
//第一种,消息过期类型 过期时间,单位毫秒 (建议从生产者端 设置过期时间,比较灵活)
//arguments.put("x-message-ttl", 10 * 1000);
//第二种,队列长度类型 设置正常队列长度限制
arguments.put("x-max-length",6);
//设置 死信交换机
arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD);
//设置 死信routingKey
arguments.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD);
/**
 * 声明普通队列
 */
channel.queueDeclare(QUEUE_NORMAL, false, false, false, arguments);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

3. 消息被拒绝

在上一个案例的基础上进行修改,将设置TTL和设置队列最大长度的代码隐藏,然后在普通队列 消费者成功消费的回调的方法中添加消息拒绝的代码。

//声明 普通队列 消费者成功消费的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
    //第三种,消息被拒绝类型
    String msg = new String(message.getBody(), "UTF-8");
    if ("rejectmessage".equals(msg)) {
        System.out.println("Consumer01-message-reject:" + msg);
        //拒接消息
        channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
    } else {
        System.out.println("Consumer01-message:" + msg);

    }
};

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/569752
推荐阅读
相关标签
  

闽ICP备14008679号