赞
踩
比如,系统之间同步数据,A系统发送数据给B系统,因为网络原因或者B系统正在重启,可能收不到信息,为了确保B能收到消息就得重试几次;经典的比如,微信支付回调
对后台通知交互时,如果微信收到商户的应答不符合规范或超时,微信认为通知失败,微信会通过一定的策略定期重新发起通知,尽可能提高通知的成功率,但微信不保证通知最终能成功。(通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h
- 总计 24h4m)
程序执行完后必须打印输出 success。如果商家反馈给支付宝的字符不是 success 这 7
个字符,支付宝服务器会不断重发通知,直到超过 24 小时 22 分钟。一般情况下,25 小时以内完成 8
次通知(通知的间隔频率一般是:4m,10m,10m,1h,2h,6h,15h)。
spring-retry的@Retryable方式,是阻塞式的,rabbitmq使用这种方式,如果重试次数过多,后边的消息会阻塞一直得不到处理,重试次数过少则不能保证对方收到回调;那提高消费者数量可以吗?也是不行的,最终会耗尽所有消费者。这就相当于你去银行办业务,轮到你时,你要办的业务正好办不了,窗口就一直等着,后边的人无法办业务;如果增加窗口的数量,同样的原因,最终导致全部窗口阻塞;
解决思路是,每个消息只分配给一次机会,失败后,放入延迟队列,然后处理下一个消息,到达延迟时间,消息再次入列,这样消息不会阻塞。这就相当于,轮到你时,要办的业务办不了,先让后边的人办,你在后边等一会,x时间后再办;
用一张图概括就是RabbitMQ Non-Blocking Retry Solutions in SpringBoot Solution B — Delay Plugin
首先rabbitmq要安装rabbitmq-delayed-message-exchange插件,我的rabbitmq用的是3.8.27所以使用对应的插件3.8.17,下载后文件名rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
rabbitmq装在docker里,先进入docker
docker exec -it containerId /bin/bash
找到放插件的目录
root@xx:/# rabbitmq-plugins directories -s
// 插件在`/opt/rabbitmq/plugins`目录里
Plugin archives directory: /opt/rabbitmq/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@3f09bf7c586f-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins
所以将宿主机的延迟插件复制到docker里。
docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez containerId:/opt/rabbitmq/plugins
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看已启用的插件
root@xx:/# rabbitmq-plugins list --enabled
Listing plugins with pattern ".*" ...
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@f1cb3aad0f70
|/
[E*] rabbitmq_delayed_message_exchange 3.8.9+1.g8f537ac
[E*] rabbitmq_management 3.8.27
[E*] rabbitmq_prometheus 3.8.27
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
#RabbitMQ配置 rabbitmq: host: localhost port: 5672 username: username password: password listener: simple: # consumer收到消息,不回复ack acknowledge-mode: NONE # 抛出异常时,不再入队列 default-requeue-rejected: false #自定义的mq errorhandler,处理失败时的重试次数 retry: # 最大重试次数 max_retry: 5 # 延迟(毫秒) delay: 1000
搜了很多博客,都说要先通过management ui手动创建exchange,其实是不需要的,照下边这样就可以用代码创建exchange。
@Slf4j @Configuration public class MqConfig { @Autowired private AmqpTemplate amqpTemplate; /** * 延迟交换机名称 */ public static final String DELAY_EXCHANGE_NAME="delay-exchange"; /** * 延迟交换机的类型 */ public static final String DELAY_EXCHANGE_TYPE = "x-delayed-message"; /** * 错误重试延迟 */ private static Integer DELAY; /** * 最大重试次数,2880次*30秒=24小时,也就是说,默认重试24小时 */ private static Integer MAX_RETRY; @Value("${retry.max_retry}") public void setMaxRetry(Integer maxRetry){ MAX_RETRY = maxRetry; } @Value("${retry.delay}") public void setDelay(Integer delay){ DELAY = delay; } /** * 创建延迟交换机,必须先创建才能监听 * @return */ @Bean public CustomExchange delayExchange() { CustomExchange customExchange = new CustomExchange(MqConfig.DELAY_EXCHANGE_NAME, MqConfig.DELAY_EXCHANGE_TYPE); customExchange.getArguments().put("x-delayed-type", ExchangeTypes.DIRECT); return customExchange; } // mq的异常处理器 @Bean public RabbitListenerErrorHandler retryErrorHandler() { RabbitListenerErrorHandler errorHandler = (amqpMessage, message, exception) -> { log.error("message监听器出错了",exception); MessageProperties messageProperties = amqpMessage.getMessageProperties(); Map<String, Object> headers = messageProperties.getHeaders(); Integer xRetryCount = ((Integer) headers.get("retry-count")); //根据失败次数,决定是否继续发送到延迟队列 if (xRetryCount == null) { xRetryCount = MAX_RETRY; } Integer retriedCount = (Integer) headers.get("retried-count"); if (retriedCount == null) { retriedCount = 1; } log.info("已执行次数:{},最大重试次数:{}",retriedCount,xRetryCount); if(retriedCount < xRetryCount){ log.info("已执行次数小于最大重试次数"); retriedCount++; headers.put("retried-count",retriedCount); String routingKey = messageProperties.getConsumerQueue(); messageProperties.setDelay(DELAY); log.info("延迟:{}毫秒", DELAY); log.info("路由key:{}",routingKey); amqpTemplate.send(DELAY_EXCHANGE_NAME, routingKey, amqpMessage); return null; } log.info("已执行次数达到最大重试次数了,不再进行重试"); return null; }; return errorHandler; } }
// 发送到了默认交换机(所有的队列都会绑定到默认交换机)
String message = "";
String routingKey = "队列名称"
amqpTemplate.convertAndSend(routingKey ,message);
/**
* 接收到消息后,将消息传给目的地
* @param message
*/
//队列又绑定了延迟交换机,默认交换机收到的消息会在此处理
@RabbitListener(bindings =@QueueBinding(value = @Queue("队列名称"),
exchange = @Exchange(value = MqConfig.DELAY_EXCHANGE_NAME,
type = MqConfig.DELAY_EXCHANGE_TYPE),
key = "队列名称"),errorHandler = "retryErrorHandler")
public void doSynDataToThirdApp(String message){
//http请求发送给第三方,如果出错,则会执行errorHandler ,从而实现重试
}
简单来说,就是将消息发送到默认交换机,errorHandler捕获到消息失败,如果未达到最大重试次数,则设置延迟时间,发送到延迟交换机;如果已经达到重试次数,则结束,不再入队列;这里要注意,发送是默认交换机,再次入队列时是延迟交换机。
以上是点对点模式的重试机制,广播模式的重试略有不同,广播模式可参考springboot rabbitmq 发布订阅 广播模式
// 发送到fanout交换机
String message = "";
String exchangeName = "fanout交换机名称"
//因为是fanout类型交换机,所以routingkey会被忽略,故此取空字符串
String routingKey = ""
amqpTemplate.convertAndSend(exchangeName ,routingKey ,message);
队列要同时绑定fanout交换机和延迟交换机
/** * * 队列同时绑定fanout交换机和延迟交换机 * @param message */ @RabbitListener(bindings = { @QueueBinding(value = @Queue("队列名称"), exchange = @Exchange(value = "fanout交换机名称", type = ExchangeTypes.FANOUT)), @QueueBinding(value = @Queue("队列名称"), exchange = @Exchange(value = MqConstant.DELAY_EXCHANGE_NAME, type = MqConstant.DELAY_EXCHANGE_TYPE), key = "队列名称") }, errorHandler = "retryErrorHandler") public void listen(String message) { log.debug("广播模式,收到消息",message); }
重试机制的关键是,错误处理器捕获到错误,发送到延迟交换机,所以队列要与延迟交换机绑定;也就是说,队列要绑定两个交换机,第一个交换机用来接收广播消息,第二个交换机(延迟交换机)用来处理失败时的重试。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。