当前位置:   article > 正文

在SpringBoot中对RabbitMQ三种使用方式_springboot使用rabbitmq

springboot使用rabbitmq

基于API的方式

        1.使用AmqpAdmin定制消息发送组件

  1. @Autowired
  2. private AmqpAdmin amqpAdmin;
  3. @Test
  4. public void amqpAdmin(){
  5. //1.定义fanout类型的交换器
  6. amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
  7. //2.定义两个默认持久化队列,分别处理email和sms
  8. amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
  9. amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
  10. //3.将队列分别与交换器进行绑定
  11. // 队列名 是队列 交换机的名称 路由 其它参数
  12. amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
  13. amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));
  14. }

    

   

2.消息发送者发送消息

             创建实体类

                

  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. @Data
  5. @AllArgsConstructor
  6. @NoArgsConstructor
  7. public class User {
  8. private Integer id;
  9. private String name;
  10. }

                发送消息

 

  1. @Autowired
  2. private RabbitTemplate re;
  3. @Test//消息发送者
  4. public void subPublisher(){
  5. User user = new User(1,"小满");
  6. re.convertAndSend("fanout_exchange", "", user);
  7. }

                        如图所以,如果我们直接发送的话就会报这个错,有两种解决方法,第一种是比较常用的让实体类User实现序列化Serializable接口,这里我们不做演示,第二种是写一个配置类,只有在RabbitMQ可以使用

  1. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  2. import org.springframework.amqp.support.converter.MessageConverter;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitMQConfig {
  7. //定制JSON格式的消息转化器
  8. @Bean
  9. public MessageConverter messageConverter(){
  10. return new Jackson2JsonMessageConverter();
  11. }
  12. }

        加上配置类后我们发送就不会报错了,我们也可以在RabbitMQ的可视化端口看到我们发送的消息

 

 

        3.发送完消息后接下来就是消费消息了,定义接收消息的业务

  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class RabbitMQService {
  9. //发布订阅模式: @RabbitListener可以指定当前方法监控哪一个队列
  10. @RabbitListener(queues = "fanout_queue_email")//消费者可以消费多个队列的消息
  11. public void subConsumerEmail(Message message){
  12. //当队列中有内容是方法会自动执行 推荐Object来接收
  13. //官网推荐Message
  14. byte[] body = message.getBody();//Message将数据存放在body中
  15. String msg = new String(body);
  16. System.out.println("邮件业务接收到消息:"+msg);
  17. }
  18. @RabbitListener(queues = "fanout_queue_sms")
  19. public void subConsumerSms(Message message){
  20. byte[] body = message.getBody();
  21. String msg = new String(body);
  22. System.out.println("短信业务接收到消息:"+msg);
  23. }
  24. }

        4.重新运行发送端就可以接收到我们发送的数据,接收的数据可能打印在任意一个控制台中,这是idea的机制,我们不需要管 

基于配置类的方式

        1.在config配置类中定义

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  3. import org.springframework.amqp.support.converter.MessageConverter;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. //定制JSON格式的消息转化器
  9. @Bean
  10. public MessageConverter messageConverter(){
  11. return new Jackson2JsonMessageConverter();
  12. }
  13. // 1.fanout创建一个交换机
  14. @Bean
  15. public Exchange fanoutExchange(){
  16. return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
  17. }
  18. //2.定义消息队列
  19. @Bean
  20. public Queue fanoutQueueEmail(){
  21. return new Queue("fanout_queue_email");
  22. }
  23. @Bean
  24. public Queue fanoutQueueSms(){
  25. return new Queue("fanout_queue_sms");
  26. }
  27. //3.将创建的队列绑定到对应的交换机上
  28. @Bean
  29. public Binding bingingEmail(){
  30. return BindingBuilder.bind(fanoutQueueEmail()).to(fanoutExchange()).with("").noargs();
  31. }
  32. @Bean
  33. public Binding bingingSms(){
  34. return BindingBuilder.bind(fanoutQueueSms()).to(fanoutExchange()).with("").noargs();
  35. }
  36. }

        2.为了避免api的影响,我们可以在可视化端口将基于api创建的交换机和队列删除

                1)删除交换机

 

 

                2)删除队列,前面也是点击队列的名字 

        可以看到我已经将交换机和消息队列都已经删除,接下来我们重新启动项目 ,配置类可以在启动的时候自动创建

 

         我们的订阅发布模式也是可以正常运行

        

基于注解类的方式        

        1.我们要现将基于配置类的方式注释掉,避免影响我们测试

  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class RabbitMQService {
  9. @RabbitListener(bindings = @QueueBinding(
  10. value = @Queue("fanout_queue_email"),
  11. exchange=@Exchange(value = "fanout_exchange",type = "fanout")
  12. ))
  13. public void subConsumerEmail(Message message){
  14. //当队列中有内容是方法会自动执行 推荐Object来接收
  15. //官网推荐Message
  16. byte[] body = message.getBody();//Message将数据存放在body中
  17. String msg = new String(body);
  18. System.out.println("邮件业务接收到消息:"+msg);
  19. }
  20. @RabbitListener(bindings = @QueueBinding(
  21. value = @Queue("fanout_queue_sms"),
  22. exchange=@Exchange(value = "fanout_exchange",type = "fanout")
  23. ))
  24. public void subConsumerSms(Message message){
  25. byte[] body = message.getBody();
  26. String msg = new String(body);
  27. System.out.println("短信业务接收到消息:"+msg);
  28. }
  29. }

        提前将交换机和队列删除,然后运行,就会发现会在启动时会自动生成交换机和队列,测试也不会有影响

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/405703?site
推荐阅读
相关标签
  

闽ICP备14008679号