赞
踩
视频来源: 【Java项目《谷粒商城》Java架构师 | 微服务 | 大型电商项目】
本笔记对应视频集数: P248 ~ P260
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包
裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是
一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,
存储和转发消息数据。
RabbitMQ 的三大特点
流量消峰:
该场景一般在秒杀或者团购活动中使用广泛。
使用 MQ 后,用户的大量请求不在直接访问数据库,而是将大量请求积压在 MQ 消息队列中,数据库从 MQ 中拉取能处理的请求,避免了数据库因为大量请求出现崩溃、宕机等情况
应用解耦:
传统做法订单系统直接调用其他接口,如果有一个接口出现问题,整个订单系统无法正常运转的。
使用 MQ 后,将 MQ 作为中间件与其他接口相连,即使有一个接口出现问题,其他还是正常运转的。
异步处理:
场景说明:用户注册后,需要发送注册邮件和注册短信,传统的做法:1、串行方式 2、并行方式 3、MQ 消息队列
1、一套流程全部完成后,返回客户端
2、发送邮件的同时发送短信,节省了一定的时间
3、使用 MQ
Message
:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)
、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。
Producer/publisher
:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer
:消息消费者,即消费方客户端,接收MQ转发的消息。
Broker
:消息队列服务进程,此进程包括两个部分:Exchange 和 Queue。
Exchange
:消息队列交换机,按一定的规则将消息路由转发到某个队列,根据message中的routing-key决定转发到哪个 Queue 中
Queue
:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Binding
: 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。
Connection
: 网络连接,比如一个TCP连接。
Channel
: 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
-----发送消息-----
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange根据routing-key将消息转发到指定的Queue(队列)
----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
1、拉取镜像并运行实例
docker run -d --name rabbitmq --restart=always -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、topic、headers
。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型
直连交换机通过指定的 routing-key 连接指定的队列,是一种完全匹配的方式。
fanout 交换机会将消息转发到所有与它绑定的队列上,无论是否指定了routing-key。是一种 广播的模式,
并且 fanout 交换机时散发消息最快的,因为无需判断 routing-key
topic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行 匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单 词,这些单词之间用点隔开。
它同样也 会识别两个通配符:符号“#”和符号 “*”。
# 匹配0个或多个单词, * 匹配一 个单词。
IP:15762
进入到管理界面
默认用户: guest
默认密码:guest
1、创建交换机
2、创建队列
3、交换机绑定队列
如果使用 Topic 交换机,可以在绑定队列时,指明routing-key 使用通配符的方式:
4、发送消息
查看队列接受的消息:
1、引入依赖
RabbitAutoConfiguration 生效
引入了 CachingConnectionFactory、RabbitTemplate、AmqpAdmin、RabbitMessagingTemplate
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、RabbitMQ 的所有配置都在 RabbitProperties 。 进行配置
spring:
rabbitmq:
host: 192.168.56.111
port: 5672
virtual-host: /
3、开启RabbitMQ
@EnableRabbit
使用Java代码创建 Exchange、Queueu、Binding:
@SpringBootTest @Slf4j @RunWith(SpringRunner.class) public class GulimallOrderApplicationTest { @Autowired private AmqpAdmin amqpAdmin; /* * 创建一个交换机 * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) * name 交换机名字 * durable 是否持久化 * autoDelete 是否自动删除 * arguments其他的一些参数 * */ @Test public void createExchange() { Exchange exchange = new DirectExchange("hello-java-exchange",true,false); // 声明一个交换机 amqpAdmin.declareExchange(exchange); log.info("交换机创建成功:{}","hello-java-exchange"); } /* * 创建一个队列 * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) * name:队列名 * durable:是否持久化 * exclusive:是否是排他的 * autoDelete:是否自动删除 * arguments:其他的一些参数 * */ @Test public void createQueue() { Queue queue = new Queue("hello-java-queue",true,false,false); amqpAdmin.declareQueue(queue); log.info("队列创建成功:{}","hello-java-queue"); } /* * 创建绑定关系 * public Binding(String destination, DestinationType destinationType, String exchange, String routingKey, * Map<String, Object> arguments) * * destination:绑定目标,绑定的队列名 * destinationType:绑定类型,QUEEN or EXCHANGE * exchange: 绑定的交换机名 * routingKey:路由键 * arguments 其他参数 * */ @Test public void createBinding() { Binding binding = new Binding( "hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null); amqpAdmin.declareBinding(binding); log.info("绑定成功{},{}","hello-java-exchange","hello-java-queue"); } }
/*
* 发送消息
* */
@Test
public void senMessage() {
String msg = "hello,world";
// public void convertAndSend(String exchange, String routingKey, final Object object)
// 交换机名称、路由键、消息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
log.info("发送消息成功:{}",msg);
}
发送实体类:
/*
* 发送消息
* */
@Test
public void senMessage() {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setId(2L);
orderEntity.setCreateTime(new Date());
// String msg = "hello,world";
// public void convertAndSend(String exchange, String routingKey, final Object object)
// 交换机名称、路由键、消息
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
log.info("发送消息成功:{}",orderEntity);
}
在发送实体类时,默认是使用 jdk 的序列化机制,并且要求实体类实现 Serializable 接口。
可以自定义消息转换器,使用不同的序列化方式,在 RabbitTemplate 中默认使用的是 SimpleMessageConverter 消息转换器。
在 SimpleMessageConverter 中可以看见,如果是 string类型的消息,直接转化为 byte 流发送
如果实现了 Serializable 接口,就按照 jdk 的方式序列化
可自定义的消息转换器,我们使用 Jackson2JsonMessageConverter
创建配置类,自定义消息转换器:
@Configuration
public class MyRabbitConfig {
/*
* 自定义消息转换器
* */
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
重新发送消息:已经序列化为Json
使用 @RabbitListener 接受消息:
/* * RabbitMQ 接收消息 * 1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息 * queueu 是一个 String[], 可指定接受多个队列的消息 * 参数可接收的类型: * rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体 * T : 可接收发送消息的类型 * Channel channel : 信道信息 * (1) 在多服务下,一条消息只能有一个客户端接收 * (2) 处理完一条消息后,才能接收下一条消息 * * 2、使用 @RabbitHandler + @RabbitListener 接受不同类型的消息 * @RabbitHandler:标注在方法上 * @RabbitListener: 标注在类、方法上 * */ @RabbitListener(queues = {"hello-java-queue"}) public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) { byte[] body = message.getBody(); MessageProperties header = message.getMessageProperties(); // System.out.println("接收到的消息: " + message); System.out.println("接收到的消息体:" + entity); }
使用 @RabbitHandler + @RabbitListener
接收不同类型的消息:
@RabbitHandler 标注在方法上
@RabbitListener 标注在方法、类上
如果我们发送消息的类型不是一种类型,单独使用 @RabbitListener 还需要获取 body 的数据然后判断类型,非常麻烦,这时就可以组合使用 @RabbitHandler + @RabbitListener
接收不同的消息:
例子
消息发送者:
@RestController @Slf4j public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/sendMQ/{num}") public String sendMQ(@PathVariable("num") Integer num) { for (Integer i = 0; i < num; i++) { if (i % 2 == 0) { OrderEntity orderEntity = new OrderEntity(); orderEntity.setId(i.longValue()); orderEntity.setCreateTime(new Date()); // String msg = "hello,world"; // public void convertAndSend(String exchange, String routingKey, final Object object) // 交换机名称、路由键、消息 rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity); log.info("发送消息成功:{} 第", + i + "条orderEntity消息.."); }else { OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity(); orderReturnApplyEntity.setId(i.longValue()); orderReturnApplyEntity.setCreateTime(new Date()); rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnApplyEntity); log.info("发送消息成功:{} 第", + i + "条orderReturnApplyEntity消息.."); } } return "sendMQ OK"; } }
消息接收者:
@RabbitListener 标注在类上,指明接受哪个队列的消息,使用 @RabbitHandler 标注在不同的方法上,一个方法接收一种类型的数据
@Service("orderItemService") @RabbitListener(queues = {"hello-java-queue"}) public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService { /* * RabbitMQ 接收消息 * 1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息 * queueu 是一个 String[], 可指定接受多个队列的消息 * 参数可接收的类型: * rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体 * T : 可接收发送消息的类型 * Channel channel : 信道信息 * (1) 在多服务下,一条消息只能有一个客户端接收 * (2) 处理完一条消息后,才能接收下一条消息 * * 2、使用 @RabbitHandler + @RabbitListener 接受不同类型的消息 * @RabbitHandler:标注在方法上 * @RabbitListener: 标注在类、方法上 * */ // @RabbitListener(queues = {"hello-java-queue"}) @RabbitHandler public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) { byte[] body = message.getBody(); MessageProperties header = message.getMessageProperties(); // System.out.println("接收到的消息: " + message); System.out.println("接收到的消息体:" + entity); } @RabbitHandler public void receiveOrderReturnApplyEntityMessage(Message message, OrderReturnApplyEntity entity,Channel channel) { // System.out.println("接收到的消息: " + message); System.out.println("接收到的消息体:" + entity); } }
结果:
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。因此保证 RabbitMQ 消息的可靠投递,需要采取一些措施。
可以使用事务消息,但是性能下降250倍,为此引入确认 机制
根据RabbitMQ消息的投递流程,可将确认机制分为俩部分:
第一部分:消息生产者的确认回调
第二部分:消息消费者的确认
消息生产者:
@RestController @Slf4j public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/sendMQ/{num}") public String sendMQ(@PathVariable("num") Integer num) { for (Integer i = 0; i < num; i++) { if (i % 2 == 0) { OrderEntity orderEntity = new OrderEntity(); orderEntity.setId(i.longValue()); orderEntity.setCreateTime(new Date()); /* * public void convertAndSend(String exchange, String routingKey, final Object object,@Nullable CorrelationData correlationData) * exchange: 交换机名称 * routingKey 路由键 * object 消息体 * CorrelationData 消息 id * */ rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity,new CorrelationData(UUID.randomUUID().toString())); // log.info("发送消息成功:{}",i); }else { OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity(); orderReturnApplyEntity.setId(i.longValue()); orderReturnApplyEntity.setCreateTime(new Date()); rabbitTemplate.convertAndSend("hello-java-exchange","hello.java11",orderReturnApplyEntity,new CorrelationData(UUID.randomUUID().toString())); // log.info("发送消息成功:{}",i); } } return "sendMQ OK"; } }
配置 ConfirmCallback、ReturnCallback 回调
@Configuration public class MyRabbitConfig { @Autowired private RabbitTemplate rabbitTemplate; /* * 自定义消息转换器 * */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /* * 设置发布确认机制 * 1、ConfirmCallback,只要生产者发送消息就会执行此回调。 * spring.rabbitmq.publisher-confirms=true * 2、ReturnCallback 只有交换机将消息转发到Queue失败时,才会调用此回调 * # 开启发送端确认机制。 Exchange --> Queue * spring.rabbitmq.publisher-returns=true * # 只要消息成功发送到Queue,就优先异步调用 ReturnCallback * spring.rabbitmq.template.mandatory=true * */ @PostConstruct // MyRabbitConfig初始化之后执行 public void InitRabbitTemplate() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @description * @date 2023/1/31 18:55 * @param correlationData 保存消息的id以及相关信息,可在发送消息时指定 new CorrelationData() * @param ack 消息是否发送成功。true:Broke接收到消息, false:Broker没有接收到消息 * @param cause 消息发送失败的原因 * @return void */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("Broker接收消息成功, correlationData: " + correlationData + " ack:" + ack + " cause:" + cause); } else { System.out.println("Broker接收消息失败, correlationData: " + correlationData + " ack:" + ack + " cause:" + cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * @description * @date 2023/1/31 22:25 * @param message 投递失败的消息 * @param replyCode 回复的状态码 * @param replyText 回复的文本 * @param exchange 投递失败的交换机 * @param routingKey 投递失败消息的 routing-key * @return void */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("message: " + message + " replyCode: " + replyCode + " replyText: " + replyText + " exchange: " + exchange + " routingKey: " + routingKey); } }); }
生产端确认的配置:
# 开启发送端确认机制。 生产者 --> Broker
spring.rabbitmq.publisher-confirms=true
# 开启发送端确认机制。 Exchange --> Queue
spring.rabbitmq.publisher-returns=true
# 只要消息成功发送到Queue,就优先异步调用 ReturnCallback
spring.rabbitmq.template.mandatory=true
消费端消费一个消息默认是自动确认的,当消费者启动时,队列中数据会全部转发给消费者处理,并自动进行消息确认,在队列中删除消息。但是当消费者处理完一条消息后,突然宕机,就会造成其他消息的丢失。
因此在消费者接收消息时应该使用手动确认模式,只要消息没有手动进行 Ack,消息就一直是 unChecked,即使宕机也不会丢失,会重新进入到 Ready 状态。
开启手动确认:
# 设置手动Ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费端手动确认方法: channel.basicAck()
消费端手动拒绝方法: channel.basicReject()/ channel.basicNack()
/* * RabbitMQ 接收消息 * 1、使用 @RabbitListener 接收消息,必须使用 @EnableRabbit 开启接受消息 * queueu 是一个 String[], 可指定接受多个队列的消息 * 参数可接收的类型: * rg.springframework.amqp.core.Message;: 封装的内容比较全,消息头+消息体 * T : 可接收发送消息的类型 * Channel channel : 信道信息 * (1) 在多服务下,一条消息只能有一个客户端接收 * (2) 处理完一条消息后,才能接收下一条消息 * * 2、使用 @RabbitHandler + @RabbitListener 接受不同类型的消息 * @RabbitHandler:标注在方法上 * @RabbitListener: 标注在类、方法上 * * */ // @RabbitListener(queues = {"hello-java-queue"}) @RabbitHandler public void receiveOrderEntityMessage(Message message, OrderEntity entity, Channel channel) { // byte[] body = message.getBody(); // MessageProperties header = message.getMessageProperties(); // System.out.println("接收到的消息: " + message); // System.out.println("接收到的消息体:" + entity); /* * 消息确认 * void basicAck(long deliveryTag, boolean multiple) throws IOException; * deliveryTag: 消息标签,channel内顺序自增 * multiple 是否批量确认 * 拒绝消息 * void basicNack(long deliveryTag, boolean multiple, boolean requeue) * multiple 是否批量拒绝消息 * requeue 拒绝的消息是否重新入队。如果重新入队还重新发送给消费者 * void basicReject(long deliveryTag, boolean requeue) throws IOException; * 与 basicNack 区别就是没有批量拒绝消息 * * */ long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { if (deliveryTag % 2 == 0) { // 手动确认消息 channel.basicAck(deliveryTag,false); System.out.println("签收了货物..." + deliveryTag); }else { // 拒绝消息 channel.basicNack(deliveryTag,false,true); // channel.basicReject(); System.out.println("没有签收货物..." + deliveryTag); } } catch (IOException e) { e.printStackTrace(); } }
延时队列使用场景:
订单超过指定时间未支付,解锁库存。
一些概念说明:
消息TTL【Time To Live】:消息存活时间。
RabbitMQ 可以为队列
和 消息
分别设置 TTL。 对队列设置 TTL,就是对队列中的所有消息设置 TTL
死信【Dead Letter】 : 顾名思义就是死掉的消息,没有消费者接收的消息。
死信的来源:
死信交换机【Dead Letter Exchange】 : 死信交换机和普通交换机的创建一样,唯一的区别就是死信交换机专门处理 死信
, 与死信交换机绑定的队列中如果有死信,就会被转发到死信交换机中做下一步处理。
因此,我们可以使用 TTL + 死信交换机就可以实现一个延迟队列。
延迟队列的实现方式一:为队列设置 TTL
生产者发送的消息到达设置TTL的队列后,如果在这个指定时间内没有消费者接收,那么该消息就变为 死信
,同时转发给死信交换机,由死信交换机转发到特定的队列中再次进行消费。
延迟队列实现方式二:为消息设置TTL
为生产者发送的每一条消息都设置TTL,不推荐这种方法。
因为 RabbitMQ 使用的惰性机制对消息进行检查,如果第一条消息的 TTL = 5min,第二条消息的 TTL = 30s。
第三条消息的 TTL = 1s ,RabbitMQ检查第一条消息一看 5 分钟过期,就会5分钟后来检查,那么第二、第三条消息都会在 5min 后 转发给死信交换机。
模拟 订单超时关闭的场景
由生产者P 向 order-event- exchange 交换机发送订单消息,路由键为 order.create.order。
交换机与俩个队列绑定:
第一个:order.delay.queue 为延迟队列,通过order.create.order 路由键绑定,设置三个参数,死信交换机、死信路由键、TTL
第二个:order.release.order.queue 普通队列,通过 order.release.order 路由键绑定。
在页面下单之后,随之生产者会向交换机发送一条订单创建消息,路由键为 order.create.order
, 交换机会将此消息发送到 延迟队列,等到达指定的 TTL 之后,说明订单超时未支付,将消息转发到绑定的 死信交换机 中,交换机在通过 order.release.order queue 队列转发给消费者 C
消费者:创建订单完成后,向RabbitMQ 发送消息
@RestController @Slf4j public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; // 模拟生成订单,向MQ发送消息 @RequestMapping("/sendMQ/createOrder") public String createOrder() { // 创建订单 OrderEntity orderEntity = new OrderEntity(); orderEntity.setOrderSn(UUID.randomUUID().toString()); // 向MQ发送消息,监听订单是否支付成功 // String exchange, String routingKey, Object message,CorrelationData correlationData // 交换机、消息的路由键,发送的消息,消息的唯一标识 rabbitTemplate.convertAndSend("order-event- exchange","order.create.order",orderEntity); return "Order created !!"; } }
创建交换机、队列,绑定关系,消费者:
@Configuration public class MyMQConfig { // 消费者 @RabbitListener(queues = "order.release.order.queue") public void consumer(OrderEntity order, Message message, Channel channel) throws IOException { System.out.println("订单超时未支付,即将关闭订单: " + order.getOrderSn()); // 手动确认 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } /* * 使用 @Bean 的方式创建 Exchange、Queue、Binding...服务启动会自动向RabbitMQ创建。 * 前提是RabbitMQ中没有这些 Exchange、Queue、Binding... 如果存在,即使配置不一样也不会重新创建。 * */ // 延迟队列 @Bean public Queue orderDelayQueue() { // String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments HashMap<String, Object> arguments = new HashMap<>(); // 设置与队列相连的死信交换机 arguments.put("x-dead-letter-exchange","order-event-exchange"); // 转发死信的 路由键 arguments.put("x-dead-letter-routing-key","order.release.order"); // 设置队列的 TTL。超过30s就表示未支付订单,准备关闭 arguments.put("x-message-ttl",3000); return new Queue("order.delay.queue",true,false,false,arguments); } // 普通队列 @Bean public Queue orderReleaseOrderQueue() { return new Queue("order.release.order.queue",true,false,false,null); } // 交换机 @Bean public TopicExchange orderEventExchange() { //String name, boolean durable, boolean autoDelete) return new TopicExchange("order-event-exchange",true,false); } // 设置绑定关系: order-event- exchange ——》order.delay.queue @Bean public Binding orderCreateOrder() { //String destination, DestinationType destinationType, String exchange, String routingKey,Map<String, Object> arguments // 绑定目的地-绑定的队列,绑定类型【交换机 OR 队列】,交换机,路由键,其他参数信息 return new Binding( "order.delay.queue", Binding.DestinationType.QUEUE, "order-event- exchange", "order.create.order", null); } // 设置绑定关系: order-event- exchange ——》order.release.order.queue @Bean public Binding orderReleaseOrder() { return new Binding( "order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } }
启动服务,已成功创建出交换机、队列
创建四条订单,已成功监听到…
消息发送出去,由于网络问题没有抵达服务器
消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚 未持久化完成,宕机。
自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
总体来说:
1、一定要在 publisher【回调机制】、consumer【手动确认】 俩端做消息确认机制
2、将消息的状态信息保存到数据库中,比如可以创建如下这张表
CREATE TABLE `mq_message`(
`message_id` CHAR(32) NOT NULL,
`content` TEXT,
`to_exchane` VARCHAR(255) DEFAULT NULL,
`routing_key` VARCHAR(255) DEFAULT NULL,
`class_type` VARCHAR(255) DEFAULT NULL,
`message_status` INT(1) DEFAULT '0' COMMENT '0-新建1-己发送2-错误抵达3-己抵达',
`create_time` DATETIME DEFAULT NULL,
`update_time` DATETIME DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4
(1)消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息 重新由unack变为ready,并发送给其他消费者
(2)消息消费失败,由于重试机制,自动又将消息发送出
(3)成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
解决方案:
消费者的业务消费接口应该设计为幂等性的。比如扣库存有 工作单的状态标志
使用防重表(redis/mysql),发送消息每一个都有业务的唯 一标识,处理过就不用处理
rabbitMQ的每一个消息都有redelivered字段,可以获取是否 是被重新投递过来的,而不是第一次投递过来的
(1)消费者宕机积压
(2)消费者消费能力不足积压
(3)发送者发送流量太大
解决方案
总体来说:
1、一定要在 publisher【回调机制】、consumer【手动确认】 俩端做消息确认机制
2、将消息的状态信息保存到数据库中,比如可以创建如下这张表
CREATE TABLE `mq_message`(
`message_id` CHAR(32) NOT NULL,
`content` TEXT,
`to_exchane` VARCHAR(255) DEFAULT NULL,
`routing_key` VARCHAR(255) DEFAULT NULL,
`class_type` VARCHAR(255) DEFAULT NULL,
`message_status` INT(1) DEFAULT '0' COMMENT '0-新建1-己发送2-错误抵达3-己抵达',
`create_time` DATETIME DEFAULT NULL,
`update_time` DATETIME DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。