赞
踩
手动应答模式下,nack或者不ack都会让数据在MQ中积压,抛出异常会重试,到达重试次数会丢失该数据。在实际使用中,我们trycatch业务代码,当发送异常时候,必须在catch中手动抛出异常,MQ才会使用重试机制(类似事务的机制)。重试次数与yml中配置一致,且需要缓存错误次数。另外,当消息到达错误次数上限的时候,通过nack让数据进入死信队列。在死信队列中将消息入库。
yml配置
rabbitmq:
host: 121.36.44.93
port: 5672
username: admin
password: admin
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
retry:
max-attempts: 5
enabled: true
普通业务队列,绑定死信交换机
@Bean
public Queue systemQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", BizEnum.Message.DLX_EXCHANGE.getType());
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "deadLetter.system");
return QueueBuilder.durable(BizEnum.Message.SYSTEM_QUEUE.getType()).withArguments(args).build();
}
死信队列
@Bean
public Queue deadLetterQueue() {
return new Queue(BizEnum.Message.DLX_QUEUE.getType(), true, false, false, null);
}
死信交换机
@Bean
TopicExchange deadLetterExchange() {
return new TopicExchange(BizEnum.Message.DLX_EXCHANGE.getType(), true, false, null);
}
业务队列监听
@RabbitListener(queues = {"system-queue"}) @Transactional(rollbackFor = Exception.class) public void handleMessage(Message message, com.rabbitmq.client.Channel mqChannel) throws IOException { String consumerTag = message.getMessageProperties().getConsumerTag(); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // ... 业务处理 mqChannel.basicAck(deliveryTag, false); } catch (Exception e) { if(MqCache.checkMaxRetry(consumerTag)){ MqCache.clearConsumerTagCount(consumerTag); mqChannel.basicNack(deliveryTag, false, false); }else{ MqCache.cacheConsumerTagCount(consumerTag); log.error("数据错误 ===> {}", e); throw e; } } }
死信队列监听,可以将错误数据入库,或者不ack,后期再消费该队列数据
@RabbitListener(queues = {"dlx-queue"})
@Transactional(rollbackFor = Exception.class)
public void dlMessage(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
// 入库
channel.basicAck(deliveryTag, false);
}
缓存失败次数
public class MqCache extends BaseCache { private final static StringRedisTemplate stringRedisTemplate; static { stringRedisTemplate = SpringUtil.getBean(StringRedisTemplate.class); } // 与yml配置一致 private static Integer retryCount = 5; private static final String CONSUMER_TAG_KEY = "pe:mq:consumerTag:"; public static void cacheConsumerTagCount(String consumerTag) { stringRedisTemplate.opsForValue().increment(CONSUMER_TAG_KEY + consumerTag, 1); bladeRedis.expire(CONSUMER_TAG_KEY + consumerTag, Duration.ofHours(12)); } public static boolean checkMaxRetry(String consumerTag) { return getConsumerTagCount(consumerTag) >= retryCount - 1; } public static Integer getConsumerTagCount(String consumerTag) { String failCount = stringRedisTemplate.opsForValue().get(CONSUMER_TAG_KEY + consumerTag); if (failCount != null) { return Integer.parseInt(failCount); } return 0; } public static void clearConsumerTagCount(String consumerTag) { bladeRedis.del(CONSUMER_TAG_KEY + consumerTag); } }
@Component @AllArgsConstructor @Slf4j public class MqSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private RabbitTemplate rabbitTemplate; @PostConstruct private void initRabbitTemplate() { //设置消息发送确认回调,发送成功后更新消息表状态 rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } public void sendMessage(String exchange, String routingKey, OrderBean orderBean) { rabbitTemplate.convertAndSend(exchange, routingKey, JSON.toJSONString(orderBean), message -> { //设置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(orderBean.getOrderNo())); } public void sendMessage(String exchange, String routingKey, Object message) { rabbitTemplate.convertAndSend(exchange, routingKey, message); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { /* * 处理消息没有到达交换机,数据丢失的情况 * 根据订单号查询到订单数据,并将数据保存到异常消息表中,定时补发,并报警人工处理 * */ String orderId = correlationData.getId(); } else { //查询订单号是否在异常消息表,在的话要删除 log.info(">>>下单消息发送成功{}<<<", correlationData); } } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { //消息到达交换机,没有路由到队列,根据订单号查询到订单数据,并将数据保存到异常消息表中,定时补发,并报警人工处理 /* * 1 交换机没有绑定队列 * 2 交换机根据路由键没有匹配到队列 * 3 队列消息已满 * */ byte[] body = message.getBody(); JSONObject json = JSONObject.parseObject(new String(body)); System.out.println("return============================"); System.out.println(message); }
消息确认回调:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。