当前位置:   article > 正文

spring boot整合RabbitMQ

spring boot整合rabbitmq


前言


一、环境准备

引入依赖生产者和消费都引入这个依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在生产者application.properties里面配置

  1. #配置rabbitmq
  2. spring.rabbitmq.host=192.168.91.133
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=admin
  5. spring.rabbitmq.password=admin
  6. #默认虚拟主机
  7. spring.rabbitmq.virtual-host=/
  8. #开启生产者通过信道到交换机confirm机制
  9. spring.rabbitmq.publisher-confirm-type=correlated
  10. #开启生产者通过交换机到队列
  11. spring.rabbitmq.publisher-returns=true

消费者的配置文件

  1. #配置rabbitmq
  2. spring.rabbitmq.host=192.168.91.133
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=admin
  5. spring.rabbitmq.password=admin
  6. spring.rabbitmq.virtual-host=/
  7. #消费端
  8. server.port=8081
  9. #开启消费者端的简单模式的手动确认消息机制
  10. spring.rabbitmq.listener.simple.acknowledge-mode=manual

二、使用步骤

在生产者的spring boot项目中创建配置类

  1. package com.rcg.springbootrabbitmqproducer.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.QueueBuilder;
  5. import org.springframework.amqp.core.ExchangeBuilder;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.amqp.core.Exchange;
  8. import org.springframework.amqp.core.Binding;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. /**
  12. * @author :1452863884@qq.com rcg
  13. * @date :Created in 2023/3/8 10:15
  14. * @description:rabbitmq消息中间件配置类
  15. * @modified By:
  16. * @version:1.0
  17. */
  18. @Configuration
  19. public class MyRabbitMqConfig {
  20. private String queue01="my_queue01";
  21. private String queue02="my_queue02";
  22. private String myExchange="my_top_exchange";
  23. @Bean(value = "queue01")
  24. public Queue queue(){
  25. Queue build = QueueBuilder.durable(queue01).build();
  26. return build;
  27. }
  28. @Bean(value = "queue02")
  29. public Queue queue02(){
  30. Queue build = QueueBuilder.durable(queue02).build();
  31. return build;
  32. }
  33. @Bean(value = "exchange01")
  34. public Exchange exchange(){
  35. Exchange exchange = ExchangeBuilder.topicExchange(myExchange).build();
  36. return exchange;
  37. }
  38. @Bean
  39. public Binding binding(Exchange exchange01,Queue queue01){
  40. Binding binding = BindingBuilder.bind(queue01)
  41. .to(exchange01)
  42. .with("*.orange.*").noargs();
  43. return binding;
  44. }
  45. @Bean
  46. public Binding binding02(Exchange exchange01,Queue queue02){
  47. Binding binding = BindingBuilder
  48. .bind(queue02).to(exchange01).with("*.*.rabbit").noargs();
  49. return binding;
  50. }
  51. @Bean
  52. public Binding binding03(Exchange exchange01,Queue queue02){
  53. Binding binding = BindingBuilder.bind(queue02)
  54. .to(exchange01).with("*.*.lazy.#").noargs();
  55. return binding;
  56. }
  57. }

在测试文件中测试

  1. package com.rcg.springbootrabbitmqproducer;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.AmqpException;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessagePostProcessor;
  6. import org.springframework.amqp.core.ReturnedMessage;
  7. import org.springframework.amqp.rabbit.connection.CorrelationData;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.boot.test.context.SpringBootTest;
  10. import javax.annotation.Resource;
  11. import java.util.UUID;
  12. @SpringBootTest
  13. class SpringbootRabbitmqProducerApplicationTests {
  14. @Resource
  15. private RabbitTemplate rabbitTemplate;
  16. /**
  17. * 测试死信队列+ttl
  18. * 就是给正常队列设置一些参数
  19. * pt_exchange
  20. * 比如参数长度 超过该长度的信息, 就把信息弄到死信队列里面
  21. */
  22. @Test
  23. public void testHeadQueue(){
  24. for (int i = 0; i < 20; i++) {
  25. //设置id号
  26. MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
  27. @Override
  28. public Message postProcessMessage(Message message) throws AmqpException {
  29. message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
  30. return message;
  31. }
  32. };
  33. rabbitTemplate.convertAndSend("pt_exchange","","今天天气不错",messagePostProcessor);
  34. }
  35. }
  36. /**
  37. * 测试消费者手动确认消息被消费
  38. */
  39. @Test
  40. public void testHandAck(){
  41. //设置id号
  42. MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
  43. @Override
  44. public Message postProcessMessage(Message message) throws AmqpException {
  45. message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
  46. //设置消息的过期时间
  47. //如果对列也设置了过期时间那么就是谁的过期时间短就先过期谁的
  48. message.getMessageProperties().setExpiration("5000");
  49. return message;
  50. }
  51. };
  52. rabbitTemplate.convertAndSend("simple_exchange","","今天天气不错",messagePostProcessor);
  53. }
  54. /**
  55. * 测试交换机到队列的返回机制
  56. */
  57. @Test
  58. public void testReturn(){
  59. //只有在发生失败时才会触发
  60. rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  61. @Override
  62. public void returnedMessage(ReturnedMessage returnedMessage) {
  63. System.out.println("消费者从交换机到队列失败");
  64. }
  65. });
  66. //设置id号
  67. MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
  68. @Override
  69. public Message postProcessMessage(Message message) throws AmqpException {
  70. message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
  71. return message;
  72. }
  73. };
  74. //测试失败 就是来个没有的规则就转发不到队列
  75. rabbitTemplate.convertAndSend("my_top_exchange","xx.lazy.ads","今天天气真冷",messagePostProcessor);
  76. }
  77. /**
  78. * 测试信道到交换机确认机制
  79. */
  80. @Test
  81. public void testConfirm(){
  82. //当发生消息到交换机无论成功还是失败--都会触发该方法
  83. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  84. @Override
  85. public void confirm(CorrelationData correlationData, boolean b, String s) {
  86. if (b==false){
  87. //弥补机制--重新发送--取消订单--业务
  88. System.out.println("重新发送信息");
  89. }else{
  90. System.out.println("消息发送成功");
  91. }
  92. }
  93. });
  94. //设置id号
  95. MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
  96. @Override
  97. public Message postProcessMessage(Message message) throws AmqpException {
  98. message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
  99. return message;
  100. }
  101. };
  102. //测试成功
  103. //rabbitTemplate.convertAndSend("my_top_exchange","xx.*.lazy.ads","今天天气真冷",messagePostProcessor);
  104. //测试失败 就是发送到一个不存在的交换机 来模拟故障
  105. rabbitTemplate.convertAndSend("my_top_exchange1","xx.*.lazy.ads","今天天气真冷",messagePostProcessor);
  106. }
  107. /**
  108. * 测试主题模式
  109. */
  110. @Test
  111. public void testTopExchange(){
  112. //设置id号
  113. MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
  114. @Override
  115. public Message postProcessMessage(Message message) throws AmqpException {
  116. message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
  117. return message;
  118. }
  119. };
  120. rabbitTemplate.convertAndSend("my_top_exchange","xx.*.lazy.ads","今天天气真冷",messagePostProcessor);
  121. }
  122. @Test
  123. void contextLoads() {
  124. rabbitTemplate.convertAndSend("my_exchange01","info","今天天气不错");
  125. }
  126. }

消费者的代码

  1. package com.rcg.springbootrabbitmqcluseer;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. /**
  8. * @author :1452863884@qq.com rcg
  9. * @date :Created in 2023/3/8 13:51
  10. * @description:监听消息队列消费
  11. * @modified By:
  12. * @version:1.0
  13. */
  14. @Component
  15. public class MyListener {
  16. @RabbitListener(queues = {"simple_queue"})
  17. public void getMsg(Message message, Channel channel){
  18. byte[] body = message.getBody();
  19. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  20. try {
  21. //int a=10/0;
  22. //手动确认 批量
  23. channel.basicAck(deliveryTag,true);
  24. }catch (Exception e){
  25. //重发
  26. try {
  27. //多次重发
  28. channel.basicNack(deliveryTag,true,true);
  29. } catch (IOException ioException) {
  30. ioException.printStackTrace();
  31. }
  32. }
  33. //得到消息的id
  34. String messageId = message.getMessageProperties().getMessageId();
  35. System.out.println(messageId+"队列信息= " + new String(body));
  36. }
  37. }

 2.1 RabbitMQ高级特性

2.1.1 消息的可靠性传递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return  退回模式

rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
 

设置ConnectionFactory的publisher-confirm-type: correlated开启 确认模式。

使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

设置ConnectionFactory的publisher-returns="true" 开启 退回模式。

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后执行回调函数returnedMessage。

在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
使用channel下列方法,完成事务控制:
txSelect(), 用于将当前channel设置成transaction模式
txCommit(),用于提交事务
txRollback(),用于回滚事务
 

2.1.2 Consumer Ack 

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,并且不常用,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
 

在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

2.2.3 TTL 

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
 

 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。

2.2.4 死信队列 

死信队列,英文缩写:DLX  。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

1. 队列消息长度到达限制;

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

1. 死信交换机和死信队列和普通的没有区别

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

3. 消息成为死信的三种情况:
 队列消息长度到达限制;
消费者拒接消费消息,并且不重回队列;
原队列存在消息过期设置,消息到达超时时间未被消费;
 


总结

待补充

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/438757
推荐阅读
相关标签
  

闽ICP备14008679号