当前位置:   article > 正文

RabbitMq整合springboot超详细,超适合新手_springboot rabbitmq配置

springboot rabbitmq配置
1、引入springboot整合amqp的依赖
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
2、application.yml 配置
  1. server:
  2. port: 8111
  3. spring:
  4. rabbitmq:
  5. port: 5672
  6. host: localhost
  7. username: guest
  8. password: guest
  9. #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
  10. publisher-confirm-type: correlated
  11. #保证交换机能把消息推送到队列中
  12. publisher-returns: true
  13. virtual-host: /
  14. #这个配置是保证消费者会消费消息,手动确认
  15. listener:
  16. simple:
  17. acknowledge-mode: manual
  18. template:
  19. mandatory: true
3、RabbitConfig.java (自定义Rabbitmq配置类)

配置详细解释都写在注解上了

  1. //常用的三个配置如下
  2. //1---设置手动应答(acknowledge-mode: manual)
  3. // 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
  4. // publisher-confirm-type: correlated
  5. // #保证交换机能把消息推送到队列中
  6. // publisher-returns: true
  7. // template:
  8. // #以下是rabbitmqTemplate配置
  9. // mandatory: true)
  10. // 3---设置重试
  11. @Configuration
  12. public class RabbitConfig {
  13. @Autowired
  14. private ConnectionFactory rabbitConnectionFactory;
  15. //@Bean 缓存连接池
  16. //public CachingConnectionFactory rabbitConnectionFactory
  17. @Autowired
  18. private RabbitProperties properties;
  19. //这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉
  20. // 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称
  21. // @Bean
  22. // public ConnectionFactory connectionFactory() throws Exception {
  23. // //创建工厂类
  24. // CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
  25. // //用户名
  26. // cachingConnectionFactory.setUsername("gust");
  27. // //密码
  28. // cachingConnectionFactory.setPassword("gust");
  29. // //rabbitMQ地址
  30. // cachingConnectionFactory.setHost("127.0.0.1");
  31. // //rabbitMQ端口
  32. // cachingConnectionFactory.setPort(Integer.parseInt("5672"));
  33. //
  34. // //设置发布消息后回调
  35. // cachingConnectionFactory.setPublisherReturns(true);
  36. // //设置发布后确认类型,此处确认类型为交互
  37. // cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
  38. //
  39. // cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
  40. // return cachingConnectionFactory;
  41. // }
  42. // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称
  43. @Bean
  44. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
  45. SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
  46. containerFactory.setConnectionFactory(rabbitConnectionFactory);
  47. // 并发消费者数量
  48. containerFactory.setConcurrentConsumers(1);
  49. containerFactory.setMaxConcurrentConsumers(20);
  50. // 预加载消息数量 -- QOS
  51. containerFactory.setPrefetchCount(1);
  52. // 应答模式(此处设置为手动)
  53. containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  54. //消息序列化方式
  55. containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
  56. // 设置通知调用链 (这里设置的是重试机制的调用链)
  57. containerFactory.setAdviceChain(
  58. RetryInterceptorBuilder
  59. .stateless()
  60. .recoverer(new RejectAndDontRequeueRecoverer())
  61. .retryOperations(rabbitRetryTemplate())
  62. .build()
  63. );
  64. return containerFactory;
  65. }
  66. // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
  67. @Bean
  68. public RabbitTemplate rabbitTemplate(){
  69. RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory);
  70. //默认是用jdk序列化
  71. //数据转换为json存入消息队列,方便可视化界面查看消息数据
  72. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  73. //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
  74. rabbitTemplate.setMandatory(true);
  75. //此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
  76. rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
  77. //CorrelationData correlationData, boolean b, String s
  78. rabbitTemplate.setConfirmCallback(
  79. (correlationData, b, s) -> {
  80. System.out.println("ConfirmCallback "+"相关数据:"+ correlationData);
  81. System.out.println("ConfirmCallback "+"确认情况:"+b);
  82. System.out.println("ConfirmCallback "+"原因:"+s);
  83. });
  84. //Message message, int i, String s, String s1, String s2
  85. rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> {
  86. System.out.println("ReturnCallback: "+"消息:"+message);
  87. System.out.println("ReturnCallback: "+"回应码:"+i);
  88. System.out.println("ReturnCallback: "+"回应消息:"+s);
  89. System.out.println("ReturnCallback: "+"交换机:"+s1);
  90. System.out.println("ReturnCallback: "+"路由键:"+s2);
  91. });
  92. return rabbitTemplate;
  93. }
  94. //重试的Template
  95. @Bean
  96. public RetryTemplate rabbitRetryTemplate() {
  97. RetryTemplate retryTemplate = new RetryTemplate();
  98. // 设置监听 调用重试处理过程
  99. retryTemplate.registerListener(new RetryListener() {
  100. @Override
  101. public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
  102. // 执行之前调用 (返回false时会终止执行)
  103. return true;
  104. }
  105. @Override
  106. public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
  107. // 重试结束的时候调用 (最后一次重试 )
  108. System.out.println("---------------最后一次调用");
  109. return ;
  110. }
  111. @Override
  112. public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
  113. // 异常 都会调用
  114. System.err.println("-----第{}次调用"+retryContext.getRetryCount());
  115. }
  116. });
  117. retryTemplate.setBackOffPolicy(backOffPolicyByProperties());
  118. retryTemplate.setRetryPolicy(retryPolicyByProperties());
  119. return retryTemplate;
  120. }
  121. @Bean
  122. public ExponentialBackOffPolicy backOffPolicyByProperties() {
  123. ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
  124. long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
  125. long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
  126. double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();
  127. // 重试间隔
  128. backOffPolicy.setInitialInterval(initialInterval * 1000);
  129. // 重试最大间隔
  130. backOffPolicy.setMaxInterval(maxInterval * 1000);
  131. // 重试间隔乘法策略
  132. backOffPolicy.setMultiplier(multiplier);
  133. return backOffPolicy;
  134. }
  135. @Bean
  136. public SimpleRetryPolicy retryPolicyByProperties() {
  137. SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
  138. int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
  139. retryPolicy.setMaxAttempts(maxAttempts);
  140. return retryPolicy;
  141. }
  142. }
4、在程序中创建交换机,队列,并且绑定

DirectRabbitConfig.java(创建direct类型的交换机)

  1. @Configuration
  2. public class DirectRabbitConfig {
  3. //创建一个名为TestDirectQueue的队列
  4. @Bean
  5. public Queue TestDirectQueue(){
  6. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  7. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  8. // autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。
  9. // arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。
  10. return new Queue("TestDirectQueue",true);
  11. }
  12. //创建一个名为TestDirectExchange的Direct类型的交换机
  13. @Bean
  14. DirectExchange TestDirectExchange(){
  15. // durable:是否持久化,默认是false,持久化交换机。
  16. // autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。
  17. // arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机
  18. return new DirectExchange("TestDirectExchange",true,false);
  19. }
  20. //绑定交换机和队列
  21. @Bean
  22. Binding bindingDirect(){
  23. //bind队列to交换机中with路由key(routing key
  24. return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("123");
  25. }
  26. }

PS:一定要在该类上加@Configuration该注解,使得程序启动的时候运行配置类。

5、创建生产者测试
  1. @RestController
  2. public class TestController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @GetMapping("/sendMessage")
  6. public String sendDirectMessage(){
  7. String messageId = UUID.randomUUID().toString();
  8. String messageData = "test message,hello!";
  9. String current = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  10. Map<String,Object> map = new HashMap<>();
  11. map.put("messageId",messageId);
  12. map.put("data",messageData);
  13. map.put("current",current);
  14. rabbitTemplate.convertAndSend("TestDirectExchange", "123", map, new CorrelationData(UUID.randomUUID().toString()));
  15. return "ok";
  16. }
  17. }
6、访问localhost:8111/sendMessage推送消息到消息队列中。

由于设置了消息发送确认,所以控制台会输出回调函数调用的内容。
在这里插入图片描述

登录RabbitMq后台查看消息情况。

在这里插入图片描述

7、创建一个消费者,来消费队列中的消息。
  1. @RabbitListener(queues = "TestDirectQueue")
  2. @Component
  3. public class DirectConsumer {
  4. @RabbitHandler
  5. public void process(Map map , Channel channel, Message message) throws IOException {
  6. System.out.println("消费者接受到的消息是:"+map.toString());
  7. //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。
  8. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  9. }
  10. }

PS :@RabbitListener不仅可以加在类上,还可以加载方法体上。上述消费者,在程序启动后,如果该队列不存在,那么会报org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s) 错误。
        所以在消费者端,健壮的写法就是也创建队列和交换机,如果队列和交换机存在,那么就拿来使用,不存在则创建,这样就不会报该错误。
因此@RabbitListener有另一种用法,如下:
 

  1. @Component
  2. public class DirectConsumer2 {
  3. @RabbitHandler
  4. @RabbitListener(bindings = {@QueueBinding(
  5. value = @Queue(value = "q5",durable = "true"),//如果不括号中不指定队列名称,那么这时候创建的就是临时队列,当消费者连接断开的时候,该队列就会消失
  6. exchange = @Exchange(value = "myexchange",durable = "true",type = "direct"),
  7. key = "123")})
  8. public void process(Map map , Channel channel, Message message) throws IOException {
  9. System.out.println("消费者接收到的消息是"+map.toString());
  10. //由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。
  11. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  12. }
  13. }

上述列子在@RabbitListener中声明了队列和交换机,并且指定了routing key,当这一对关系存在时,那么会直接使用,不存在就会创建。

上述消费者和生产者都只是使用了Exchange的Direct模式。下面再介绍Fanout和Topic模式。

Fanout模式
fanout会忽略routingkey(路由键)的规则,只要绑定到该exchange上的队列都会收到该消息。所以fanout也相当于广播,队列只要订阅绑定了这个Exchange,那么消息都会被转发到这些队列中。
创建FanoutConfig.java,配置创建Fanout类型的Exchange,再创建三个队列FanoutA、FanoutB、FanoutC。将这三个队列绑定到创建的FanoutExchange中。

  1. @Configuration
  2. public class FanoutConfig {
  3. //创建FanoutExchange
  4. @Bean
  5. FanoutExchange fanoutExchange(){
  6. return new FanoutExchange("FanoutExchange",true,false);
  7. }
  8. //创建队列A
  9. @Bean
  10. Queue queueA(){
  11. return new Queue("FanoutA",true,false,false);
  12. }
  13. //创建队列B
  14. @Bean
  15. Queue queueB(){
  16. return new Queue("FanoutB",true,false,false);
  17. }
  18. //创建队列C
  19. @Bean
  20. Queue queueC(){
  21. return new Queue("FanoutC",true,false,false);
  22. }
  23. //将创建的队列绑定到创建的交换机上
  24. @Bean
  25. Binding bindingA(){
  26. return BindingBuilder.bind(queueA()).to(fanoutExchange());
  27. }
  28. @Bean
  29. Binding bindingB(){
  30. return BindingBuilder.bind(queueB()).to(fanoutExchange());
  31. }
  32. @Bean
  33. Binding bindingC(){
  34. return BindingBuilder.bind(queueC()).to(fanoutExchange());
  35. }
  36. }

在TestController.java中新加一个请求地址用来发送消息到Fanout交换机中

  1. @RestController
  2. public class TestController {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @GetMapping("/sendMessage2")
  6. public String sendFanoutMessage(){
  7. String messageId = UUID.randomUUID().toString();
  8. String messageData = "test message,hello!";
  9. String current = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  10. Map<String,Object> map = new HashMap<>();
  11. map.put("messageId",messageId);
  12. map.put("data",messageData);
  13. map.put("current",current);
  14. rabbitTemplate.convertAndSend("FanoutExchange", "", map, new CorrelationData(UUID.randomUUID().toString()));
  15. return "ok";
  16. }
  17. }

访问localhost:8111/sendMessage2,查看消息推送情况。
在这里插入图片描述
看到一条消息被转发到了这三个队列中。

之后创建一个消费者,消费消息,消费者的创建和上述差不多,只不过换了一些监听的队列而已。
FanoutConsumer.java

  1. @Component
  2. public class FanoutConsumer {
  3. //这里把两种@RabbitListener的注解都写出来了,这两种写法都要认得,第二种写法比较健壮
  4. //@RabbitListener(queues = "FanoutA")
  5. @RabbitListener(bindings = {@QueueBinding(
  6. value = @Queue(value = "FanoutA",durable = "true"),
  7. exchange = @Exchange(value = "FanoutExchange",durable = "true",type = "fanout"),
  8. key = ""
  9. )})
  10. @RabbitHandler
  11. public void processA(Map map, Channel channel, Message message) throws IOException {
  12. System.out.println("收到的FanoutA队列的消息是:"+map.toString());
  13. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  14. }
  15. @RabbitListener(bindings = {@QueueBinding(
  16. value = @Queue(value = "FanoutB",durable = "true"),
  17. exchange = @Exchange(value = "FanoutExchange",durable = "true",type = "fanout"),
  18. key = ""
  19. )})
  20. @RabbitHandler
  21. public void processB(Map map, Channel channel, Message message) throws IOException {
  22. System.out.println("收到的FanoutB队列的消息是:"+map.toString());
  23. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  24. }
  25. @RabbitListener(bindings = {@QueueBinding(
  26. value = @Queue(value = "FanoutC",durable = "true"),
  27. exchange = @Exchange(value = "FanoutExchange",durable = "true",type = "fanout"),
  28. key = ""
  29. )})
  30. @RabbitHandler
  31. public void processC(Map map, Channel channel, Message message) throws IOException {
  32. System.out.println("收到的FanoutC队列的消息是:"+map.toString());
  33. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  34. }
  35. }

 

运行程序后结果如下:


Topic模式
exchange会转发到符合routingkey的消息队列中。也就是说发送消息的routingkey符合队列和exchange绑定的routingkey规则,那么这个消息就会被转发到这些队列中。一个消息有可能被多个队列消费。

创建TopicConfig.java,配置创建Topic类型的Exchange,再创建三个队列TopicA、TopicB、TopicC。将这三个队列绑定到创建的TopicExchange中。TopicA绑定的routing key为test.#,TopicB绑定的routing key为test.*,TopicC绑定的routing key为test.topic。
 

  1. @Configuration
  2. public class TopicConifg {
  3. @Bean
  4. TopicExchange topicExchange(){
  5. return new TopicExchange("TopicExchange",true,false);
  6. }
  7. @Bean
  8. Queue TopicqueueA(){
  9. return new Queue("TopicA",true,false,false);
  10. }
  11. @Bean
  12. Queue TopicqueueB(){
  13. return new Queue("TopicB",true,false,false);
  14. }
  15. @Bean
  16. Queue TopicqueueC(){
  17. return new Queue("TopicC",true,false,false);
  18. }
  19. @Bean
  20. Binding TopicbindingA(){
  21. return BindingBuilder.bind(TopicqueueA()).to(topicExchange()).with("test.#");
  22. }
  23. @Bean
  24. Binding TopicbindingB(){
  25. return BindingBuilder.bind(TopicqueueB()).to(topicExchange()).with("test.*");
  26. }
  27. @Bean
  28. Binding TopicbindingC(){
  29. return BindingBuilder.bind(TopicqueueC()).to(topicExchange()).with("test.topic");
  30. }
  31. }

在TestController.java中新加一个请求地址用来发送消息到Topic交换机中。

  1. @GetMapping("/sendMessage3")
  2. public String sendTopicMessage(){
  3. String messageId = UUID.randomUUID().toString();
  4. String messageData = "test message,hello!";
  5. String current = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  6. Map<String,Object> map = new HashMap<>();
  7. map.put("messageId",messageId);
  8. map.put("data",messageData);
  9. map.put("current",current);
  10. rabbitTemplate.convertAndSend("TopicExchange", "test.topic.a", map, new CorrelationData(UUID.randomUUID().toString()));
  11. return "ok";
  12. }

访问localhost:8111/sendMessage3,推送消息到TopicExchange中,并且routing key 为test.topic.a。查看消息转发情况。

可以看到只有队列TopicA接收到消息了。因为在Topic模式下的Exchange,转发消息的routing key规则是:

#:匹配一个或者多个词
*:匹配一个或者0个词
比如test.topic.a 只会匹配test.#,test.a会匹配test. * 和test.#,test只会匹配test. *

1、修改发送消息的routing key 为test,会发现只有TopicA接收到消息。
2、修改发送消息的routing key 为test.topic时,会发现三个队列都接收到消息。

总结:

在Exchange中,有三种模式:Direct,Fanout,Topic。
Direct模式只会将消息转发到符合绑定routing key的队列中,如果没有符合routing key的队列,那么消息会丢失。而且Direct发送的消息是唯一的,也就是说再Direct中的一个消息,最后只会发送到一个队列中被消费。
Fanout模式会无视routing key,会把消息转发到所有绑定到该交换机上的队列中。所以Fanout中的一个消息,会转发到所有的队列中,也就是如果绑定了多个队列,那么一个相同的消息会在多个队列中。
Topic模式有一套转发的routing key规则,只会把消息转发到符合routing key 的队列中。所以在Topic中的一个消息有可能也会被转发到多个队列中进行消费。
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/483397
推荐阅读
相关标签
  

闽ICP备14008679号