当前位置:   article > 正文

RabbitMQ相关总结

RabbitMQ相关总结

Broker

异步调用中用Broker进行事件订阅和调用,完成解耦

没有强依赖,不用担心级联失败

流量削峰

MQ 的下载

1.可以使用命令拉取镜像

docker pull rabbitmq:3-management

2.也可以直接去官网下载tar包,然后上传到虚拟机上面

spring AMQP        消息队列

Basic Queue 简单队列模型   

只需要简单的引入amqp依赖,

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

然后配置接收和发送端的地址

  1. spring:
  2. rabbitmq:
  3. host: 192.168.xxx.xxx # 自己主机名
  4. port: 5672 # 端口
  5. virtual-host: / # 虚拟主机
  6. username: xxxxxx # 用户名
  7. password: xxxxxx # 密码

然后调用方法发送或结合搜消息即可

发送:

  1. package cn.itcast.mq.spring;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import org.springframework.test.context.junit4.SpringRunner;
  8. @RunWith(SpringRunner.class)
  9. @SpringBootTest
  10. public class SpringAmqpTest {
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. @Test
  14. public void testSimpleQueue() {
  15. // 队列名称
  16. String queueName = "simple.queue";
  17. // 消息
  18. String message = "hello, spring amqp!";
  19. // 发送消息
  20. rabbitTemplate.convertAndSend(queueName, message);
  21. }
  22. }

接收:

  1. package cn.itcast.mq.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class SpringRabbitListener {
  6. @RabbitListener(queues = "simple.queue")
  7. public void listenSimpleQueueMessage(String msg) throws InterruptedException {
  8. System.out.println("spring 消费者接收到消息:【" + msg + "】");
  9. }
  10. }

当然了,如果不是在父工程里面配置的依赖则需要在单个项目里面单独配置

Work Queue  一队列,多消费者

假设编辑五百条消息:

  1. /**
  2. * workQueue
  3. * 向队列中不停发送消息,模拟消息堆积。
  4. */
  5. @Test
  6. public void testWorkQueue() throws InterruptedException {
  7. // 队列名称
  8. String queueName = "simple.queue";
  9. // 消息
  10. String message = "hello, message_";
  11. for (int i = 0; i < 500; i++) {
  12. // 发送消息
  13. rabbitTemplate.convertAndSend(queueName, message + i);
  14. Thread.sleep(20);
  15. }
  16. }

定义两个接受者用不同效率接收:

  1. @RabbitListener(queues = "simple.queue")
  2. public void listenWorkQueue1(String msg) throws InterruptedException {
  3. System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
  4. Thread.sleep(20);
  5. }
  6. @RabbitListener(queues = "simple.queue")
  7. public void listenWorkQueue2(String msg) throws InterruptedException {
  8. System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
  9. Thread.sleep(200);
  10. }

然后可以看出接受者接收消息并未在预定时间被消费,原因是被队列平均分配了,只要重新定制规则即可:

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

交换机

Fanout

创建FanoutExchange交换机和Queue队列,然后交换机和队列绑定:

  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class FanoutConfig {
  10. /**
  11. * 声明交换机
  12. * @return Fanout类型交换机
  13. */
  14. @Bean
  15. public FanoutExchange fanoutExchange(){
  16. return new FanoutExchange("itcast.fanout");
  17. }
  18. /**
  19. * 第1个队列
  20. */
  21. @Bean
  22. public Queue fanoutQueue1(){
  23. return new Queue("fanout.queue1");
  24. }
  25. /**
  26. * 绑定队列和交换机
  27. */
  28. @Bean
  29. public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
  30. return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  31. }
  32. /**
  33. * 第2个队列
  34. */
  35. @Bean
  36. public Queue fanoutQueue2(){
  37. return new Queue("fanout.queue2");
  38. }
  39. /**
  40. * 绑定队列和交换机
  41. */
  42. @Bean
  43. public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
  44. return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
  45. }
  46. }

绑定完成后就可以写消费者和生产者代码了

生产者

  1. @Test
  2. public void testFanoutExchange() {
  3. // 队列名称
  4. String exchangeName = "itcast.fanout";
  5. // 消息
  6. String message = "hello, everyone!";
  7. rabbitTemplate.convertAndSend(exchangeName, "", message);
  8. }

消费者

  1. @RabbitListener(queues = "fanout.queue1")
  2. public void listenFanoutQueue1(String msg) {
  3. System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
  4. }
  5. @RabbitListener(queues = "fanout.queue2")
  6. public void listenFanoutQueue2(String msg) {
  7. System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
  8. }

交换机的作用是什么?

* 接收publisher发送的消息
* 将消息按照规则路由到与之绑定的队列
* 不能缓存消息,路由失败,消息丢失
* FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

* Queue
* FanoutExchange
* Binding

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/347042
推荐阅读
  

闽ICP备14008679号