赞
踩
死信队列,简称:DLX
,Dead Letter Exchange
(死信交换机),当消息成为Dead message
后,可以被重新发送到另外一个交换机,这个交换机就是DLX
那么什么情况下会成为Dead message?
队列的长度达到阈值。
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
原队列存在消息过期设置,消息到达超时时间未被消费。
流程讲解,如图所示(以第三种情况为例):
Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX。
DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
此时消费者监听此死信队列并消费此消息。
那么什么情况下会成为Dead message?
队列的长度达到阈值。
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
原队列存在消息过期设置,消息到达超时时间未被消费。
流程讲解,如图所示(以第三种情况为例):
Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX。
DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
此时消费者监听此死信队列并消费此消息。
死信队列有什么用呢?
因为要实现延迟消息,我们先得知道如何设置过期时间。这里指演示
TTL :Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。 例:现在有两条消息,第一条消息过期时间为30s,而第二条消息过期时间为15s,当过了15秒后,第二条消息不会立即过期,而是要等第一条消息被消费后,第二条消息被消费时,才会判断是否过期,所以当所有消息的过期时间一致时(比如30m后过期),最好给队列设置过期时间,而不是消息。但是有的情况确实每个消息的过期时间不一致,比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知,这就有问题了,不可能设置那么多的队列。
如果两者都进行了设置,以时间短的为准。
动态绑定队列和重试队列:
/** * 遍历所有的 枚举队列 手动注册队列等相关bean到spring容器中 */ @Component public class QueueAutoRegisterAware implements BeanFactoryAware { @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory; for (QueueDeclareEnum value : QueueDeclareEnum.values()) { // 正常消息 队列 交换器 路由键绑定 Queue queue = new Queue(value.getQueueName()); DirectExchange directExchange = new DirectExchange(value.getExchangeName()); Binding binding = BindingBuilder.bind(queue).to(directExchange).with(value.getRoutingKey()); // 注册bean defaultListableBeanFactory.registerSingleton(value.getQueueName(), queue); defaultListableBeanFactory.registerSingleton(value.getExchangeName(), directExchange); defaultListableBeanFactory.registerSingleton(value.getRoutingKey(), binding); // TODO 重试队列 交换器 路由键(暂时没有处理, 后续需要用到mq机制则可以进行加入AOP切面重试机制) if (StringUtil.isNotBlank(value.getRetryQueueName()) && StringUtil.isNotBlank(value.getRetryRoutingKey())) { Map<String, Object> dlqParamMap = new HashMap<>(2); dlqParamMap.put("x-dead-letter-exchange", value.getExchangeName()); dlqParamMap.put("x-dead-letter-routing-key", value.getRoutingKey()); Queue retryQueue = new Queue(value.getRetryQueueName(), true, false, false, dlqParamMap); Binding retryBindIng = BindingBuilder.bind(retryQueue).to(directExchange).with(value.getRetryRoutingKey()); defaultListableBeanFactory.registerSingleton(value.getRetryQueueName(), retryQueue); defaultListableBeanFactory.registerSingleton(value.getRetryRoutingKey(), retryBindIng); } } } }
Aop异常切面,拦截异常把设置消息的重试次数和ttl过期时间,发送重试队列中;
/** * RabbitMQ监听器 切面 * 异常消息发送到重试队列 */ @Aspect @Component @Slf4j @AllArgsConstructor public class RabbitListenerAspect { private final RabbitTemplate rabbitTemplate; private final IFailureService failureService; private final TmsProperties tmsProperties; private final IOrderRequestService orderRequestService; @Pointcut("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)") public void pointCut() { } @Around("pointCut()") public Object around(ProceedingJoinPoint joinPoint) { log.debug("MQ切面接收到消息"); AtomicReference<Message> messageReference = new AtomicReference<>(); Arrays.stream(joinPoint.getArgs()).forEach(arg -> { if (arg instanceof Message) { messageReference.set((Message) arg); } }); Message message = messageReference.get(); if (message == null) { log.debug("mq消息内容为空,不执行相关操作"); return null; } Object proceed; // 消费出现异常时 会根据配置发送到重试队列 try { //绑定租户 String tenantId = message.getMessageProperties().getHeader(ForecastConstant.TENANT_ID); if (tmsProperties.isTenantApp()) { TmsTenantUtil.bindId(tenantId); } proceed = joinPoint.proceed(); if (tmsProperties.isTenantApp()) { TmsTenantUtil.unbind(); } } catch (ForecastException e) { log.info("预报失败,直接回传失败信息", e); this.forecastMqExceptionDeal(message, e); return null; } catch (Throwable throwable) { log.error("消费失败,异常信息:{}", throwable); this.sendToRetryQueue(message, throwable); return null; } return proceed; } /** * 发送到重试队列 * * @param message 消息内容 */ private void sendToRetryQueue(Message message, Throwable throwable) { sendToRetryQueue(message, throwable, true); } /** * 发送到重试队列 * * @param message 消息内容 */ private void sendToRetryQueue(Message message, Throwable throwable, Boolean addRetryCount) { String consumerQueue = message.getMessageProperties().getConsumerQueue(); // 重试次数 默认0 Optional<QueueDeclareEnum> first = Arrays.stream(QueueDeclareEnum.values()).filter(value -> value.getQueueName().equalsIgnoreCase(consumerQueue)).findFirst(); if (!first.isPresent()) { return; } QueueDeclareEnum queueDeclareEnum = first.get(); String retryQueueName = queueDeclareEnum.getRetryQueueName(); String retryRoutingKey = queueDeclareEnum.getRetryRoutingKey(); if (StringUtil.isEmpty(retryQueueName) || StringUtil.isEmpty(retryRoutingKey)) { log.info("当前队列没有配置重试队列 不进行重试,队列名:{}", queueDeclareEnum.getQueueName()); return; } Integer retryCount = (Integer) Optional.ofNullable(message.getMessageProperties(). getHeader(ForecastConstant.RETRY_COUNT)).orElse(0); //有效的操作 重试次数才+1 if (addRetryCount) { retryCount++; } if (retryCount == 1) { try { failureService.deal(message, throwable); } catch (Exception e) { log.error("失败最大次数处理异常", e); } } if (retryCount > queueDeclareEnum.getMaxRetryCount()) { log.info("当前消息超过队列配置最大重试次数,不进行重试,队列名:{}", queueDeclareEnum.getQueueName()); return; } //如果是手动重试的 则不进入重试队列 if (message.getMessageProperties().getHeader(ForecastConstant.IS_HAND)) { return; } Message retryMessage = MessageBuilder.fromMessage(message).setHeader(ForecastConstant.RETRY_COUNT, retryCount).build(); this.convertAndRetry(retryMessage, queueDeclareEnum); } /** * 预报失败处理 */ private void forecastMqExceptionDeal(Message message, ForecastException mqException) { switch (mqException.getErrorCode()) { case ERROR_CODE_1: OrderEntityBO orderEntityBO = (OrderEntityBO) mqException.getData(); //1.回传失败 failureService.deal(message, mqException); //2.塞到重试任务中去 第二天0时执行 orderRequestService.saveEntity(new OrderTaskEntity(orderEntityBO.getOrder().getCode(), OrderRequestTypeEnum.RETRY_FORECAST, getOperateTime())); //3.记录到redis中 redisDeal(orderEntityBO); break; case ERROR_CODE_2: //不增加重试次数 sendToRetryQueue(message, mqException, false); break; case ERROR_CODE_3: Integer retryCount = (Integer) Optional.ofNullable(message.getMessageProperties(). getHeader(ForecastConstant.RETRY_COUNT)).orElse(0); if (retryCount == 3) { WXCallUtil.call(mqException.getCallMessage()); } this.sendToRetryQueue(message, mqException); default: } } private void redisDeal(OrderEntityBO orderEntityBO) { String channelCode = orderEntityBO.getOrder().getChannelCode(); String country = orderEntityBO.getReceiver().getCountryCode(); String ruleName = country + "_" + channelCode; List<String> list = TmsRedisUtil.get(ForecastRedisConstant.CHANNEL_LIMIT_DATA); if (ObjectUtil.isEmpty(list)) { list = new ArrayList<>(); } if (list.contains(ruleName)) { return; } list.add(ruleName); TmsRedisUtil.set(ForecastRedisConstant.CHANNEL_LIMIT_DATA, list); TmsRedisUtil.expireAt(ForecastRedisConstant.CHANNEL_LIMIT_DATA, getSecondDayZero()); } /** * 获取第二天0时 date * * @return */ private Date getSecondDayZero() { Instant instant = new Date().toInstant(); return DateUtil.beginOfDay(Date.from(instant.plus(Duration.ofDays(1)))); } /** * 避免服务器时间不同步 向后兼容5分钟 * * @return */ private Date getOperateTime() { //如果当前时间小于0时5分 则下次重试时间设为当天零时 Date date = DateUtil.beginOfDay(new Date()); if (System.currentTimeMillis() - date.getTime() < 5 * 60 * 1000) { return date; } return getSecondDayZero(); } /** * 发送消息到重试队列 * * @param message 重试消息 * @param queueDeclareEnum 队列枚举 */ public void convertAndRetry(Message message, QueueDeclareEnum queueDeclareEnum) { String ttlTime = String.valueOf(queueDeclareEnum.getTtlTime() * (Integer) message.getMessageProperties().getHeader(ForecastConstant.RETRY_COUNT)); message.getMessageProperties().setExpiration(ttlTime); rabbitTemplate.convertAndSend(queueDeclareEnum.getExchangeName(), queueDeclareEnum.getRetryRoutingKey(), message); } /** * 发送消息到重试队列 * * @param message 重试消息 * @param queueDeclareEnum 队列枚举 */ public void convertAndRetry(Message message, QueueDeclareEnum queueDeclareEnum, String ttlTime) { message.getMessageProperties().setExpiration(ttlTime); rabbitTemplate.convertAndSend(queueDeclareEnum.getExchangeName(), queueDeclareEnum.getRetryRoutingKey(), message); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。