当前位置:   article > 正文

RabbitMq实战如何保证消息幂等消费_rabbitmq如何保证消息的幂等性

rabbitmq如何保证消息的幂等性

目录

一、消息幂等性

二、解决方案

三、代码


一、消息幂等性

在编程中一个幂等操作的特点是其任意多次执行所产生的结果与一次执行的产生的结果相同,在mq中由于网络故障或客户端延迟消费mq自动重试过程中可能会导致消息的重复消费,那我们如何保证消息的幂等问题呢?也可以理解为如何保证消息不被重复消费呢,不重复消费也就解决了幂等问题。

二、解决方案

1、生成全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有消费过。

2、如果该消息已经消费过,则告诉mq消息已经消费,将该消息丢弃(手动ack)。

3、如果没有消费过,将该消息进行消费并将消费记录写进redis或者数据库中。

注:还有一种方式,数据库操作可以设置唯一键(消息id),防止重复数据的插入,这样插入只会报错而不会插入重复数据,本人没有测试。

三、代码

 简单描述一下需求,如果订单完成之后,需要为用户累加积分,又需要保证积分不会重复累加。那么再mq消费消息之前,先去数据库查询该消息是否已经消费,如果已经消费那么直接丢弃消息。

如果是Redis存放数据key=全局id,value=积分值,在消费消息之前,通过全局id去redis查询是否有该数据,如果有直接丢弃。该方法本人没有测试,只是说说自己的思路。有不对的希望大佬们不吝赐教。

生产者

  1. package com.xiaojie.score.producer;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.xiaojie.score.entity.Score;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.scheduling.annotation.Async;
  9. import org.springframework.stereotype.Component;
  10. import java.util.UUID;
  11. /**
  12. * @author xiaojie
  13. * @version 1.0
  14. * @description:发送积分消息的生产者
  15. * @date 2021/10/10 22:18
  16. */
  17. @Component
  18. @Slf4j
  19. public class ScoreProducer implements RabbitTemplate.ConfirmCallback {
  20. @Autowired
  21. private RabbitTemplate rabbitTemplate;
  22. //定义交换机
  23. private static final String SCORE_EXCHANGE = "xiaojie_score_exchaneg";
  24. //定义路由键
  25. private static final String SCORE_ROUTINNGKEY = "score.add";
  26. /**
  27. * @description: 订单完成
  28. * @param:
  29. * @return: java.lang.String
  30. * @author xiaojie
  31. * @date: 2021/10/10 22:30
  32. */
  33. public String completeOrder() {
  34. String orderId = UUID.randomUUID().toString();
  35. System.out.println("订单已完成");
  36. //发送积分通知
  37. Score score = new Score();
  38. score.setScore(100);
  39. score.setOrderId(orderId);
  40. String jsonMSg = JSONObject.toJSONString(score);
  41. sendScoreMsg(jsonMSg, orderId);
  42. return orderId;
  43. }
  44. /**
  45. * @description: 发送积分消息
  46. * @param:
  47. * @param: message
  48. * @param: orderId
  49. * @return: void
  50. * @author xiaojie
  51. * @date: 2021/10/10 22:22
  52. */
  53. @Async
  54. public void sendScoreMsg(String jsonMSg, String orderId) {
  55. this.rabbitTemplate.setConfirmCallback(this);
  56. rabbitTemplate.convertAndSend(SCORE_EXCHANGE, SCORE_ROUTINNGKEY, jsonMSg, message -> {
  57. //设置消息的id为唯一
  58. message.getMessageProperties().setMessageId(orderId);
  59. return message;
  60. });
  61. }
  62. @Override
  63. public void confirm(CorrelationData correlationData, boolean ack, String s) {
  64. if (ack) {
  65. log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);
  66. } else {
  67. log.info(">>>>>>>消息发送失败{}", ack);
  68. }
  69. }
  70. }

消费者

  1. package com.xiaojie.score.consumer;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.rabbitmq.client.Channel;
  4. import com.xiaojie.score.entity.Score;
  5. import com.xiaojie.score.mapper.ScoreMapper;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.commons.lang3.StringUtils;
  8. import org.springframework.amqp.core.Message;
  9. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.messaging.handler.annotation.Headers;
  12. import org.springframework.stereotype.Component;
  13. import java.io.IOException;
  14. import java.util.Map;
  15. /**
  16. * @author xiaojie
  17. * @version 1.0
  18. * @description: 积分的消费者
  19. * @date 2021/10/10 22:37
  20. */
  21. @Component
  22. @Slf4j
  23. public class ScoreConsumer {
  24. @Autowired
  25. private ScoreMapper scoreMapper;
  26. @RabbitListener(queues = {"xiaojie_score_queue"})
  27. public void onMessage(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
  28. String orderId = message.getMessageProperties().getMessageId();
  29. if (StringUtils.isBlank(orderId)) {
  30. return;
  31. }
  32. log.info(">>>>>>>>消息id是:{}", orderId);
  33. String msg = new String(message.getBody());
  34. Score score = JSONObject.parseObject(msg, Score.class);
  35. if (score == null) {
  36. return;
  37. }
  38. //执行前去数据库查询,是否存在该数据,存在说明已经消费成功,不存在就去添加数据,添加成功丢弃消息
  39. Score dbScore = scoreMapper.selectByOrderId(orderId);
  40. if (dbScore != null) {
  41. //证明已经消费消息,告诉mq已经消费,丢弃消息
  42. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  43. return;
  44. }
  45. Integer result = scoreMapper.save(score);
  46. if (result > 0) {
  47. //积分已经累加,删除消息
  48. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  49. return;
  50. } else {
  51. log.info("消费失败,采取相应的人工补偿");
  52. }
  53. }
  54. }

完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/430667
推荐阅读
相关标签
  

闽ICP备14008679号