赞
踩
使用消息队列进行异步通知需要保证消息的可靠性,即生产端将消息成功通知到消费端。
消息从生产端发送到消费端经历了如下过程:
1、消息发送到交换机
2、消息由交换机发送到队列
3、消息者收到消息进行处理
保证消息的可靠性需要保证以上过程的可靠性,本项目使用RabbitMQ可以通过如下方面保证消息的可靠性。
1、生产者确认机制
发送消息前使用数据库事务将消息保证到数据库表中
成功发送到交换机将消息从数据库中删除
2、mq持久化
mq收到消息进行持久化,当mq重启即使消息没有消费完也不会丢失。
需要配置交换机持久化、队列持久化、发送消息时设置持久化。
3、消费者确认机制
消费者消费成功自动发送ack,否则重试消费。
以订单服务为列进行演示
首先在订单服务添加消息队列依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件配置
spring: rabbitmq: host: 192.168.101.65 port: 5672 username: guest password: guest virtual-host: / publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback publisher-returns: false #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback template: mandatory: false #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息 listener: simple: acknowledge-mode: none #出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq retry: enabled: true #开启消费者失败重试 initial-interval: 1000ms #初识的失败等待时长为1秒 multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 #最大重试次数 stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false
在订单服务service工程编写MQ配置类,配置交换机
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class PayNotifyConfig implements ApplicationContextAware { //交换机 public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout"; //支付结果通知消息类型 public static final String MESSAGE_TYPE = "payresult_notify"; //支付通知队列 public static final String PAYNOTIFY_QUEUE = "paynotify_queue"; //声明交换机,且持久化 @Bean(PAYNOTIFY_EXCHANGE_FANOUT) public FanoutExchange paynotify_exchange_fanout() { // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false); } //支付通知队列,且持久化 @Bean(PAYNOTIFY_QUEUE) public Queue course_publish_queue() { return QueueBuilder.durable(PAYNOTIFY_QUEUE).build(); } //交换机和支付通知队列绑定 @Bean public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } //消息发送失败,将消息保存到记录中 @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); //消息处理service MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投递失败,记录日志 log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); //todo:保存代码 }); } }
重启订单服务,登录rabbitmq,查看交换机自动创建成功
查看队列自动成功
发送支付结果,MqMessage 需要发送消息的实体
public void notifyPayResult(MqMessage message) { //1、消息体,转json String msg = JSON.toJSONString(message); //设置消息持久化 Message msgObj = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 2.全局唯一的消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(message.getId().toString()); // 3.添加callback correlationData.getFuture().addCallback( result -> { if(result.isAck()){ // 3.1.ack,消息成功 log.debug("通知支付结果消息发送成功, ID:{}", correlationData.getId()); //删除消息表中的记录自行编写 }else{ // 3.2.nack,消息失败 log.error("通知支付结果消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason()); } }, ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()) ); // 发送消息 rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj,correlationData); }
在学习中心服务添加消息队列依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在消费服务接口工程置文件添加MQ配置
spring: rabbitmq: host: 192.168.101.65 port: 5672 username: guest password: guest virtual-host: / publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback publisher-returns: false #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback template: mandatory: false #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息 listener: simple: acknowledge-mode: none #出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq retry: enabled: true #开启消费者失败重试 initial-interval: 1000ms #初识的失败等待时长为1秒 multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 #最大重试次数 stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false
添加配置类
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class PayNotifyConfig { //交换机 public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout"; //支付结果通知消息类型 public static final String MESSAGE_TYPE = "payresult_notify"; //支付通知队列 public static final String PAYNOTIFY_QUEUE = "paynotify_queue"; //声明交换机,且持久化 @Bean(PAYNOTIFY_EXCHANGE_FANOUT) public FanoutExchange paynotify_exchange_fanout() { // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false); } //支付通知队列,且持久化 @Bean(PAYNOTIFY_QUEUE) public Queue course_publish_queue() { return QueueBuilder.durable(PAYNOTIFY_QUEUE).build(); } //交换机和支付通知队列绑定 @Bean public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }
监听MQ,接收支付结果
@RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE) public void receive(Message message, Channel channel) { try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } //获取消息 MqMessage mqMessage = JSON.parseObject(message.getBody(), MqMessage.class);//对应的消息实体 log.debug("服务接收支付结果:{}", mqMessage); //消息类型 //具体消费代码逻辑 } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。