当前位置:   article > 正文

RabbitMq基础及死信队列使用_rabbitmq异常队列

rabbitmq异常队列

交换机模式

  1. fanout
    fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
  2. direct
    direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
  3. topic
    前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但是key可以用模糊匹配。

项目使用

手动应答模式下,nack或者不ack都会让数据在MQ中积压,抛出异常会重试,到达重试次数会丢失该数据。在实际使用中,我们trycatch业务代码,当发送异常时候,必须在catch中手动抛出异常,MQ才会使用重试机制(类似事务的机制)。重试次数与yml中配置一致,且需要缓存错误次数。另外,当消息到达错误次数上限的时候,通过nack让数据进入死信队列。在死信队列中将消息入库。

重试机制 + 死信队列

yml配置

  rabbitmq:
    host: 121.36.44.93
    port: 5672
    username: admin
    password: admin
    listener:
      type: simple
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
        retry:
          max-attempts: 5
          enabled: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

普通业务队列,绑定死信交换机

	@Bean
    public Queue systemQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", BizEnum.Message.DLX_EXCHANGE.getType());
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", "deadLetter.system");
        return QueueBuilder.durable(BizEnum.Message.SYSTEM_QUEUE.getType()).withArguments(args).build();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

死信队列

	@Bean
    public Queue deadLetterQueue() {
        return new Queue(BizEnum.Message.DLX_QUEUE.getType(), true, false, false, null);
    }
  • 1
  • 2
  • 3
  • 4

死信交换机

	@Bean
    TopicExchange deadLetterExchange() {
        return new TopicExchange(BizEnum.Message.DLX_EXCHANGE.getType(), true, false, null);
    }
  • 1
  • 2
  • 3
  • 4

业务队列监听

	@RabbitListener(queues = {"system-queue"})
    @Transactional(rollbackFor = Exception.class)
    public void handleMessage(Message message, com.rabbitmq.client.Channel mqChannel) throws IOException {
        String consumerTag = message.getMessageProperties().getConsumerTag();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
           // ... 业务处理
           mqChannel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            if(MqCache.checkMaxRetry(consumerTag)){
            	MqCache.clearConsumerTagCount(consumerTag);
                mqChannel.basicNack(deliveryTag, false, false);
            }else{
                MqCache.cacheConsumerTagCount(consumerTag);
                log.error("数据错误 ===> {}", e);
                throw e;
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

死信队列监听,可以将错误数据入库,或者不ack,后期再消费该队列数据

	@RabbitListener(queues = {"dlx-queue"})
    @Transactional(rollbackFor = Exception.class)
    public void dlMessage(Message message, Channel channel) throws IOException {
       String msg = new String(message.getBody());
       // 入库
         channel.basicAck(deliveryTag, false);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

缓存失败次数

public class MqCache extends BaseCache {

    private final static StringRedisTemplate stringRedisTemplate;

    static {
        stringRedisTemplate = SpringUtil.getBean(StringRedisTemplate.class);
    }


    // 与yml配置一致
    private static Integer retryCount = 5;

    private static final String CONSUMER_TAG_KEY = "pe:mq:consumerTag:";

    public static void cacheConsumerTagCount(String consumerTag) {
        stringRedisTemplate.opsForValue().increment(CONSUMER_TAG_KEY + consumerTag, 1);
        bladeRedis.expire(CONSUMER_TAG_KEY + consumerTag, Duration.ofHours(12));
    }

    public static boolean checkMaxRetry(String consumerTag) {
        return getConsumerTagCount(consumerTag) >= retryCount - 1;
    }

    public static Integer getConsumerTagCount(String consumerTag) {
        String failCount = stringRedisTemplate.opsForValue().get(CONSUMER_TAG_KEY + consumerTag);
        if (failCount != null) {
            return Integer.parseInt(failCount);
        }
        return 0;
    }

    public static void clearConsumerTagCount(String consumerTag) {
        bladeRedis.del(CONSUMER_TAG_KEY + consumerTag);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

消息确认机制

在这里插入图片描述

@Component
@AllArgsConstructor
@Slf4j
public class MqSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void initRabbitTemplate() {
        //设置消息发送确认回调,发送成功后更新消息表状态
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);

    }

    public void sendMessage(String exchange, String routingKey, OrderBean orderBean) {
        rabbitTemplate.convertAndSend(exchange, routingKey, JSON.toJSONString(orderBean),
                message -> {
                    //设置消息持久化
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(orderBean.getOrderNo()));
    }


    public void sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            /*
             *   处理消息没有到达交换机,数据丢失的情况
             *   根据订单号查询到订单数据,并将数据保存到异常消息表中,定时补发,并报警人工处理
             * */
            String orderId = correlationData.getId();
        } else {
            //查询订单号是否在异常消息表,在的话要删除
            log.info(">>>下单消息发送成功{}<<<", correlationData);

        }
    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        //消息到达交换机,没有路由到队列,根据订单号查询到订单数据,并将数据保存到异常消息表中,定时补发,并报警人工处理
        /*
         *  1 交换机没有绑定队列
         *  2 交换机根据路由键没有匹配到队列
         *  3 队列消息已满
         * */
        byte[] body = message.getBody();
        JSONObject json = JSONObject.parseObject(new String(body));
        System.out.println("return============================");
        System.out.println(message);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

消息确认回调:

  1. 当消息发送到了一个不存在的交换机,会进入confirm方法(ack参数为false)。
  2. 当消息发送到了一个存在的交换机,且
    (1 交换机没有绑定队列、2 交换机根据路由键没有匹配到队列、3.队列消息已满)
    ,会先进入returnedMessage方法,再进入confirm方法(ack参数为true)。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/994624
推荐阅读
相关标签
  

闽ICP备14008679号