当前位置:   article > 正文

RabbitMQ 安装和使用_安装mq

安装mq

一.简介

1.MQ概述

MQ全称 Message Queue([kjuː])(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。(队列是一种容器,用于存放数据的都是容器,存放消息的就是消息队列)

2.优势

应用解耦:提高系统容错性和可维护性。

异步提速:提升用户体验和系统吞吐量。

削峰填谷:提高系统稳定性。

应用解耦

系统的耦合性越高,容错性就越低,可维护性就越低。

异步提速

一个下单操作耗时:20 + 300 + 300 + 300 = 920ms

用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。

提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

以前920ms处理一个请求,现在25ms处理一个请求,系统的吞吐量(单位时间内访问量)增加

削峰填谷(削峰)

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

使用MQ后,可以提高系统稳定性。

3.劣势

系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

系统复杂度提高

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

一致性问题

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

4.使用MQ满足的条件

消费者-->生产者

1.生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。

订单->库存

2.容许短暂的不一致性。

3.确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

5. 常见的MQ产品

目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。

二.RabbitMQ基本介绍

AMQP,即 Advanced Message Queuing Protocol(英[ˈprəʊtəkɒl])(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

1. RabbitMQ 基础架构

Broker 中间者 服务

procedure 和consumer都是客户端

客户端通过链接和服务端进行通信 所以需要建立起来连接 然后进行通信a

使用channel(管道)节省资源

一个rabbitmq里面有很多的虚拟机 相当于mysql里面有很多数据库,数据库里面有很多表,都是独立的。

每个虚拟机里面有很多的exchange和queue 独立分区的作用

2. RabbitMQ的6 种工作模式

RabbitMQ 提供了 6 种工作模式:

简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。==

官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

3. 再谈市场上常见的消息队列

ActiveMQ:基于JMS

ZeroMQ:基于C语言开发

RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

RocketMQ:基于JMS,阿里巴巴产品

Kafka:类似MQ的产品;分布式消息系统,高吞吐量。

三.RabbitMQ的安装和配置

RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQicon-default.png?t=N7T8https://www.rabbitmq.com/

Linux下载并安装rabbitmq-server-3.6.5-1.noarch.rpm_linux rabbitmq 3.6下载-CSDN博客文章浏览阅读2.6k次,点赞3次,收藏10次。目录1.安装rabbitmq所需要的依赖包2.下载安装包3.安装服务命令4.修改配置5.启动rabbitmq6.rabbitmq控制台安装7.访问你的虚拟机 ip:15627 会出现下面的页面 用户名和密码都是 guest8.常用命令1.安装rabbitmq所需要的依赖包yum install build-essential openssl opens..._linux rabbitmq 3.6下载https://blog.csdn.net/Kermit_father/article/details/105181740

1. 安装依赖环境

在线安装依赖环境:

yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

2. 安装Erlang

​# 安装

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

3. 安装RabbitMQ

#安装依赖的包

rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm

#安装rabbitmq

rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm socat-1.7.3.2-2.el7.x86_64.rpm rabbitmq-server-3.7.18-1.el7.noarch.rpm

1.启动RabbitMQ

systemctl start rabbitmq-server # 启动服务

systemctl stop rabbitmq-server # 停止服务

systemctl restart rabbitmq-server # 重启服务

systemctl status rabbitmq-server #查看状态

2.开启管理界面及配置

# 开启管理界面

rabbitmq-plugins enable rabbitmq_management

# 修改默认配置信息

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app

# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest

修改之后重启一下rabbitmq

3.打开客户端

192.168.158.129:15672

虚拟机ip+端口号

使用guest/guest登录之后出现如下即为安装成功

创建虚拟主机

添加用户

掉换虚拟主机

添加完之后就可以用我们创建的登陆了

添加队列

添加消息(生产者)

还可以重复添加

查看消息

四.RabbitMQ入门

1.搭建SpringBoot项目

2.添加依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.6.0</version>
  5. </dependency>

3.编写消费者

  1. package com.aaa.test;
  2. import com.rabbitmq.client.*;
  3. import org.junit.jupiter.api.Test;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class MyTest {
  7. @Test
  8. public void aaa() throws IOException, TimeoutException {
  9. // 根据mq 建立连接
  10. ConnectionFactory connectionFactory = new ConnectionFactory();
  11. connectionFactory.setUsername("pxy");
  12. connectionFactory.setPassword("pxy");
  13. connectionFactory.setHost("192.168.158.129");
  14. connectionFactory.setPort(5672);
  15. connectionFactory.setVirtualHost("/pxy");
  16. //建立连接
  17. Connection connection = connectionFactory.newConnection();
  18. //管道
  19. Channel channel = connection.createChannel();
  20. //消费消息
  21. Consumer consumer = new DefaultConsumer(channel){
  22. @Override
  23. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  24. // 获取队列里面的消息
  25. // 字符串
  26. String s = new String(body);
  27. System.out.println("mq中的消息"+s);
  28. }
  29. };
  30. channel.basicConsume("test",true,consumer);
  31. }
  32. }

客户端中消息被消费掉了

4.编写生产者

  1. package com.aaa.test;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import org.junit.jupiter.api.Test;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class MyProcedureTest {
  9. //生产者
  10. @Test
  11. public void procedure() throws IOException, TimeoutException {
  12. // 根据mq 建立连接
  13. ConnectionFactory connectionFactory = new ConnectionFactory();
  14. connectionFactory.setUsername("pxy");
  15. connectionFactory.setPassword("pxy");
  16. connectionFactory.setHost("192.168.158.129");
  17. connectionFactory.setPort(5672);
  18. connectionFactory.setVirtualHost("/pxy");
  19. //建立连接
  20. Connection connection = connectionFactory.newConnection();
  21. //管道
  22. Channel channel = connection.createChannel();
  23. // 创建队列
  24. //队列的名称 持久化 是否独占 是否自动删除 参数
  25. channel.queueDeclare("test1",false,false,false,null);
  26. //发布消息 (交换机 队列的名字 基础的属性 发布的消息)
  27. channel.basicPublish("","test1",null,"四道口附近".getBytes());
  28. }
  29. }

五.工作模式

官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

1. Work queues工作队列模式

一个生成者两个消费者

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

比如:就是一个生成者两个消费者,生产者生产10条消息两个消费者同时启动拿取这十条消息谁抢到就是谁的但是我得到的结果是你哪一条我拿一条各拿了5条

2. 订阅模式类型

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分
  • Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机(生产者): 

  1. package com.aaa.test;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import org.junit.jupiter.api.Test;
  7. import java.io.IOException;
  8. import java.util.concurrent.TimeoutException;
  9. public class MyProcedureTest {
  10. //生产者
  11. @Test
  12. public void procedure() throws IOException, TimeoutException {
  13. // 根据mq 建立连接
  14. ConnectionFactory connectionFactory = new ConnectionFactory();
  15. connectionFactory.setUsername("pxy");
  16. connectionFactory.setPassword("pxy");
  17. connectionFactory.setHost("192.168.158.129");
  18. connectionFactory.setPort(5672);
  19. connectionFactory.setVirtualHost("/pxy");
  20. //建立连接
  21. Connection connection = connectionFactory.newConnection();
  22. //管道
  23. Channel channel = connection.createChannel();
  24. /**
  25. * 发布订阅模式
  26. * 交换机 fanout
  27. */
  28. //创建交换机
  29. channel.exchangeDeclare("myex1", BuiltinExchangeType.FANOUT,false);
  30. // 创建队列
  31. //队列的名称 持久化 是否独占 是否自动删除 参数
  32. channel.queueDeclare("testmyex1",false,false,false,null);
  33. channel.queueDeclare("testmyex2",false,false,false,null);
  34. //交换机绑定队列
  35. channel.queueBind("testmyex1","myex1","");
  36. channel.queueBind("testmyex2","myex1","");
  37. channel.basicPublish("myex1","",null,"test===".getBytes());
  38. // for (int i = 0; i < 10; i++) {
  39. // //发布消息 交换机 队列的名字 基础的属性 发布的消息
  40. // channel.basicPublish("","test1",null,("四道口附近"+i).getBytes());
  41. // }
  42. }
  43. }

就是生产者的消息给到交换机然后交换机里的信息分别给到消费者

3.Routing路由模式

路由模式特点:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在客户端

创建一个交换机

交换机类型 direct

创建两个队列绑定交换机

在交换机添加消息路径一致的才可以添加上

代码:

  1. package com.aaa.test;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import org.junit.jupiter.api.Test;
  7. import java.io.IOException;
  8. import java.util.concurrent.TimeoutException;
  9. //路由模式
  10. public class MyProcedureExDirect {
  11. //生产者
  12. @Test
  13. public void procedure() throws IOException, TimeoutException {
  14. // 根据mq 建立连接
  15. ConnectionFactory connectionFactory = new ConnectionFactory();
  16. connectionFactory.setUsername("pxy");
  17. connectionFactory.setPassword("pxy");
  18. connectionFactory.setHost("192.168.158.129");
  19. connectionFactory.setPort(5672);
  20. connectionFactory.setVirtualHost("/pxy");
  21. //建立连接
  22. Connection connection = connectionFactory.newConnection();
  23. //管道
  24. Channel channel = connection.createChannel();
  25. /**
  26. * 发布订阅模式
  27. * 交换机 fanout
  28. */
  29. //创建交换机
  30. channel.exchangeDeclare("exchange_direct_test", BuiltinExchangeType.DIRECT,false);
  31. // 创建队列
  32. //队列的名称 持久化 是否独占 是否自动删除 参数
  33. channel.queueDeclare("exchange_direct_queue_1",false,false,false,null);
  34. channel.queueDeclare("exchange_direct_queue_2",false,false,false,null);
  35. //交换机绑定队列
  36. channel.queueBind("exchange_direct_queue_1","exchange_direct_test","error");
  37. channel.queueBind("exchange_direct_queue_2","exchange_direct_test","test");
  38. channel.queueBind("exchange_direct_queue_2","exchange_direct_test","test2");
  39. channel.basicPublish("exchange_direct_test","error",null,"测试一个路由模式===".getBytes());
  40. // for (int i = 0; i < 10; i++) {
  41. // //发布消息 交换机 队列的名字 基础的属性 发布的消息
  42. // channel.basicPublish("","test1",null,("四道口附近"+i).getBytes());
  43. // }
  44. }
  45. }

消费者不变改一下队列名

4.topics 通配符模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以” . ”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert 

图解:

  • 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
  • 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

交换机类型:topic

代码:

  1. package com.aaa.test;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import org.junit.jupiter.api.Test;
  7. import java.io.IOException;
  8. import java.util.concurrent.TimeoutException;
  9. //路由模式
  10. public class MyProcedureExTopic {
  11. //生产者
  12. @Test
  13. public void procedure() throws IOException, TimeoutException {
  14. // 根据mq 建立连接
  15. ConnectionFactory connectionFactory = new ConnectionFactory();
  16. connectionFactory.setUsername("pxy");
  17. connectionFactory.setPassword("pxy");
  18. connectionFactory.setHost("192.168.158.129");
  19. connectionFactory.setPort(5672);
  20. connectionFactory.setVirtualHost("/pxy");
  21. //建立连接
  22. Connection connection = connectionFactory.newConnection();
  23. //管道
  24. Channel channel = connection.createChannel();
  25. /**
  26. * 发布订阅模式
  27. * 交换机 fanout
  28. */
  29. //创建交换机
  30. channel.exchangeDeclare("exchange_topic_test", BuiltinExchangeType.TOPIC,false);
  31. // 创建队列
  32. //队列的名称 持久化 是否独占 是否自动删除 参数
  33. channel.queueDeclare("exchange_topic_queue_1",false,false,false,null);
  34. channel.queueDeclare("exchange_topic_queue_2",false,false,false,null);
  35. //交换机绑定队列
  36. channel.queueBind("exchange_topic_queue_1","exchange_topic_test","test.#");
  37. channel.queueBind("exchange_topic_queue_2","exchange_topic_test","*.aaa");
  38. channel.queueBind("exchange_topic_queue_2","exchange_topic_test","test.*");
  39. channel.basicPublish("exchange_topic_test","test.aaa",null,"测试一个路由模式===".getBytes());
  40. // for (int i = 0; i < 10; i++) {
  41. // //发布消息 交换机 队列的名字 基础的属性 发布的消息
  42. // channel.basicPublish("","test1",null,("四道口附近"+i).getBytes());
  43. // }
  44. }
  45. }

消费者一样改写列名

6. 模式总结

RabbitMQ工作模式:

1、简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

`2、工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

3、发布订阅模式 Publish/subscribe

需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

4、路由模式 Routing

需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

5、通配符模式 Topic

需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

六.SpringBoot整合RabbitMQ

1.依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-test</artifactId>
  8. </dependency>

2.生产者

  1. package com.aaa.config;
  2. import com.rabbitmq.client.AMQP;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class TopicMqConfig {
  10. @Value("${mq.exchange.name}")
  11. public String EXCHANGENAME;
  12. @Value("${mq.queue.name1}")
  13. public String QUEUENAME1;
  14. @Value("${mq.queue.name2}")
  15. public String QUEUENAME2;
  16. //1.声明 交换机
  17. @Bean("ex1")
  18. public Exchange getExchange() {
  19. Exchange build = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
  20. return build;
  21. }
  22. //2.队列
  23. @Bean("queue1")
  24. public Queue getQueue1() {
  25. Queue build = QueueBuilder.nonDurable(QUEUENAME1).build();
  26. return build;
  27. }
  28. @Bean("queue2")
  29. public Queue getQueue2() {
  30. Queue build = QueueBuilder.nonDurable(QUEUENAME2).build();
  31. return build;
  32. }
  33. //3.交换机和队列进行绑定
  34. @Bean("binding1")
  35. public Binding bindQueueToExchange(Exchange exchange, @Qualifier("queue1") Queue queue){
  36. Binding noargs = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();
  37. return noargs;
  38. }
  39. @Bean("binding2")
  40. public Binding bindQueue2ToExchange(Exchange exchange, @Qualifier("queue2") Queue queue){
  41. Binding noargs = BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
  42. return noargs;
  43. }
  44. }

application.yml 配置

  1. spring:
  2. rabbitmq:
  3. username: pxy
  4. password: pxy
  5. host: 192.168.158.129
  6. virtual-host: /pxy #????????????/????
  7. port: 5672
  8. mq:
  9. exchange:
  10. name: test_exchenge_topic
  11. queue:
  12. name1: test_topic_exchange_queue_1
  13. name2: test_topic_exchange_queue_2

测试类

  1. package com.aaa;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import javax.annotation.Resource;
  8. @SpringBootTest
  9. public class MqTest {
  10. @Value("${mq.exchange.name}")
  11. public String EXCHANGENAME;
  12. @Resource
  13. private RabbitTemplate rabbitTemplate;
  14. @Test
  15. void sendMsg(){
  16. rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试一下SpringBoot和rabbitmq整合");
  17. }
  18. }

3.消费者

  1. package com.aaa.message;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class ConsumerMessage {
  7. @RabbitListener(queues = "test_topic_exchange_queue_1")
  8. public void aaa(Message message){
  9. byte[] body = message.getBody();
  10. String string = new String(body);
  11. System.out.println("-------------"+string);
  12. }
  13. }

application.yml 配置

  1. spring:
  2. rabbitmq:
  3. username: pxy
  4. password: pxy
  5. host: 192.168.158.129
  6. virtual-host: /pxy #????????????/????
  7. port: 5672
  8. #mq:
  9. # exchange:
  10. # name: test_exchenge_topic
  11. # queue:
  12. # name1: test_topic_exchange_queue_1
  13. # name2: test_topic_exchange_queue_2
  14. server:
  15. port: 8085

启动类

  1. package com.aaa;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. /**
  5. * Hello world!
  6. *
  7. */
  8. @SpringBootApplication
  9. public class ConsumerApp
  10. {
  11. public static void main( String[] args )
  12. {
  13. System.out.println( "Hello World!" );
  14. SpringApplication.run(ConsumerApp.class);
  15. }
  16. }

七.rabbitmq的高级特性

1. 消息的可靠性传递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式

确认队列中的消息是否被消费者消费

  • return 退回模式

交换机中的消息是否到达队列里面

confirm确认模式

application.yml配置

return 退回模式

  1. @Test
  2. void sendMsgReturn(){
  3. // 使用的是回退模式
  4. rabbitTemplate.setMandatory(true);
  5. rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退,回退的消息是:"+ new String(returnedMessage.getMessage().getBody())));
  6. rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试一下SpringBoot和rabbitmq整合");
  7. }

2.consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

broker发送消息给消费端的一种可靠性保证

有三种确认方式:

自动确认:acknowledge="none" 。不管处理成功与否,业务处理异常也不管

(当消费者意担接收到消息之后,消费者就会给broker一个回执,证明已经接收到消息 了,不管消息到底是否成功)

手动确认:acknowledge="manual" 。可以解决业务异常的情况

(收到消息之后不会立马确认收到消息,当业务处理没有问题的时候手动的调用代码的方 式来进行处理,如果业务失败了,就可以进行额外的操作)

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

package com.aaa.message;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component

public class ShouDongQianShou implements ChannelAwareMessageListener {

    @Override
    @RabbitListener(queues = "test_topic_exchange_queue_2")
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(200);//2秒
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
//            System.out.println(1/0);
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            System.out.println("拒绝接收");
            channel.basicNack(deliveryTag,true,true);
        }
    }

}

3.消费端限流

削峰

秒杀开始的那一瞬间,会有大量用户冲击进来,所以在开始时候会有一个瞬间流量峰值。如何把瞬间的流量峰值变得更平缓,是能否成功设计好秒杀系统的关键因素。实现流量削峰填谷,一般的采用缓存和 MQ 中间件来解决

配置

在配置 prefetch属性设置消费端一次拉取多少消息

消费端的确认模式一定为手动确认。acknowledge="manual"

需要配置一次拉取多少消息

4.TTl

全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

可以在管理台新建队列、交换机,绑定

创建一个交换机

设置队列

我这设置了30秒也就是30之内没有消费掉的会就会消失

代码实现

生产者配置

详细生产者在SpringBoot整合里

x-message-tll 过期时间

x-max-length 队列里面最多接受几条消息

x-expires 消息过期时间

5.死信队列

死信队列就是死信交换机,因为其他MQ产品中没有交换机的概念

比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机

消息在什么情况下会成为死信?

1.队列消息长度到最大的限制

最大的长度设置为10当第11条消息进来的时候就会成为死信

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)

设置消费者为手动签收的状态

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定交换机的方式是什么?

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

// 1.  交换机  :正常的交换机   死信交换机

// 2.队列  :正常的  死信

//3.绑定   正常ex - 正常的que

正常的que和死信交换机

死信ex-死信queue

  1. package com.aaa.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class TopicMqDeadConfig {
  9. @Value("${mq1.exchange.name1}")
  10. public String EXCHANGENAME;
  11. @Value("${mq1.exchange.name2}")
  12. public String DEADEXCHANGE;
  13. @Value("${mq1.queue.name1}")
  14. public String QUEUENAME1;
  15. @Value("${mq1.queue.name2}")
  16. public String QUEUENAME2;
  17. //1.声明 正常的 交换机
  18. @Bean("ex1")
  19. public Exchange getExchange() {
  20. Exchange build = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
  21. return build;
  22. }
  23. //2.正常的队列
  24. @Bean("queue1")
  25. public Queue getQueue1() {
  26. Queue build = QueueBuilder.nonDurable(QUEUENAME1)
  27. .withArgument("x-message-ttl",30000)
  28. .withArgument("x-max-length",10)
  29. .withArgument("x-dead-letter-exchange",DEADEXCHANGE)
  30. .withArgument("x-dead-letter-routing-key","dead.test")
  31. .build();
  32. return build;
  33. }
  34. /*
  35. 死信的队列
  36. * */
  37. @Bean("queue2")
  38. public Queue getQueue2() {
  39. Queue build = QueueBuilder.nonDurable(QUEUENAME2)
  40. .build();
  41. return build;
  42. }
  43. /**
  44. * 死信交换机
  45. * @return
  46. */
  47. @Bean("ex2")
  48. public Exchange getDeadExchange() {
  49. Exchange build = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build();
  50. return build;
  51. }
  52. //3.交换机和队列进行绑定
  53. @Bean("binding1")
  54. public Binding bindQueueToExchange(@Qualifier("ex1") Exchange exchange, @Qualifier("queue1") Queue queue){
  55. Binding noargs = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs();
  56. return noargs;
  57. }
  58. @Bean("binding2")
  59. public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange, @Qualifier("queue2") Queue queue){
  60. Binding noargs = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs();
  61. return noargs;
  62. }
  63. }

测试类

  1. package com.aaa;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import javax.annotation.Resource;
  7. @SpringBootTest
  8. public class MqTest2 {
  9. @Value("${mq1.exchange.name1}")
  10. public String EXCHANGENAME;
  11. @Resource
  12. private RabbitTemplate rabbitTemplate;
  13. @Test
  14. void sendMsgReturn() {
  15. for (int i = 0; i < 20; i++) {
  16. rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", "测试一下死信消息");
  17. }
  18. }
  19. }
总结:

1. 死信交换机和死信队列和普通的没有区别

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

3. 消息成为死信的三种情况:

1. 队列消息长度到达限制;

2. 消费者拒接消费消息,并且不重回队列;

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

6.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

场景:

  • 1. 下单后,30分钟未支付,取消订单,回滚库存
  • 2. 新用户注册成功7天后,发送短信问候。

实现方式:

1. 定时器

2. 死信队列

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/632855
推荐阅读
相关标签
  

闽ICP备14008679号