赞
踩
消息堆积的产生场景:
1、生产者使用消息确认机制保证消息百分之百能够将消息投递到MQ成功。
2、MQ服务器端应该将消息持久化到硬盘
3、消费者使用手动ack机制确认消息消费成功
如果MQ服务器容量满了怎么办?
使用死信队列将消息存到数据库中去,后期补偿消费。
RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
产生背景:
代码案例:
maven依赖
<dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies>
yml配置
server: # 服务启动端口配置 port: 8081 servlet: # 应用访问路径 context-path: / spring: #增加application.druid.yml 的配置文件 # profiles: # active: rabbitmq rabbitmq: ####连接地址 host: www.kaicostudy.com ####端口号 port: 5672 ####账号 username: kaico ####密码 password: kaico ### 地址 virtual-host: /kaicoStudy ###模拟演示死信队列 kaico: dlx: exchange: kaico_order_dlx_exchange queue: kaico_order_dlx_queue routingKey: kaico.order.dlx ###备胎交换机 order: exchange: kaico_order_exchange queue: kaico_order_queue routingKey: kaico.order
队列配置类
@Configuration public class DeadLetterMQConfig { /** * 订单交换机 */ @Value("${kaico.order.exchange}") private String orderExchange; /** * 订单队列 */ @Value("${kaico.order.queue}") private String orderQueue; /** * 订单路由key */ @Value("${kaico.order.routingKey}") private String orderRoutingKey; /** * 死信交换机 */ @Value("${kaico.dlx.exchange}") private String dlxExchange; /** * 死信队列 */ @Value("${kaico.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${kaico.dlx.routingKey}") private String dlxRoutingKey; /** * 声明死信交换机 * * @return DirectExchange */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 声明死信队列 * * @return Queue */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 声明订单业务交换机 * * @return DirectExchange */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 绑定死信队列到死信交换机 * * @return Binding */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /** * 声明订单队列,并且绑定死信队列 * * @return Queue */ @Bean public Queue orderQueue() { // 订单队列绑定我们的死信交换机 Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", dlxExchange); arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * 绑定订单队列到订单交换机 * * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); } }
死信队列消费者
@Component
public class OrderDlxConsumer {
/**
* 死信队列监听队列回调的方法
* @param msg
*/
@RabbitListener(queues = "kaico_order_dlx_queue")
public void orderDlxConsumer(String msg) {
System.out.println("死信队列消费订单消息" + msg);
}
}
普通队列消费者
@Component
public class OrderConsumer {
/**
* 监听队列回调的方法
*
* @param msg
*/
@RabbitListener(queues = "kaico_order_queue")
public void orderConsumer(String msg) {
System.out.println("正常订单消费者消息msg:" + msg);
}
}
后台队列管理页面如下:
部署方式:死信队列不能够和正常队列存在同一个服务器中,应该分服务器存放。
订单30分钟未支付,系统自动超时关闭的实现方案。
实现逻辑:
主要使用死信队列来实现。
想要的代码:就是正常的消费者不消费消息,或者没有正常的消费者,在设置的时间后进入死信队列中,然后死信消费者实现相应的业务逻辑。
消息重试原理: 在重试的过程中,使用aop拦截我们的消费监听方法,也不会打印这个错误日志。如果重试多次还是失败,达到最大失败次数的时候才会打印错误日志。
如果消费多次还是失败的情况下:
1、自动删除该消息;(消息可能丢失)
解决办法:
第一步、springboot项目配置需要开启ack模式
acknowledge-mode: manual
第二步、消费者Java代码
int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
// 开启消息确认机制
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
什么是消息幂等性?MQ消费者如何保证幂等性?
产生的原因:就是因为消费者可能会开启自动重试,重试过程中可能会导致消费者业务逻辑代码重复执行。此刻消息已经消费了,因为业务报错导致消息重新消费,这时会出现
解决方案:采用消息全局id根据业务来定,根据业务id(全局唯一id)消费者可以判断这条消息已经消费了。
消费者代码逻辑:
分布式事务:在分布式系统中,因为跨服务调用接口,存在多个不同的事务,每个事务都互不影响。就存在分布式事务的问题。
解决分布式事务核心思想:数据最终一致性。
分布式领域中名词:
强一致性 :要么同步速度非常快或者采用锁的机制 不允许出现脏读;
强一致性解决方案:要么数据库A非常迅速的将数据同步给数据B,或者数据库A没有同步完成之前数据库B不能够读取数据。
弱一致性: 允许读取的数据为原来的脏数据,允许读取的结果不一致性。
最终一致性: 在我们的分布式系统中,因为数据之间同步通过网络实现通讯,短暂的数据延迟是允许的,但是最终数据必须要一致性。
基于RabbitMQ解决分布式事务的思路:(采用最终一致性的方案)
解决思路图:核心是利用mq发送消息给其他系统将数据修改回来。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。