当前位置:   article > 正文

RabbitMQ详解,入门到基本使用

RabbitMQ详解,入门到基本使用
在讲述MQ之前我们先了解一下一些简单概念。
同步调用:比如打电话。优点:时效性很强。

支付服务要调用别的服务,调用了订单服务,在调用仓储服务,在以此调用别的,时间长。

服务流程如下:

缺点

1.耦合度高。

2.性能下降。

3.资源浪费。

4.级联失败


异步调用:就比如微信发消息,可以和多个人发消息。

服务流程如下:

优点:1.服务解耦(比如支付之后,不发短信提醒了,不用修改代码,直接取消短信服务的订阅即可)

2.性能提高,吞吐量提高。

3.故障隔离,不担心级联失败。

4.流量削峰。

缺点:

1.对Broker的依赖性太强了。

2.架构复杂,业务没有明显的流程,不好管理


MQ:消息队列(MessageQueue),就是事件驱动架构中的Broker。主要用于分布式系统中进行异步任务调度和数据交换。

MQ产品主要有一下四种:

1.RabbitMQ

2.ActiveMQ

3.RocketMQ

4.Kafka

主要的区别如下:


RabbitMQ在Linux下的安装:

1.拉去容器

docker pull rabbitmq:management

2.创建镜像,并配置用户名和密码。(管理界面端口防火墙记得开发)

  1. docker run -d --name my-rabbitmq \
  2. 2 -p 5672:5672 \ # AMQP协议端口
  3. 3 -p 15672:15672 \ # 管理界面端口
  4. 4 -e RABBITMQ_DEFAULT_USER=admin \
  5. 5 -e RABBITMQ_DEFAULT_PASS=admin \
  6. 6 rabbitmq:management

3.进入管理页面(页面如下)

输入网址:[IP地址:管理界面端口]

成功进入之后,我们看一下上面这6列:

1.Overview:总览。主要展示的是MQ的概要信息 。

2.Connections:连接。无论消费者还是生产者都要建立连接。

3.Channels:通道。 消费者,生产者都要创建通道来执行事务。

4.Exchanges:交换机。消息的路由器,把消息路由到队列。

5.Queues:队列。做消息的存储。

6.Admin:管理。创建用户,创建虚拟主机。

MQ的大致架构如下:


AMQP:

(Advanced Message Queuing Protocol)是高级消息队列协议的缩写,它是一个开放标准的应用层协议,主要设计用于分布式系统之间高效、可靠地传输消息。

AMQP不是某个具体的软件产品或服务,而是一种通用的标准接口,任何遵循AMQP协议的软件系统都可以实现相互之间的互联互通,无论它们是由何种编程语言编写,运行在什么操作系统之上。

SpingAMQP:

Spring AMQP(Spring Advanced Messaging Queuing Platform)是Spring框架的一部分,它是为简化在Java应用程序中使用高级消息队列协议(AMQP)而设计的一个抽象和便捷工具集。

简而言之,Spring AMQP的目标是让开发者更易于在Spring应用程序中使用消息队列服务,降低消息驱动架构的复杂性,提高开发效率。

SpringAMQP的基本使用:
1.基于简单队列的使用

生成者模块:

1.导入依赖:

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2.配置application.yml:(服务器防火墙端口记得开发嗷)

  1. spring:
  2. rabbitmq:
  3. host: 43.138.67.251 # rabbitMQ的ip地址
  4. port: 5672 # 端口
  5. username: root
  6. password: 123456
  7. virtual-host: /

3.用RabbitTemplate发送消息:

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. @Test
  4. public void testSendMessage2SimpleQueue(){
  5. String routingKey = "simple.queues";
  6. String message = "hello, spring amqp!";
  7. rabbitTemplate.convertAndSend(routingKey, message);
  8. }

消费者模块:

也是导入依赖,配置yml。配置一个Bean监听队列,spring项目启动时,加载Bean对队列进行监听,消息阅后即焚。

  1. @Component
  2. public class SpringRabbitListener {
  3. @RabbitListener(queues = "simple")
  4. public void listenSimplequeue(String msg) {
  5. System.out.println("接收到消息:"+msg);
  6. }
  7. }

2.WorkQueue:工作消息队列

和上面的简单队列使用相比,添加多个消费者。

发布订阅模式:

了解一下交换机Exchange:

交换机的作用是把消息按照一定的规则路由给队列,交换机不能缓存消息,路由失败,消息丢失。

下面我们介绍几种常用的交换机:

1.Fanout Exchange:会将接受到的消息路由到每一个绑定的队列queue。

Fanout Exchange的使用:

先把交换机和队列绑定起来

  1. @Configuration
  2. public class FanoutConfig {
  3. //交换机 fanout
  4. @Bean
  5. public FanoutExchange fanoutExchange() {
  6. return new FanoutExchange("fanout.exchange");
  7. }
  8. //队列1 fanout1.queue
  9. @Bean
  10. public Queue fanoutqueue1(){
  11. return new Queue("fanout1.queue");
  12. }
  13. //队列2 fanout2.queue
  14. @Bean
  15. public Queue fanoutqueue2(){
  16. return new Queue("fanout2.queue");
  17. }
  18. //队列1绑定到交换机
  19. @Bean
  20. public Binding bind1(Queue fanoutqueue1, FanoutExchange fanoutExchange) {
  21. return BindingBuilder.bind(fanoutqueue1).to(fanoutExchange);
  22. }
  23. //队列2绑定到交换机
  24. @Bean
  25. public Binding bind2(Queue fanoutqueue2, FanoutExchange fanoutExchange) {
  26. return BindingBuilder.bind(fanoutqueue2).to(fanoutExchange);
  27. }
  28. }

消费者端监听消息:

  1. //Fanout1
  2. @RabbitListener(queues = "fanout1.queue")
  3. public void FanoutexchengeQueue1(String msg){
  4. System.err.println("fanout1.queue接收到消息:["+msg+"]");
  5. }
  6. //Fanout2
  7. @RabbitListener(queues = "fanout2.queue")
  8. public void FanoutexchengeQueue2(String msg){
  9. System.err.println("fanout1.queue接收到消息:["+msg+"]");
  10. }

生产者:

  1. //Fanout交换机
  2. @Test
  3. public void senFanoutExchange() {
  4. String exchangename = "fanout.exchange";
  5. String msg = "Hello FanoutExchange";
  6. rabbitTemplate.convertAndSend(exchangename,"",msg);
  7. }

2.Direct Exchange:将接收到的消息,按照一定规则路由到指定的Queue。

Direct Exchange基本使用:

@RabbitListener里面指定参数会生成并绑定交换机和queue。每一个队列和交换机绑定时有一个BindingKye,生成者发送消息时带一个RoutingKey,交换机只把消息发到BindingKey和RoutingKey一致的queue。

  1. //Direct1
  2. @RabbitListener(bindings = @QueueBinding(
  3. value = @Queue(name = "direct1.queue"),
  4. exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
  5. key = {"red","blue"}
  6. ))
  7. public void listenerDirectExchange1(String msg) {
  8. System.err.println("direct1.queue接收到消息:["+msg+"]");
  9. }
  10. //Direct2
  11. @RabbitListener(bindings = @QueueBinding(
  12. value = @Queue(name = "direct2.queue"),
  13. exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
  14. key = {"red","yellow"}
  15. ))
  16. public void listenerDirectExchange2(String msg) {
  17. System.err.println("direct2.queue接收到消息:["+msg+"]");
  18. }

生成者:

  1. //Direct交换机
  2. @Test
  3. public void senDirectExchange() {
  4. String exchangename = "direct.exchange";
  5. String msg = "Hello DirectExchange";
  6. rabbitTemplate.convertAndSend(exchangename,"blue",msg);
  7. }

3.Topic Exchange:类似Direct Exchange,区别是生产者发布消息时带的RoutingKey必须是多个单词的列表,并以 分割

Topic Exchange的使用:

消费者:

  1. //Topic1
  2. @RabbitListener(bindings = @QueueBinding(
  3. value = @Queue(name = "topic1.queue"),
  4. exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
  5. key = {"china.#"}
  6. ))
  7. public void listenTopicExchange1(String msg) {
  8. System.err.println("topic1.queue接收到消息:["+msg+"]");
  9. }
  10. //Topic2
  11. @RabbitListener(bindings = @QueueBinding(
  12. value = @Queue(name = "topic2.queue"),
  13. exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
  14. key = {"#.news"}
  15. ))
  16. public void listenTopicExchange2(String msg) {
  17. System.err.println("topic2.queue接收到消息:["+msg+"]");
  18. }

生产者:

  1. //Topic交换机
  2. @Test
  3. public void senTopicExchange() {
  4. String exchangename = "topic.exchange";
  5. String msg = "Hello TopicExchange";
  6. rabbitTemplate.convertAndSend(exchangename,"aa.news",msg);
  7. }

SpringAMQP的消息转化器:

SpringAMQP发消息到RabbitMQ,会将消息序列化,用的是JDK序列化,这种方式性能比较低下。

并且为了将Java对象转换为AMQP消息,以及将AMQP消息的内容反序列化回Java对象,通常我们会使用

所以我们要设置一个消息转换器,换一种序列化方式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/337174
推荐阅读
相关标签
  

闽ICP备14008679号