当前位置:   article > 正文

SpringBoot-RabbitMQ篇(3)-消费异常处理_field rabbitclient in com.xiaopei.modules.cashback

field rabbitclient in com.xiaopei.modules.cashback.service.impl.cashbacktask

一、消费失败重试

  1. 默认的队列监听是自动确认的,但是如果出现异常不会自动确认
  2. 默认的失败机制是不断重试,这样会影响mq性能
  3. 可以在配置文件中指定失败重试次数和重试间隔
pring:
  rabbitmq:
    ...
    # 配置消息重试
    listener:
      simple:
        retry:
          # 开启重试
          enabled: true
          # 重试三次
          max-attempts: 3
          # 间隔时间1s
          max-interval: 1000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 配置之后将会按照间隔时间重试三次,重试之后如果消息依然没有发送的话消息将会直接丢弃
  2. 但是这样直接丢弃消息在某些场景下并不合适,所以需要使用到死信队列,当消息不可达时由死信处理

二、死信队列应用

2.1 绑定死信交换器

  1. 死信队列指的是当消息消费失败或失败超过一定次数时可以让其进入死信队列,然后再让监听死信队列的线程处理,可以保证消息的可靠性
  2. 死信队列需要绑定到正常的队列上,所以如果原来交换器和队列已经有绑定需要先解绑才能正常绑定
  3. 声明死信队列和正常队列绑定,死信队列、交换器等通常使用dl/dlx进行标识,标识为死信
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

@Configuration
public class SubscribeExchangeConfig {
    /**
     * 死信交换器
     */
    @Bean
    public DirectExchange emailDlxDirectExchange() {
        return ExchangeBuilder.directExchange("exchange.direct.dlx.springboot.email").build();
    }

    /**
     * 死信队列
     */
    @Bean
    public Queue emailDlxQueue() {
        return QueueBuilder.durable("queue.direct.dlx.springboot.email").build();
    }

    /**
     * 绑定死信交换器和死信队列
     */
    @Bean
    @Resource
    public Binding emailDlxBiding(Queue emailDlxQueue, DirectExchange emailDlxDirectExchange) {
        // 将路由使用路由键绑定到交换器上
        return BindingBuilder.bind(emailDlxQueue).to(emailDlxDirectExchange).with("springboot.email.dlk.routing.key");
    }

    /**
     * 直连交换器
     */
    @Bean
    public DirectExchange emailDirectExchange() {
        return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build();
    }

    /**
     * 声明正常队列并指定死信交换器和队列
     */
    @Bean
    public Queue emailQueue() {
        Map<String, Object> params = new Hashtable<>(4);
        // 指定死信交换器
        params.put("x-dead-letter-exchange", "exchange.direct.dlx.springboot.email");
        // 指定死信队列
        params.put("x-dead-letter-routing-key", "springboot.email.dlk.routing.key");
        return QueueBuilder.durable("queue.direct.springboot.email").withArguments(params).build();
    }

    /**
     * 交换器和队列绑定
     */
    @Bean
    @Resource
    public Binding emailBiding(Queue emailQueue, DirectExchange emailDirectExchange) {
        // 将路由使用路由键绑定到交换器上
        return BindingBuilder.bind(emailQueue).to(emailDirectExchange).with("springboot.email.routing.key");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  1. 绑定之后可以在管理界面看到正常队列上多了DLX(死信交换器)和DLK(死信路由键)参数

2.1 监听死信队列

  1. 死信队列监听,按照正常监听队列监听即可
@RabbitListener(queues = "queue.direct.dlx.springboot.email")
public void dlxReceiver(String msg) {
    System.out.println("dlxReceiver = " + msg);
}
  • 1
  • 2
  • 3
  • 4

2.2 死信队列测试

@RabbitListener(queues = "queue.direct.springboot.email")
public void receiver01(String msg) {
    // 模拟异常情况
    Integer.parseInt("a");
    System.out.println("receiver01 message = " + msg);
}

@RabbitListener(queues = "queue.direct.dlx.springboot.email")
public void dlxReceiver(String msg) {
    System.out.println("dlxReceiver = " + msg);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 在正常队列中模拟异常,然后往队列中发送消息,当三次之后(配置文件配置的重试)就会从正常队列中移除然后加入到死信队列中,然后死信队列中将会监听到信息

三、消息消费确认

  1. 当消费消息的时候需要回应消息是否确认消费或者需要拒绝
  2. 如果消费一条消息如果不确认,将会是unacked状态,然后过期后将会又重新入队
  3. 确认或拒绝需要在监听器处指定Channel参数,Message参数可选
    • org.springframework.amqp.core.Message
      • deliveryTag获取:message.getMessageProperties().getDeliveryTag()
    • com.rabbitmq.client.Channel
      • 消息确认:basicAck(long deliveryTag, boolean multiple) throws IOException
        • multiple:是否批量确认,将会确认当前tag及之前的全部,谨慎false
      • 消息拒绝:basicReject(long deliveryTag, boolean requeue) throws IOException
        • requeue:是否重新入队,false从队列中直接丢弃,true将会重新入队,谨慎false
      • 批量消息确认:basicAck(long deliveryTag, boolean multiple) throws IOException
      • 批量消息拒绝:basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException
@RabbitListener(queues = "queue.direct.dlx.springboot.email")
public void dlxReceiver(String msg, Message message, Channel channel) {
	System.out.println("dlxReceiver = " + msg);
	// 消息消息确认
	try {
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
		System.out.println("消息确认成功");
	} catch (IOException e) {
		// 变更消息状态
		System.out.println(e.getMessage());
	}

	// 消息消息拒绝
	/*try {
		channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
	} catch (IOException e) {
		// 变更消息状态
		System.out.println(e.getMessage());
		System.out.println("消息拒绝出现异常");
	}*/
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

四、重复消费解决

  1. 为了避免因为网络导致消息确认失败二导致消息重复消费,需要处理此种情况
  2. 推荐解决方法之一就是使用redis来保证消息的幂等,可以将消费和放入redis中作为一个事务
    • 需要考虑:消费成功了但是放入redis失败
      • 将消费和放入缓存放入同一个事务,但是也需要注意事务提交失败,缓存存入成功
      • 如果缓存放入失败则事务异常
      • 缓存有效期保证,例如多少时间前的应该删除等
    • redis成功,但是事务保存失败
      • 事务失败清除key,但是可能清除key失败,所以此时需要对此种情况必须要确认key清除成功
@RabbitListener(queues = "queue.direct.springboot.email")
public void receiver(Message message) throws Exception {
    String messageId = message.getMessageProperties().getMessageId();
    if(messageId != null && !cacheUtil.exists(messageId)) {
        String msg = new String(message.getBody());
        // 消息处理
        cacheUtil.set(messageId, true);
    } else {
        System.out.println("已经消费过了");
    }
    // 模拟抛出异常
    // throw new Exception("消费消息出现异常");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/996851
推荐阅读
相关标签
  

闽ICP备14008679号