当前位置:   article > 正文

RabbitMq实战篇_rabbitmq消费者代码

rabbitmq消费者代码

1.springboot整合RabbitMq

1.1新建maven工程

1.2引入对应的rabbitMq依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

1.3配置类型代码示例

  1. @Configuration
  2. public class MyRabbitMqConfig {
  3. /**
  4. * 声明交换机的名称
  5. */
  6. public static final String EXCHANGE_NAME="order-exchange";
  7. /**
  8. * 声明队列名称
  9. */
  10. public static final String QUEUE_NAME="order-queue";
  11. /**
  12. * 声明路由键
  13. */
  14. public static final String ROUTE_KEY="route-order-queue-key";
  15. /**
  16. * 创建交换机
  17. * @return
  18. */
  19. @Bean
  20. public DirectExchange directExchange(){
  21. return new DirectExchange(EXCHANGE_NAME);
  22. }
  23. /**
  24. * 创建队列
  25. * @return
  26. */
  27. @Bean
  28. public Queue queue(){
  29. return new Queue(QUEUE_NAME);
  30. }
  31. /**
  32. * 绑定交换机与队列
  33. */
  34. @Bean
  35. public Binding binding(){
  36. return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTE_KEY);
  37. }
  38. }

1.4生产者代码示例

  1. @Component
  2. public class MessageProducer {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. public void send(String message){
  6. System.out.println("生产者发送消息:"+message);
  7. CorrelationData correlationData = new CorrelationData();
  8. correlationData.setId(message);
  9. rabbitTemplate.convertAndSend(MyRabbitMqConfig.EXCHANGE_NAME,MyRabbitMqConfig.ROUTE_KEY,message,correlationData);
  10. }
  11. }

1.5消费者代码示例

  1. @Component
  2. public class MessageConsumer {
  3. @RabbitListener(queues = MyRabbitMqConfig.QUEUE_NAME)
  4. public void handlerMessage(String msg){
  5. System.out.println("消费者受到消息:"+msg);
  6. System.out.println("开始处理业务逻辑.....");
  7. }
  8. }

1.6程序代码验证

  1. @RestController
  2. public class RabbitMqController {
  3. @Autowired
  4. MessageProducer messageProducer;
  5. @RequestMapping("/sendMessage")
  6. public String sendMessage(){
  7. messageProducer.send("订单消息");
  8. return "success";
  9. }
  10. }

2.如何确保消息不丢失

2.1RabbitMQ 消息的可靠性投递主要两种实现:
(1)通过实现消费的重试机制,通过 @Retryable 来实现重试,可以设置重试次数和重试频率。
(2)生产端实现消息可靠性投递。
以上两种方法消费端都可能收到重复消息,所以要求消费端必须实现幂等性消费。

2.2消息投递以及消费的流程图

2.3生产端保证消息的可靠性

        在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我 们提供了两种方式用来控制消息的投递可靠性模式。rabbitMq的消息投递过程是producer---borker---exchange-queue-consumer
confirm确认模式
        消息投递到exchange的确认模式,生产端发送消息到 rabbitmq broker 后,异步接受从 rabbitmq 返回的 ack确认信息,生产端收到返回的 ack 确认消息后,根据 ack true 还是 false ,调用 confirmCallback 接口进行自己的业务逻辑处理。
 实现步骤如下:
1.修改配置文件
  1. #开启消息确认模式
  2. spring.rabbitmq.publisher-confirm-type=correlated
2.自定义实现ConfirmCallback接口
  1. @Component
  2. public class MyConfirmMessageCallBack implements RabbitTemplate.ConfirmCallback {
  3. /**
  4. * @param correlationData 相关配置信息
  5. * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
  6. * @param cause 失败原因
  7. */
  8. @Override
  9. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  10. if(ack){
  11. //接收成功
  12. System.out.println("成功发送消息到exchange");
  13. System.out.println("correlationData:"+ JSONUtil.toJsonStr(correlationData));
  14. }else{
  15. System.out.println("发送消息到exchange失败");
  16. //在这可以做一些业务处理,比如消息的重新投递等
  17. }
  18. }
  19. }

4.修改生产者代码

  1. @Component
  2. public class MessageProducer {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @Autowired
  6. MyConfirmMessageCallBack myConfirmMessageCallBack;
  7. public void send(String message){
  8. System.out.println("生产者发送消息:"+message);
  9. CorrelationData correlationData = new CorrelationData();
  10. correlationData.setId(message);
  11. //设置消息确认回调接口
  12. rabbitTemplate.setConfirmCallback(myConfirmMessageCallBack);
  13. //发送消息
  14. rabbitTemplate.convertAndSend(MyRabbitMqConfig.EXCHANGE_NAME,MyRabbitMqConfig.ROUTE_KEY,message,correlationData);
  15. }
  16. }
5.验证
1.队列和交换机都正确的情况下,看下消息有没有回调确认
2.绑定一个不存在的交换机exchange,看下消息有没有回调确认
return 退回模式
        消息未投递到queue的退回模式, 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 生产端通过实现 ReturnCallback 接口,启动消息失败返回,消息路由不到队列时会触发该回调接口。
1.修改配置文件
  1. #开启消息退回模式
  2. spring.rabbitmq.publisher-returns=true
2.生产者设置投递失败的模式
  1. @Component
  2. public class MessageProducer {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @Autowired
  6. MyConfirmMessageCallBack myConfirmMessageCallBack;
  7. @Autowired
  8. MyReturnMessageCallBack myReturnMessageCallBack;
  9. public void send(String message){
  10. System.out.println("生产者发送消息:"+message);
  11. CorrelationData correlationData = new CorrelationData();
  12. correlationData.setId(message);
  13. //设置消息确认回调接口
  14. rabbitTemplate.setConfirmCallback(myConfirmMessageCallBack);
  15. //确保消息发送失败后可以重新返回到队列中
  16. rabbitTemplate.setMandatory(true);
  17. //设置消息投递queue失败回调接口
  18. rabbitTemplate.setReturnsCallback(myReturnMessageCallBack);
  19. //发送消息
  20. rabbitTemplate.convertAndSend(MyRabbitMqConfig.EXCHANGE_NAME,MyRabbitMqConfig.ROUTE_KEY,message,correlationData);
  21. }
  22. }

3.自定义接口实现 ReturnCallback并重写returnedMessage方法
  1. @Component
  2. public class MyReturnMessageCallBack implements RabbitTemplate.ReturnsCallback {
  3. /**
  4. * 交换机路由消息不到queue的时候会触发该接口的回调
  5. * @param returnedMessage
  6. */
  7. @Override
  8. public void returnedMessage(ReturnedMessage returnedMessage) {
  9. String exchange = returnedMessage.getExchange();
  10. Message message = returnedMessage.getMessage();
  11. String routingKey = returnedMessage.getRoutingKey();
  12. int replyCode = returnedMessage.getReplyCode();
  13. String replyText = returnedMessage.getReplyText();
  14. System.out.println("交换机:"+exchange);
  15. System.out.println("消息内容:"+message);
  16. System.out.println("路由键:"+routingKey);
  17. System.out.println("应答码:"+replyCode);
  18. System.out.println("应答内容:"+replyText);
  19. }
  20. }
4.验证
   将交换机绑定的路由键改为不存在的key,这样交换机就找不到投递消息的队列了,从而可以验证return模式

消费端确认ack机制

  ack Acknowledge确认,表示消费端收到消息后的确认方式,消费端消息的确认分为:自动确认(默认)手动确认,不确认等三种模式。
AcknowledgeMode.NONE :不确认
AcknowledgeMode.AUTO :自动确认
AcknowledgeMode.MANUAL :手动确认
其中自动确认是指,当消息一旦被 Consumer 接收到,则自动确认收到,并将相应 message
RabbitMQ 的消息 缓存中移除。 但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了 手 动确认方式,则需要在业务处理成功后,调用 channel.basicAck() ,手动签收,如果出现异常,则调用 channel.basicNack() 方法,让其自动重新发送消息。
3.2实现步骤如下:
 1.修改配置文件
  1. #开启消费者手动ack
  2. spring.rabbitmq.listener.simple.acknowledge-mode=manual
 2.修改消费者代码
  1. @Component
  2. public class MessageConsumer {
  3. @RabbitListener(queues = MyRabbitMqConfig.QUEUE_NAME)
  4. public void handlerMessage(String msg, Channel channel, Message message) throws IOException {
  5. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6. try {
  7. System.out.println("消费者受到消息:"+msg);
  8. System.out.println("开始处理业务逻辑.....");
  9. //手动Ack[参数1:消息投递序号,参数2:批量签收]
  10. channel.basicAck(deliveryTag,true);
  11. } catch (Exception e) {
  12. //拒绝[参数1:消息投递序号,参数2:批量拒绝,参数3:是否重新加入队列]
  13. channel.basicNack(deliveryTag,true,true);
  14. }
  15. }
  16. }

3.验证

发生异常,拒绝确认,重新加入队列,一直循环,直到确认消息

3.如何确保消息不被重复消费

产生重复消费的原因:消费端ack的机制默认是自动ack的,如果消费者在发送ack的请求时,MQ突然挂掉就自然接收不到该消息的ack也就不会在队列中删除该消息,一旦有其他消费者过来消费消息的时候,MQ还是会投递已消费的消息给其他消费者,就产生了消息重复消费的问题。

主要有以下方式保证消息不被重复消费:​​​​​​​

1.首先消费端开启手动Ack模式

 2.设计消息事务表,在每次消费消息之前根据meessageId记录一条信息,每次消费之前根据messageId判断该消息是否被消费过,如果已经消费过就直接ack,ack之后broker会删掉该消息的。

4.如何确保消息消费的顺序性

1.使用分布式锁的机制,消费同一个队列的时候确保只有一个消费者在消费。

2.同类型的消息存放在同一个队列里,利用队列有序性特点且只有一个消费者在消费该队列,这样也是能够保证有序的。在集群的模式下我们可以设置队列的“单活模式”。x-single-active-consumer:单活模式,表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。

5.介绍死信队列

5.1死信队列
英文缩写: DLX Dead Letter Exchange (死信交换机),当消息成为 Dead message 后, 可以 被重新发送到另一个交换机,这个交换机就是DLX。
5.2消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息, basicNack/basicReject, 并且不把消息重新放入原目标队
,requeue=false。
3. 原队列存在消息过期设置,消息到达超时时间未被消费。

5.3队列绑定死信交换机:

给队列设置参数: x-dead-letter-exchange x-dead-letter-routing-key
也就是说此时 Queue 作为 " 生产者 "。

 6.延迟队列的实现

   延迟队列顾名思义,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。我们可以以一个业务场景看看延迟队列的应用常用,比如我们日常生活中使用美团点外卖,但是你下单了没有支付十五分钟或者是30分钟之后系统会默认给你去掉订单,这个是怎么实现的呢?没错就是使用了延迟队列。

需求:用户下单之后,没有支付,系统取消订单并修改状态。

6.1实现步骤

但是rabbitMQ中并没有提供延迟队列的实现,可以使用TTL+私信队列的方式来实现延迟队列的效果。设置队列过期时间30分钟,当30分钟过后,消息未被消费,进入死信队列,路由到指定队列,调用库存 系统,判断订单状态。

代码实现:

1.新增DeLayQueueConfig配置类

  1. @Configuration
  2. public class DeLayQueueConfig {
  3. /**
  4. * 死信队列名称
  5. */
  6. public static final String DEAD_QUEUE_NAME="dead-queue-demo";
  7. /**
  8. * 死信队列路由键
  9. */
  10. public static final String DEAD_ROUTE_KEY="dead-route-demo";
  11. /**
  12. * 死信队列路由交换机名称
  13. */
  14. public static final String DEAD_EXCHANGE_NAME="dead-exchange-demo";
  15. /**
  16. * 延迟订单队列名称
  17. */
  18. public static final String DELAY_ORDER_QUEUE_NAME="delay-order-queue";
  19. /**
  20. * 延迟交换机名称
  21. */
  22. public static final String DELAY_ORDER_EXCHANGE_NAME="delay-order-exchange";
  23. /**
  24. * 路由键
  25. */
  26. public static final String DELAY_ORDER_ROUTE_NAME="delay-order-route";
  27. /**
  28. * 创建死信交换机
  29. * @return
  30. */
  31. @Bean
  32. public DirectExchange deadExchange(){
  33. return new DirectExchange(DEAD_EXCHANGE_NAME);
  34. }
  35. /**
  36. * 死信队列
  37. * @return
  38. */
  39. @Bean
  40. public Queue deadQueue(){
  41. return new Queue(DEAD_QUEUE_NAME);
  42. }
  43. /**
  44. * 邦定延迟队列与交换机
  45. * @return
  46. */
  47. @Bean
  48. public Binding deadBinding(){
  49. return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTE_KEY);
  50. }
  51. /**
  52. * 创建订单交换机
  53. * @return
  54. */
  55. @Bean
  56. public DirectExchange delayExchange(){
  57. return new DirectExchange(DELAY_ORDER_EXCHANGE_NAME);
  58. }
  59. /**
  60. * 订单队列
  61. * @return
  62. */
  63. @Bean
  64. public Queue delayQueue(){
  65. Map<String, Object> arguments = new HashMap<>(2);
  66. // 绑定我们的死信交换机
  67. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
  68. // 绑定我们的路由key
  69. arguments.put("x-dead-letter-routing-key", DEAD_ROUTE_KEY);
  70. return new Queue(DELAY_ORDER_QUEUE_NAME,true,false,false,arguments);
  71. }
  72. /**
  73. * 邦定订单队列与交换机
  74. * @return
  75. */
  76. @Bean
  77. public Binding delayBinding(){
  78. return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ORDER_ROUTE_NAME);
  79. }
  80. }

2.新增DelayProducer类

  1. @Component
  2. @Slf4j
  3. public class DelayProducer {
  4. @Autowired
  5. RabbitTemplate rabbitTemplate;
  6. public void send(String message){
  7. System.out.println("生产者发送订单消息:"+message+",发送时间:"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT));
  8. MessagePostProcessor messagePostProcessor = messagePostProcessor();
  9. //发送消息
  10. rabbitTemplate.convertAndSend(DeLayQueueConfig.DELAY_ORDER_EXCHANGE_NAME,DeLayQueueConfig.DELAY_ORDER_ROUTE_NAME,message,messagePostProcessor);
  11. }
  12. //给待发送的消息添加TTL配置
  13. private MessagePostProcessor messagePostProcessor() {
  14. return new MessagePostProcessor() {
  15. @Override
  16. public Message postProcessMessage(Message message) throws AmqpException {
  17. MessageProperties messageProperties = message.getMessageProperties();
  18. // 设置编码
  19. messageProperties.setContentEncoding("utf-8");
  20. //设置有效期30分钟
  21. //messageProperties.setExpiration("1800000");
  22. //测试修改为10秒
  23. messageProperties.setExpiration("10000");
  24. return message;
  25. }
  26. };
  27. }
  28. }

3.新增DelayConsumer类

  1. @Component
  2. @Slf4j
  3. public class DelayConsumer {
  4. @RabbitListener(queues = DeLayQueueConfig.DEAD_QUEUE_NAME)
  5. public void delayConsumer(String msg, Channel channel, Message message) throws IOException {
  6. System.out.println("延迟队列接收到消息:"+msg+",消费时间:"+ DateUtil.format(new Date(), DatePattern.NORM_DATETIME_FORMAT));
  7. System.out.println("查询订单状态,修改状态值");
  8. channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
  9. }
  10. }

3.controller新增测试请求

  1. @RestController
  2. public class RabbitMqController {
  3. @Autowired
  4. MessageProducer messageProducer;
  5. @Autowired
  6. DelayProducer delayProducer;
  7. @RequestMapping("/sendMessage")
  8. public String sendMessage(){
  9. messageProducer.send("订单消息");
  10. return "success";
  11. }
  12. @RequestMapping("/sendDelayMessage")
  13. public String send(){
  14. delayProducer.send("订单消息");
  15. return "success";
  16. }
  17. }

4.测试验证

1.浏览器访问http://localhost:8081/sendDelayMessage

7.总结

   本文重点介绍了springboot如何整个RabbitMq以及常用的api,对确保消息不丢失,重复消费,顺序消费问题进行了分析以及解决方法,最后介绍了RabbitMQ如何使用死信队列+TTL机制实现延迟队列的案例。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/666498
推荐阅读
相关标签
  

闽ICP备14008679号