当前位置:   article > 正文

RabbitMQ_rabbitmq:management

rabbitmq:management

消息队列

同步:A向B发送消息,A在接受到B的反馈之前都停止活动,接受到B的反馈后进行下一步操作

异步:A向B发送消息,A不用等B的反馈,在给B发送完成消息之后就可以进行下一步操作

同步调用的缺点:

        耦合度高:增加新功能都要重新修改代码                                                                                          性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和                                                                                                                                                        资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源                                                                                                                                级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障

异步调用的优点:解耦合(事件驱动模式)、性能提升(不用等待响应)、 没有级联、流量消峰(应对高并发)       

  一、RabbitMQ的部署                                                                                                  

        1.拉取镜像               docker pull rabbitmq:3-management                                                             3:是rabbitmq的版本        management:是指RabbitMQ 的 management 镜像,management           镜像提供了许多管理工具和功能,如消息队列管理、用户管理、权限管理等,这些功能在普           通的 Docker 镜像中可能没有或者不完整。

        2.执行容器镜像    

        docker run  -e RABBITMQ_DEFAULT_USER=user  -e                    RABBITMQ_DEFAULT_PASS=123456  --name mq  --hostname mq1  -p 15672:15672  -p         5672:5672  -d  rabbitmq:3-mannagement

        -e RABBITMQ_DEFAULT_USER=user:运行的用户名为user                                                          -e RABBITMQ_DEFAULT_PASS=123456:运行的密码为123456                                                    --name mq:运行的容器名称                                                                                                              --hostname mq1:容器的主机名                                                                                                        -p 15672:15672:控制台的端口                                                                                                        -p 5672:5672:消息通信的端口                                                                                                          -d:后台运行                                                                                                                                      rabbitmq:3-management:镜像

二、RabbitMQ的结构

        

publisher:生产者               exchange:交换机                queue:消息队列                consumer:消费者

virustualhost:虚拟主机(各个虚拟主机互不干扰)         channel:操作MQ的工具

三、RabbitMQ的常见消息模型(基于AMQP实现)

        基本消息队列(BasicQueue)

        工作消息队列(WorkQueue)

        发布订阅(Publish、Subscribe):Fanout Exchange:广播

                                                                 Direct Exchange:路由

                                                                 Topic Exchange:主题

        1.基本消息队列(BasicQueue)

        

   导入依赖和配置

  1. <!--依赖放在pom文件中-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. //yml配置文件,配置主机,端口,用户名,密码,虚拟主机
  7. spring:
  8. rabbitmq:
  9. host: 192.168.35.150 #自己运行rabbitmq的ip地址
  10. port: 5672
  11. virtual-host: /
  12. username: user #之前运行rabbitmq镜像的用户名
  13. password: 123456 #之前运行rabbitmq镜像的密码
  14. listener:
  15. simple:
  16. prefetch: 1 #有多个消费者时,消费者会默认预取消息,设置取消息的上限为1

 发送消息

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class PublisherTest {
  4. @Autowired
  5. private RabbitTemplate template;
  6. @Test
  7. public void sendSimpleQueue(){
  8. String message="hello RabbitAMQP"; //消息
  9. String queueName="simple.queue"; //队列
  10. template.convertAndSend(message,queueName); //发送消息
  11. }
  12. }

 接受消息

  1. @Component
  2. public class RabbitMQListener {
  3. @RabbitListener(queues = "simple.queue") //RabbitListener监听
  4. public void listenerMessage(String msg){ //用msg来接受监听到的消息
  5. System.out.println("接受到消息:"+msg);
  6. }
  7. }

 2.工作消息队列(WorkQueue)

多个消费者,提高消息处理速度,避免消息堆积

发送消息 

  1. @Test
  2. public void sendWorkQueue() throws InterruptedException {
  3. String message="hello RabbitAMQP";
  4. String queueName="simple.queue";
  5. for(int i=0;i<50;i++){
  6. template.convertAndSend(queueName,message+i);
  7. Thread.sleep(20); //休眠让消费者有时间处理消息
  8. }
  9. }

接受消息

  1. //消费者1
  2. @RabbitListener(queues = "simple.queue")
  3. public void listenerWork1(String msg) throws InterruptedException {
  4. System.out.println("接受到消息1:"+msg+" "+ LocalDateTime.now());
  5. Thread.sleep(20); //休眠时间代表消息的处理能力 1是2的10倍
  6. }
  7. //消费者2
  8. @RabbitListener(queues = "simple.queue")
  9. public void listenerWork2(String msg) throws InterruptedException {
  10. System.err.println("接受到消息2:"+msg+" "+LocalDateTime.now());
  11. Thread.sleep(200);
  12. }

   3.发布订阅(Publish、Subscribe)

        fanout(广播):默认自动发送到多个消费者

        

   Direct (路由):根据key,如图中的(orange,black,green)路由到对应的消费者

Topic (主题):根据key (key必须是多个单词,并以.号区分),可以用通配符来代替

 发送消息

  1. //发送消息
  2. @Test
  3. public void sendFanoutExchange(){ //fanout
  4. String exchangeName="fanout";
  5. String message="hello fanoutExchange";
  6. template.convertAndSend(exchangeName,"",message);
  7. }
  8. @Test
  9. public void sendDirectExchange(){ //direct
  10. String exchangeName="direct";
  11. String message="hello blue";
  12. template.convertAndSend(exchangeName,"blue",message);
  13. }
  14. @Test
  15. public void sendTopicExchange(){ //topic
  16. String exchangeName="topic";
  17. String message="hello topic";
  18. template.convertAndSend(exchangeName,"china.news",message);
  19. }

接受消息

  1. //接受消息1
  2. @RabbitListener(bindings=@QueueBinding(
  3. value = @Queue(name = "direct.queue1"),
  4. exchange = @Exchange(name = "direct",type = "direct"),
  5. key = {"red","blue"} //这个为direct,fanout没有key
  6. //key={"china.#"} //这个是topic
  7. ))
  8. public void listenerDirectExchange1(String message){
  9. System.err.println("接受到消息direct1:"+message+" "+LocalDateTime.now());
  10. //Thread.sleep(20) //这个fanout休眠对于每个消息的处理
  11. }
  12. //接受消息2
  13. @RabbitListener(bindings=@QueueBinding(
  14. value = @Queue(name = "direct.queue2"),
  15. exchange = @Exchange(name = "direct",type = "direct"),
  16. key = {"yellow"} //这个为direct,fanout没有key
  17. //key={"#.news"} //这个是topic
  18. ))
  19. public void listenerDirectExchange2(String message){
  20. System.err.println("接受到消息direct2:"+message+" "+LocalDateTime.now());
  21. //Thread.sleep(20) //这个fanout休眠对于每个消息的处理
  22. }

四、消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送

发送消息

  1. @Test
  2. public void testSendMap() throws InterruptedException {
  3. // 准备消息
  4. Map<String,Object> msg = new HashMap<>();
  5. msg.put("name", "张三");
  6. msg.put("age", 30);
  7. // 发送消息
  8. rabbitTemplate.convertAndSend("simple.queue", msg);
  9. }

接受消息

  1. @RabbitListener(queues = "simple.queue")
  2. public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
  3.     System.out.println("spring 消费者接收到消息:" + msg);
  4. }

因为默认发送的数据是jdk序列化之后的,难以阅读。所以要进行反序列化。

导入依赖

  1. <!--jackson依赖-->
  2. <dependency>
  3. <groupId>com.fasterxml.jackson.dataformat</groupId>
  4. <artifactId>jackson-dataformat-xml</artifactId>
  5. <version>2.13.3</version>
  6. </dependency>

在config配置类中配置bean

  1. @Bean
  2. public MessageConverter jsonMessageConverter(){
  3. return new Jackson2JsonMessageConverter();
  4. }

重启服务
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/512058
推荐阅读
相关标签
  

闽ICP备14008679号