当前位置:   article > 正文

RabbitMQ笔记(基础篇)

RabbitMQ笔记(基础篇)

视频: 

MQ基础-01.RabbitMQ课程介绍_哔哩哔哩_bilibiliicon-default.png?t=N7T8https://www.bilibili.com/video/BV1mN4y1Z7t9?p=1&vd_source=d0ea58f1127eed138a4ba5421c577eb1

一、RabbitMQ简介

1.同步调用

优势:时效性强,等待结果后才返回

劣势:拓展性差,性能下降,级联失败问题

2.异步调用

异步调用就是基于消息通知的方式,一般含有三个角色

(1)消息发送者:投递消息的人,原来的调用方

(2)消息代理:管理、暂存、转发消息,可以理解微信服务器

(3)消息接受者:接收和处理消息的人,原来服务提供方

Broker是消息代理

二.RabbitMQ的安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:RabbitMQ: One broker to queue them all | RabbitMQicon-default.png?t=N7T8https://www.rabbitmq.com/

三.RabbitMQ入门

(1)登录RabbitMQ后添加队列

(2)交换机先绑定队列名字

(3)交换机发送消息给队列

 

队列可以查看接收到的消息

 四、数据隔离

将Virtual host切换为/

 

 (1)新建一个用户

(2)为用户创建virtual host

 

(3)测试不同virtual host直接数据隔离现象,通过修改virtual host即可

五、Java客户端

(1)入门示例

1.引入依赖

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2.配置RabbitMQ服务端信息(消费者和生产者都需要配置)

  1. spring:
  2. rabbitmq:
  3. virtual-host: /hamll
  4. port: 5672
  5. host: 192.168.92.136
  6. username: hmall
  7. password: 123

3.消息发送方

  1. package cn.itcast.mq.helloworld;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. @SpringBootTest
  7. public class SpringAmqpTest {
  8. @Autowired
  9. private RabbitTemplate rabbitTemplate;
  10. @Test
  11. void testSendMessageQueue(){
  12. String queueName = "simple.queue";
  13. String msg = "Hello,amqp";
  14. rabbitTemplate.convertAndSend(queueName,msg);
  15. }
  16. }

4.消息接收方(不断接收消息)

  1. package cn.itcast.mq.listeners;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Slf4j
  6. @Component
  7. public class MqListener {
  8. @RabbitListener(queues = "simple.queue")
  9. public void listenSimpleQueue(String msg){
  10. System.out.println("消费者收到消息:"+msg);
  11. }
  12. }

(2)消费者消息推送限制

 (3)Fanout交换机

1.创建hmall.fanout交换机,绑定fanout.queue1和fanout.queue2

2.消息发送方

  1. @Test
  2. void testSendFanout(){
  3. String exchangeName = "hmall.fanout";
  4. String msg = "Hello,everyone!";
  5. rabbitTemplate.convertAndSend(exchangeName,null,msg);
  6. }

3.消息接收方

  1. @RabbitListener(queues = "fanout.queue1")
  2. public void listenFanoutQueue(String msg) throws InterruptedException {
  3. System.out.println("消费者1收到消息:"+msg);
  4. }
  5. @RabbitListener(queues = "fanout.queue2")
  6. public void listenFanout2Queue(String msg) throws InterruptedException {
  7. System.err.println("消费者2收到消息:....."+msg);
  8. }

(4)Direct交换机

注意:Direct交换机绑定队列时配置Routing Key

如下图所示:

绑定queue1要配置blue和red的Routing Key,而绑定queue2要配置yellow和red的Routing Key

1.创建hmall.direct交换机,绑定direct.queue1和direct.queue2

2.消息发送方

  1. @Test
  2. void testSendDirect(){
  3. String exchangeName = "hmall.direct";
  4. String msg = "Hello,every Direct!";
  5. rabbitTemplate.convertAndSend(exchangeName,"blue",msg);
  6. }

3.消息接收方

  1. @RabbitListener(queues = "direct.queue1")
  2. public void listenDirectQueue(String msg) throws InterruptedException {
  3. System.out.println("消费者1收到消息:"+msg);
  4. }
  5. @RabbitListener(queues = "direct.queue2")
  6. public void listenDirect2Queue(String msg) throws InterruptedException {
  7. System.err.println("消费者2收到消息:....."+msg);
  8. }

 (5)Topic交换机

 

1.Topic交换机绑定队列

注意:Topic交换机绑定队列时配置Routing Key

2. 消息发送者

  1. @Test
  2. void testSendTopic(){
  3. String exchangeName = "hmall.topic";
  4. String msg = "Hello,every Topic!";
  5. rabbitTemplate.convertAndSend(exchangeName,"china.hello",msg);
  6. }

3.消息接收方

  1. @RabbitListener(queues = "topic.queue1")
  2. public void listenTopicQueue(String msg){
  3. System.out.println("消费者1收到消息:"+msg);
  4. }
  5. @RabbitListener(queues = "topic.queue2")
  6. public void listenTopicQueue2(String msg){
  7. System.err.println("消费者2收到消息:....."+msg);
  8. }

(6)声明队列和交换机方式一

两种创建交换机、队列、和绑定队列的方式

  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class FanoutConfig {
  7. @Bean
  8. public FanoutExchange fanoutExchange(){
  9. // ExchangeBuilder.fanoutExchange("hmall.fanout").build();
  10. return new FanoutExchange("hmall.fanout1");
  11. }
  12. @Bean
  13. public Queue fanoutQueue3(){
  14. // QueueBuilder.durable("fanout.queue1").build();
  15. return new Queue("fanout.queue3");
  16. }
  17. @Bean
  18. public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
  19. return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
  20. }
  21. @Bean
  22. public Queue fanoutQueue4(){
  23. return new Queue("fanout.queue4");
  24. }
  25. @Bean
  26. public Binding fanoutBinding4(){
  27. return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
  28. }
  29. }

(7)声明队列和交换机方式二

 示例代码:

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "direct.queue1",durable = "true"),
  3. exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
  4. key = {"red","blue"}
  5. ))
  6. public void listenDirectQueue(String msg) throws InterruptedException {
  7. System.out.println("消费者1收到消息:"+msg);
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10. value = @Queue(value = "direct.queue2",durable = "true"),
  11. exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
  12. key = {"red","yellow"}
  13. ))
  14. public void listenDirect2Queue(String msg) throws InterruptedException {
  15. System.err.println("消费者2收到消息:....."+msg);
  16. }

(8)消息转换器

 1.添加一个队列,名为object.queue

2.编写单元测试,向队列中发送一条消息,消息的类型为Map

  1. @Test
  2. void testSendObject(){
  3. Map<String, Object> msg = new HashMap<>(2);
  4. msg.put("name","Jack");
  5. msg.put("age",21);
  6. rabbitTemplate.convertAndSend("object.queue",msg);
  7. }

3.打开控制台,发现发送来的消息是一串乱码,解决方式如下:

3.1引入依赖:
  1. <!-- Jackson-->
  2. <dependency>
  3. <groupId>com.fasterxml.jackson.dataformat</groupId>
  4. <artifactId>jackson-dataformat-xml</artifactId>
  5. </dependency>
 3.2配置MessageConverter
  1. @Bean
  2. public MessageConverter messageConverter(){
  3. return new Jackson2JsonMessageConverter();
  4. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/490518
推荐阅读
相关标签
  

闽ICP备14008679号