当前位置:   article > 正文

Docker部署RabbitMQ与简单使用

Docker部署RabbitMQ与简单使用

官网地址: Messaging that just works — RabbitMQ

我的Docker博客:Docker-CSDN博客

1.结构

其中包含几个概念:

  • **publisher**:生产者,也就是发送消息的一方

  • **consumer**:消费者,也就是消费消息的一方

  • **queue**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • **exchange**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

  • **virtual host**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

2.Docker安装

基于Docker来安装RabbitMQ,使用下面的命令即可:

  1. docker run \
  2. -e RABBITMQ_DEFAULT_USER=user1 \
  3. -e RABBITMQ_DEFAULT_PASS=123 \
  4. -v mq-plugins:/plugins \
  5. --name mq \
  6. --hostname mq \
  7. -p 15672:15672 \
  8. -p 5672:5672 \
  9. --network n1 \
  10. -d \
  11. rabbitmq:3.8-management

root 是 你设置的用户名

123是你设置的密码

n1是你自定义的网络

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.150.101:15672http://192.168.48.129:15672/http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

注意网址中的192.168.48.129要改成你自己虚拟机的

3.后端

3.1引入依赖

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

3.2yml

  1. spring:
  2. rabbitmq:
  3. host: 192.168.48.129
  4. port: 5672
  5. virtual-host: /
  6. username: user1
  7. password: 123
  8. listener:
  9. simple:
  10. prefetch: 1

 3.3.1.@Bean的方式声明队列和交换机

新建一个config类
用fanout类型的交换机来做例子
下面的代码会生成一个交换机

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. public class DirectConfig {
  6. //声明交换机
  7. @Bean
  8. public FanoutExchange fanoutExchange(){
  9. return new FanoutExchange("a.fanout");
  10. }
  11. //创建1个队列
  12. @Bean
  13. public Queue fanoutQueue1(){
  14. return new Queue("fanout.queue1");
  15. }
  16. //绑定队列和交换机
  17. @Bean
  18. public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
  19. return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  20. }
  21. }

接收并处理

写在业务里

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.amqp.core.ExchangeTypes;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. @Slf4j
  9. @Component
  10. public class MqListener {
  11. //业务逻辑
  12. @RabbitListener(queues = "fanout.queue1")
  13. public void listenFanoutQueue1(String msg) throws InterruptedException {
  14. System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg +"】");
  15. }
  16. }

测试类

  1. @Test
  2. void test(){
  3. String queuename = "hmall.fanout1";
  4. String message = "hello, amqp!";
  5. rabbitTemplate.convertAndSend(queuename, null,message);
  6. }


3.3.2基于注解声明交换机和队列

不用再在配置类里去声明了,直接业务里用注解一同声明交换机和队列了
这里用direct类型的交换机来做例子
下面代码会创建一个交换机b.direct和两个队列direct.queue1,direct.queue2

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.amqp.core.ExchangeTypes;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. import java.util.Map;
  9. @Slf4j
  10. @Component
  11. public class MqListener {
  12. @RabbitListener(bindings = @QueueBinding(
  13. value = @Queue(name = "direct.queue1", durable = "true"),
  14. exchange = @Exchange(name = "b.direct", type = ExchangeTypes.DIRECT),
  15. key = {"red", "blue"}
  16. ))
  17. public void listenDirectQueue1(String msg) throws InterruptedException {
  18. System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
  19. }
  20. @RabbitListener(bindings = @QueueBinding(
  21. value = @Queue(name = "direct.queue2", durable = "true"),//队列名,是否持久化
  22. //交换机名,交换机类型
  23. exchange = @Exchange(name = "b.direct", type = ExchangeTypes.DIRECT),
  24. //direct类型交换机需要传的key
  25. key = {"red", "yellow"}
  26. ))
  27. public void listenDirectQueue2(String msg) throws InterruptedException {
  28. System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg +"】");
  29. }
  30. }

测试类

  1. @Test
  2. void testSendDirect() {
  3. String exchangeName = "b.direct";
  4. String msg = "蓝色通知";
  5. rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
  6. }
  7. @Test
  8. void testSendDirect2() {
  9. String exchangeName = "b.direct";
  10. String msg = "红色通知";
  11. rabbitTemplate.convertAndSend(exchangeName, "red", msg);
  12. } @Test
  13. void testSendDirect3() {
  14. String exchangeName = "b.direct";
  15. String msg = "黄色通知";
  16. rabbitTemplate.convertAndSend(exchangeName, "yellow", msg);
  17. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/551813
推荐阅读
相关标签
  

闽ICP备14008679号