当前位置:   article > 正文

RabbitMQ(一) 原理解析及其生产消费模式_rabbitmq 消费模式

rabbitmq 消费模式

一、RabbitMQ 原理图及其分析

图一:RabbitMQ原理图

  • Virtual Host : 当不同的用户的使用同一个RabbitMQ 服务时,可以划分出多个 vhost,每个用户在自己的vhost中创建 Exchange/Queue 等。
  • Broker: 接收和分发消息的应用,也就是RabbitMQ的功能。
  • Connection : 生产者或者消费者和 Broker 建立的 TCP 连接。
  • Channel : 每次访问 RabbitMQ 都建立一个 Connection,如果每次都要建立一个 TCP 连接,开销是很大的,效率也很低。Channel 是在 Connection 内部建立的逻辑连接,Channel 之间是完全隔离的,Channel 作为轻量级的 Connection 减少了操作系统建立TCP连接的开销。
  • Exchange:消费者发送的消息首先会到达Exchange,根据 RoutingKey,分发消息到 Queue 中去,常见的类型有:Direct、Topic、Fanout。
  • Queue :消息最终存储的地方,等待消费者的消费。
  • Binding : Exchange 和 Queue 建立的虚拟连接, Binding 中包含 RoutingKey, Binding 信息被存储在 Exchange 中的查询表中。

二、核心功能及代码实现

1、生产者、消费者(简单模式)

在这里插入图片描述

图二:简单模式

实现步骤:创建队列 → 创建生产者 → 创建消费者 → 测试代码

//1、创建队列
@Configuration
public class HelloConfiguration {

    public static final String QUEUE_NAME = "queue_name";

    @Bean
    public Queue helloQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
}

// 2、创建生产者
@Slf4j
@RestController
public class HelloController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void helloWorld(@PathVariable String message) {
        rabbitTemplate.convertAndSend("", QUEUE_NAME, message);
    }
}

// 3、创建消费者 (此处可以进行多个消费者的绑定,只需要在写一个消费者绑定到对列就可以)
@Slf4j
@Component
public class HelloConsumer {

    @RabbitListener(queues = QUEUE_NAME)
    public void helloConsumer(Message message) {
        log.info("消费者消费了消息:{}", new String(message.getBody()));
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

2、生产者、消费者(扇出模式)

图三:扇出模式

实现步骤:创建交换机 → 创建队列 → 绑定交换机和队列 → 创建生产者 → 创建消费者 → 测试代码
// 1、创建交换机 → 创建队列 → 绑定交换机和队列
@Configuration
public class FanoutConfiguration {

    public static final String FANOUT_EXCHANGE_NAME = "exchange_name";

    public static final String FANOUT_QUEUE_NAME_ONE = "queue_name_one";

    public static final String FANOUT_QUEUE_NAME_TWO = "queue_name_two";

		// 创建交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).build();
    }

		// 创建队列one
    @Bean
    public Queue queueOne() {
        return QueueBuilder.durable(FANOUT_QUEUE_NAME_ONE).build();
    }

		// 创建队列two
    @Bean
    public Queue queueTwo() {
        return QueueBuilder.durable(FANOUT_QUEUE_NAME_TWO).build();
    }

		// 绑定交换机和队列one
    @Bean
    public Binding queueOneToFanoutExchange(Queue queueOne, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueOne).to(fanoutExchange);
    }
		
		// 绑定交换机和队列two
    @Bean
    public Binding queueTwoToFanoutExchange(Queue queueTwo, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueTwo).to(fanoutExchange);
    }

}

// 2、创建生产者
@Slf4j
@RestController
public class FanoutController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendFanoutMessage/{message}")
    public void sendFanoutMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", message);
    }
}

// 3、创建消费者
@Slf4j
@Component
public class FanoutOneConsumer {

    @RabbitListener(queues = FANOUT_QUEUE_NAME_ONE)
    public void receiveFanoutQueue(Message message) {
        log.info("consumer one 消费了消息:{}", new String(message.getBody()));
    }
}

@Slf4j
@Component
public class FanoutTwoConsumer {

    @RabbitListener(queues = FANOUT_QUEUE_NAME_TWO)
    public void receiveFanoutQueue(Message message) {
        log.info("consumer two 消费了消息:{}", new String(message.getBody()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

3、生产者、消费者(路由模式)

图四:路由模式

实现步骤:创建交换机 → 创建队列 → 绑定交换机和队列 → 创建生产者 → 创建消费者 → 测试代码

// 1、创建交换机 → 创建队列 → 绑定交换机和队列
@Configuration
public class RoutingConfiguration {

    public static final String ROUTING_EXCHANGE_NAME = "routing_exchange_name";

    public static final String ROUTING_QUEUE_NAME_ONE = "routing_queue_one";

    public static final String ROUTING_QUEUE_NAME_TWO = "routing_queue_two";

    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(ROUTING_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue routingQueueOne() {
        return QueueBuilder.durable(ROUTING_QUEUE_NAME_ONE).build();
    }

    @Bean
    public Queue routingQueueTwo() {
        return QueueBuilder.durable(ROUTING_QUEUE_NAME_TWO).build();
    }

    @Bean
    public Binding queueOneBindingDirectExchangeWithInfo(Queue routingQueueOne, DirectExchange directExchange) {
        return BindingBuilder.bind(routingQueueOne).to(directExchange).with("info");
    }

    @Bean
    public Binding queueTwoBindingDirectExchangeWithInfo(Queue routingQueueTwo, DirectExchange directExchange) {
        return BindingBuilder.bind(routingQueueTwo).to(directExchange).with("info");
    }

    @Bean
    public Binding queueTwoBindingDirectExchangeWithWarning(Queue routingQueueTwo, DirectExchange directExchange) {
        return BindingBuilder.bind(routingQueueTwo).to(directExchange).with("warning");
    }

    @Bean
    public Binding queueTwoBindingDirectExchangeWithError(Queue routingQueueTwo, DirectExchange directExchange) {
        return BindingBuilder.bind(routingQueueTwo).to(directExchange).with("error");
    }
}

// 2、创建生产者
@RestController
public class RoutingController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendRoutingMessageInfo/{message}")
    public void sendRoutingMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(ROUTING_EXCHANGE_NAME, "info", message.getBytes());
    }

    @RequestMapping("/sendRoutingMessageWarning/{message}")
    public void sendRoutingMessageWarning(@PathVariable String message) {
        rabbitTemplate.convertAndSend(ROUTING_EXCHANGE_NAME, "warning", message.getBytes());
    }

    @RequestMapping("/sendRoutingMessageError/{message}")
    public void sendRoutingMessageError(@PathVariable String message) {
        rabbitTemplate.convertAndSend(ROUTING_EXCHANGE_NAME, "error", message.getBytes());
    }
    
}

 // 3、创建消费者
@Slf4j
@Component
public class RoutingOneConsumer {

    @RabbitListener(queues = ROUTING_QUEUE_NAME_ONE)
    public void receiveMessage(Message message) {
        log.info("RoutingOneConsumer接收到的消息为:{}", new String(message.getBody()));
    }
}

@Slf4j
@Component
public class RoutingTwoConsumer {

    @RabbitListener(queues = ROUTING_QUEUE_NAME_TWO)
    public void receiveMessage(Message message) {
        log.info("RoutingTwoConsumer接收到的消息为:{}", new String(message.getBody()));
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

4、生产者、消费者(主题模式)

图五:主题模式

- 该模式必须满足是**一个单词的列表,以 “.” 分隔。** - *** (星号)可以代替一个单词** - **# (井号)可以替代零个或多个单词**

.green. : 三个单词,中间的单词为green

..blue : 三个单词, 最后一个单词为blue

red.# : 以red 开头

注:如果满足两个条件路由则只发送一条

实现步骤:创建交换机 → 创建队列 → 绑定交换机和队列 → 创建生产者 → 创建消费者 → 测试代码

// 1、创建交换机 → 创建队列 → 绑定交换机和队列
@Configuration
public class TopicConfiguration {

    public static final String TOPIC_EXCHANGE_NAME = "topic_exchange_name";

    public static final String TOPIC_QUEUE_ONE_NAME = "topic_queue_one";

    public static final String TOPIC_QUEUE_TWO_NAME = "topic_queue_two";

    @Bean
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue topicQueueOne() {
        return QueueBuilder.durable(TOPIC_QUEUE_ONE_NAME).build();
    }

    @Bean
    public Queue topicQueueTwo() {
        return QueueBuilder.durable(TOPIC_QUEUE_TWO_NAME).build();
    }

    @Bean
    public Binding topicQueueOneBindingExchangeWithGreen(Queue topicQueueOne, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueOne).to(topicExchange).with("*.green.*");
    }

    @Bean
    public Binding topicQueueTwoBindingExchangeWitBlue(Queue topicQueueTwo, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueTwo).to(topicExchange).with("*.*.blue");
    }

    @Bean
    public Binding topicQueueTwoBindingExchangeWitRed(Queue topicQueueTwo, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueTwo).to(topicExchange).with("red.#");
    }
}
// 2、创建生产者 
@RestController
public class TopicController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendTopicMessageColor/{message}")
    public void sendRoutingMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "red.green.blue", message.getBytes());
    }

    @RequestMapping("/sendTopicMessageBlue/{message}")
    public void sendTopicMessageBlue(@PathVariable String message) {
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "a.a.blue", message.getBytes());
    }

    @RequestMapping("/sendTopicMessageRedAndBlue/{message}")
    public void sendTopicMessageRedAndBlue(@PathVariable String message) {
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "red.a.blue", message.getBytes());
    }
}

// 3、创建消费者
@Slf4j
@Component
public class TopicOneConsumer {

    @RabbitListener(queues = TOPIC_QUEUE_ONE_NAME)
    public void receiveMessage(Message message) {
        log.info("TopicOneConsumer接收到的消息为:{}", new String(message.getBody()));
    }
}

@Slf4j
@Component
public class TopicTwoConsumer {

    @RabbitListener(queues = TOPIC_QUEUE_TWO_NAME)
    public void receiveMessage(Message message) {
        log.info("TopicTwoConsumer接收到的消息为:{}", new String(message.getBody()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

5、生产者、消费者(发布确认模式)

图六:发布确认模式

首先在在配置文件中添加

spring.rabbitmq.publisher-confirm-type=correlated
  • 1

实现步骤:创建交换机 → 创建队列 → 绑定交换机和队列 → 创建回调函数 → 创建生产者 → 创建消费者 → 测试代码

// 1、创建交换机 → 创建队列 → 绑定交换机和队列 
@Configuration
public class ConfirmConfiguration {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @Bean
    public DirectExchange confirmDirectExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding confirmQueueBindingConfirmDirectExchange(Queue confirmQueue, DirectExchange confirmDirectExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmDirectExchange).with("confirm_key");
    }
}

// 2、创建回调函数
@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机已经收到ID为 {} 的消息", correlationData.getId());
        } else {
            log.info("交换机还未收到Id为 {} 的消息,由于原因 {}", correlationData.getId(), cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息:{}, 被交换机{}退回, 退回原因:{}, 路由的key:{}",
                new String(returned.getMessage().getBody()),
                returned.getExchange(),
                returned.getReplyText(),
                returned.getRoutingKey());
    }
}

//创建生产者
@RestController
public class ConfirmController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendConfirmMessage/{message}")
    public void sendConfirmMessage(@PathVariable String message) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "confirm_key", message.getBytes(), correlationData);
    }
}

// 创建消费者
@Slf4j
@Component
public class ConfirmConsumer {

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveConfirmQueue(Message message) {
        log.info("ConfirmConsumer 消费了消息:{}", new String(message.getBody()));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

三、总结

通过自己的学习、画图理解和总结,加深了对RabbitMQ的 Exchange、routingKey、Queue、Producer、Consumer之间交互的理解,接下来还会通过画图的方式去分析死信队列、延时队列、优先级队列等。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/400445
推荐阅读
相关标签
  

闽ICP备14008679号