当前位置:   article > 正文

RabbitMQ(4):springboot整合rabbitmq_rabbitmq版本和2.4boot

rabbitmq版本和2.4boot

springboot-Spring AMQP

Spring有很多不同的项目,其中就有对AMQP的支持:
在这里插入图片描述

Spring AMQP的页面 https://spring.io/projects/spring-amqp
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

springboot整合rabbitmq

源码:

链接:https://pan.baidu.com/s/1PT_d64NuUSQhzNPpx_WDMw?pwd=4mur
提取码:4mur

创建springboot项目

添加AMQP的依赖

 		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在application.yml中添加RabbitMQ配置:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

基本消息模型

RabbitmqConfig 配置类

@Configuration
public class RabbitmqConfig {
    //创建基本队列
    @Bean
    public Queue basicQueue(){
        //name队列名称;durable 是否持久化 false不持久化; exclusive是否独有的  ;autoDelete 是否自动删除 ;argument  参数
        //return new Queue("common-queue"); 上面也就是默认的 默认 持久化,非独占,不是自动删除;
         return new Queue("hello",true,false,false,null);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

生产者:

@RestController
public class BasicProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/basicProducer")
    public void producer(){
        System.out.println("发送基本模型消息");
        rabbitTemplate.convertAndSend("hello","基本模型消息");
       /**
         * convertAndSend比较常用参数:
         * exchange  指定交换机、
         * RoutingKey 路由key、
         * object 消息体、
         */
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消费者

@Component
public class BasicConsumer {
    @RabbitListener(queues = "hello")
    public void listener(String msg){
        System.out.println("监听到的消息==="+msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

word消息模型

相较于基本消息模型,word消息模型 可以有2个以上消费者共同消费队列中的消息

RabbitmqConfig 配置类

与基本消息模型一样
  • 1

生产者

	与基本消息模型一样
  • 1

消费者

写2个以上消费者
消费者共同消费队列中的消息
默认公平轮询;
  • 1
  • 2
  • 3

怎么能者多劳?

在yml中添加以下配置。

listener:
      simple:
        # 公平分发
        prefetch: 1
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

订阅模型-Fanout (广播)

将消息交给所有绑定到交换机的队列

RabbitmqConfig 配置类

创建一个交换机多个队列

@Configuration
public class RabbitmqConfig {
 //创建fanout交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        // System.out.println("交换机");
        return new FanoutExchange("fanout-exchange");
    }

    //创建fanout队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout-queue1",true,false,false,null);
    }
    //创建fanout队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout-queue2",true,false,false,null);
    }

    //把队列绑定到交换机
    @Bean
    public Binding fanoutQueueTofanoutExchange(){
        return  BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutQueueTofanoutExchange2(){
        return  BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

生产者

@RestController
public class FanoutProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * fanout广播
     */
    @RequestMapping("/fanoutProducer")
    public void fanoutProducer(){
        for(int i=0;i<10;i++){
        rabbitTemplate.convertAndSend("fanout-exchange","","广播消息");
          }
        System.out.println("发送成功");
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

消费者

@Component
public class FanoutConsumer {

    /**
     * fanout 消息模型
     */
    @RabbitListener(queues = "fanout-queue1")
    public void listenerFanout1(String msg) {
        System.out.println("fanout接受消息1" + msg);
    }

    /**
     * fanout 消息模型
     */
    @RabbitListener(queues = "fanout-queue2")
    public void listenerFanout2(String msg) {
        System.out.println("fanout接受消息2" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

订阅模型-Direct(定向)

把消息交给符合指定routing key 的队列

RabbitmqConfig 配置类

创建一个交换机多个队列

@Configuration
public class RabbitmqConfig {
 //创建direct交换机
    @Bean
    public DirectExchange directExchange() {
        // System.out.println("交换机");
        return new DirectExchange("direct-exchange");
    }

    //创建direct队列
    @Bean
    public Queue directQueue1(){
        return new Queue("direct-queue1",true,false,false,null);
    }
    @Bean
    public Queue directQueue2(){
        return new Queue("direct-queue2",true,false,false,null);
    }

    //把direct队列绑定到交换机
    @Bean
    public Binding directQueueTodirectExchange1(){
        return  BindingBuilder.bind(directQueue1()).to(directExchange()).with("error");
    }
    @Bean
    public Binding directQueueTodirectExchange2(){
        return  BindingBuilder.bind(directQueue2()).to(directExchange()).with("info");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

生产者

根据不同的routingkey 将消息发送不同的队列中

@RestController
public class DirectProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * direct定向
     * */
    @RequestMapping("/directProducer")
    public void directProducer(){
        rabbitTemplate.convertAndSend("direct-exchange","info","定向消息error");
        System.out.println("发送成功");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

消费者

@Component
public class DirectConsumer {
    /**
     * direct 消息模型
     */
    @RabbitListener(queues = "direct-queue1")
    public void listenerDirect1(String msg){
        System.out.println("direct接受消息1" + msg);
    }
    /**
     * direct 消息模型
     */
    @RabbitListener(queues = "direct-queue2")
    public void listenerDirect2(String msg){
        System.out.println("direct接受消息2" + msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

订阅模型-Topics(通配符)

把消息交给符合routing pattern(路由模式) 的队列

RabbitmqConfig 配置类

创建一个交换机多个队列

@Configuration
public class RabbitmqConfig {
   //创建Topic交换机
    @Bean
    public TopicExchange topicExchange() {
        // System.out.println("交换机");
        return new TopicExchange("topic-exchange");
    }

    //创建topic队列
    @Bean
    public Queue topicQueueInfo(){

        return new Queue("topic-queue-info",true,false,false,null);
        //return new Queue("common-queue"); 上面也就是默认的
    }
    //创建topic队列
    @Bean
    public Queue topicQueueError(){

        return new Queue("topic-queue-error",true,false,false,null);
        //return new Queue("common-queue"); 上面也就是默认的
    }

    //把topic队列绑定到交换机
    @Bean
    public Binding InfoQueueToTopicExchange(){
        return  BindingBuilder.bind(topicQueueInfo()).to(topicExchange()).with("info.*");
    }
    @Bean
    public Binding ErrorQueueToTopicExchange(){
        return  BindingBuilder.bind(topicQueueError()).to(topicExchange()).with("common.*");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

生产者

根据不同的routingkey 将消息发送不同的队列中

@RestController
public class TopicProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/send")
    public void send(String msg){
        System.out.println(msg);
        rabbitTemplate.convertAndSend("topic-exchange","info.one",msg);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

消费者

@Component
public class TopicConsumer {
    @RabbitListener(queues="topic-queue-info")
    public void listenerInfo(String msg, Message message, Channel channel) throws IOException {
        System.out.println("进入消费者info"+msg);
    }

    @RabbitListener(queues="topic-queue-error")
    public void listenerError(String msg, Message message, Channel channel) throws IOException {
        System.out.println("进入消费者Error"+msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/956433
推荐阅读
相关标签
  

闽ICP备14008679号