赞
踩
上一篇文章中我介绍了RabbitMQ实现分布式最终一致性的开发方案,这篇就来解决一下这个方案中存在的一些问题。http://t.csdnimg.cn/aOYTH
首先,RabbitMQ的三大问题:消息丢失、消息重复、消息积压
这三个问题中最严重的就是消息丢失的问题。那我我们就来反向介绍一下这几个问题的最简单处理办法。
消息积压,顾名思义就是给MQ发的消息太多了太快了,消费者消费不过来了。
解决办法1: 我是富豪我就是有钱,ok,那我们就疯狂加服务器加消费者,直到能够消费的过来。
解决办法2: 我是穷波一,好吧,穷波一得有自己苟活的法呀,我们再上线一个专门的队列消费服务,但是我们这个服务不去做对应的业务逻辑处理,而是直接把这些消息全部原封不动的存到我们的数据库,这样至少不会丢掉消息,我们记录到数据库,后续我们再慢慢的去处理这些消息就好啦。
举个栗子:这就像是淘宝买东西一样,假如我们的自己写的一个淘宝服务哈,我们写的很low,消费者处理消息很慢,可能我们的服务只能支持一分钟30个人购买,买的人再多就只能消息积压在MQ中,然后再有人继续购买就越等越久,最后导致积压,处理不过来,然后只能被迫给用户返回一个超时错误,用户一看,真垃圾,不买了,我们到手的money飞了。这种事我们怎么允许呢。money、money、money啊,到手的飞了,不可能!这时候我们就上线一个专门的消费者处理这些消息,因为我们不做业务逻辑处理,直接存表,所以我们的速度很快。这样我们一分钟可能就能够支持300个人购买了。太好了,money没飞掉,舒服。过后我们的服务再慢慢处理这些消息,哪怕是我第二天了才处理完,也没有事,money已经到账了,这是不是想想都开心(*^▽^*)。
顾名思义 消息发送了两次或者多次
解决办法1: 其实通常我们的消费者服务接口都会实现接口的幂等性。(幂等性不懂的看我以前的文章吧~http://t.csdnimg.cn/X3rmg)
解决办法2: 加防重表,用mysql啊redis啊都可以,唉这个就不想讲了,听到这个名字大家应该都能懂什么意思吧。
我滴妈,终于讲这个最重要的了,今天一大早就开始写,有点小累。
通俗理解就是 我们给MQ发了消息,但是最后我们没有消费到。
这个问题出现才是我们最懵逼,最头疼的。
但是想看懂这下面的解决办法首先要了解RabbitMQ的消息可靠投递,不懂的可以转到我以前的文章学习一下(http://t.csdnimg.cn/pwfpF)
解决办法 1:加如MQ的消息日志表。(下面直接实战代码)
还是已最通用的电商下单逻辑为例,下面的实战代码就是我上一遍文章《基于RabbitMQ实现下单减库存的最终一致性分布式事务》中的代码基础上进一步完善的。(http://t.csdnimg.cn/aOYTH)
- public class MqMessage {
-
- @TableId(type = IdType.INPUT)
- private String messageId;
-
- private String content;
-
- private String toExchange;
-
- private String routingKey;
-
- private String classType;
-
- private int messageStatus;
-
- private Date createTime;
-
- private Date updateTime;
- }
MqMessage实体类,记录MQ的日志。
下单业务逻辑部分
- String jsonString = JSON.toJSONString(order.getOrder());
- MqMessage mqMessage = new MqMessage();
- mqMessage.setMessageId(uuid);
- mqMessage.setMessageStatus(0); // 新建
- mqMessage.setContent(jsonString);
- mqMessage.setRoutingKey("order.create.order");
- mqMessage.setToExchange("order-event-exchange");
- mqMessage.setClassType(OrderEntity.class.getTypeName());
- mqMessage.setCreateTime(new Date());
- mqMessage.setUpdateTime(new Date());
- mqMessageService.save(mqMessage);
-
- rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder(), correlationData);
-
- /**
- * MQ 的回调函数
- */
- @Configuration
- public class MyRabbitConfig {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @Autowired
- MqMessageService mqMessageService;
-
-
- @Primary
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- this.rabbitTemplate = rabbitTemplate;
- rabbitTemplate.setMessageConverter(messageConverter());
- initRabbitTemplate();
- return rabbitTemplate;
- }
-
-
- // 把消息使用json序列化的方式
- @Bean
- public MessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
-
- /**
- * 定制 RabbitTemplate
- * 1、服务器收到消息
- * 1、spring.rabbitmq.publisher-confirms=true
- * 2、设置确认回调 setConfirmCallback
- * 2、 消息正确抵达队列 回调
- * 1、## 开启发送端消息抵达队列的确认
- * spring.rabbitmq.publisher-returns=true
- * 2、## 只要抵达队列 优先异步回调
- * spring.rabbitmq.template.mandatory=true
- * 3、消费端
- * 1、##手动ACK确认
- * spring.rabbitmq.listener.simple.acknowledge-mode=manual
- */
- public void initRabbitTemplate() {
- /**
- * ACK 确认机制
- * @param correlationData correlation data for the callback.
- * @param ack true for ack, false for nack
- * @param cause An optional cause, for nack, when available, otherwise null.
- */
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- System.out.println("==============成功发送消息到服务器回调==========");
- System.out.println("correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);
- if (ack) {
- // 消息已成功发送到Broker并得到确认
- System.out.println("订单 消息发送成功");
- String id = correlationData.getId();
- MqMessage mqMessage = new MqMessage();
- mqMessage.setMessageId(id);
- mqMessage.setMessageStatus(1); // 发送成功
- mqMessage.setUpdateTime(new Date());
- mqMessageService.updateById(mqMessage);
-
- // 在这里执行相应的逻辑
- } else {
- // 消息发送失败或未得到确认
- System.out.println("消息发送失败:" + cause);
- // 在这里执行相应的逻辑
- }
- });
-
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
- /**
- // * 只要消息没投递到指定的队列,就触发失败回调
- // * @param message the returned message.
- // * @param i the reply code.
- // * @param s the reply text.
- // * @param s1 the exchange.
- // * @param s2 the routing key.
- // */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
-
- System.out.println("=============交换机发送消息到消息队列失败回调==========");
- System.out.println("message===>" + message + "replyCode===>" + replyCode + "replyText==>" + replyText + "exchange==>" + exchange + "routingKey==>" + routingKey);
- System.out.println("消息为正确抵达队列。。。");
- String string = new String(message.getBody());
- OrderEntity orderEntity = JSON.parseObject(string, OrderEntity.class);
- MqMessage mqMessage = new MqMessage();
- mqMessage.setMessageId(orderEntity.getMqMessageId());
- mqMessage.setMessageStatus(2); // 错误抵达
- mqMessage.setUpdateTime(new Date());
- mqMessageService.updateById(mqMessage);
- }
- });
- }
-
- }
- @RabbitHandler
- public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
- System.out.println("过期订单, 准备关闭。。。");
- try {
- orderService.closeOrder(orderEntity);
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-
- String string = new String(message.getBody());
- System.out.println("记录MQ 的日志 , string=" + string);
- MqMessage mqMessage = new MqMessage();
- mqMessage.setMessageId(orderEntity.getMqMessageId());
- mqMessage.setMessageStatus(3); // 已抵达
- mqMessage.setUpdateTime(new Date());
- mqMessageService.updateById(mqMessage);
- }catch (Exception e){
- e.printStackTrace();
- channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
- }
- }
ok,看一下我们的测试结果。
这样就不怕消息丢失了。
MQ的知识应该暂时就可以撒花了。坚持就是胜利。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。