当前位置:   article > 正文

RabbitMQ教程大全看这一篇就够了-java版本_rabbitmq java

rabbitmq java

目录

什么是RabbitMQ?

RabbitMQ 核心概念

Docker 安装 RabbitMQ 

RabbitMQ 控制台页面介绍

RabbitMQ 交换机 Exchange 介绍

Direct Exchange 定向、直连交换机

Fanout Exchange 发布/订阅、广播、扇形交换机

Topic Exchange 主题、通配符交换机

Headers Exchanges(少用)

RabbitMQ 代码 Java版

1:简单队列

2:工作队列 Work Queues 

3:Fanout 发布/订阅 交换机模式

4:Routing 路由 交换机模式

5:Topic 主题 交换机模式

SpringBoot 整合 RabbitMQ

RabbitMQ 消息可靠性投递

生产者到交换机 开启ACK确认可靠消息投递

交换机到队列 可靠消息投递

RabbitMQ 消息确认消费ACK

消息的可靠消费

SpringBoot 配置 RabbitMQ 广播 发布/订阅

RabbitMQ TTL死信队列

什么是 TTL?

什么是死信队列?

什么是死信交换机?

消息什么情况下会成为死信消息?

如何设置消息的TTL存活时间?

RabbitMQ 控制台 操作 死信队列绑定死信交换机

RabbitMQ 延迟队列

什么是延迟队列?

定时消息的使用场景

RabbitMQ 实现延迟消息

SpringBoot 实现延迟队列

RabbitMQ 的集群环境

RabbitMQ 普通集群模式的介绍

RabbitMQ 搭建普通集群环境

1:准备节点环境

2:节点配置成集群

3:SpringBoot 整合 RabbitMQ 普通集群

RabbitMQ 镜像集群模式的介绍(推荐)

策略policy介绍


官网:https://www.rabbitmq.com/

什么是RabbitMQ

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错,与SpringAMQP完美的整合、API丰富易用

RabbitMQ 核心概念

  • Broker

    RabbitMQ的服务端程序,可以认为一个mq节点就是一个broker。

  • Producer 生产者

    创建消息Message,然后发布到RabbitMQ队列中

  • Consumer 消费者

    消费队列中的消息

  • Message 消息

    生产消费的内容,有消息头和消息体,也包括多个属性配置,比如routingKey路由键

  • Queue 队列

    是RabbitMQ的内部对象,用于存储消息,消息都只能存储在队列中

  • Channel 信道

    一条支持多路复用的通道,独立的双向数据流通道,可以发布、订阅、接收消息。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道

  • Connection 连接

    是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑,一个连接上可以有多个channel进行通信

  • Exchange 交换器

    生产者将消息发送到Exchange,交换器将消息路由到一个或者多个队列中,队列和交换机是多对多的关系。

  • RoutingKey 路由键

    生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则

    最大长度255 字节

  • Binding 绑定

    通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 (BindingKey),这样 RabbitMQ 就知道如何正确地将消息路由到队列了

  • Virtual host 虚拟主机

    用于不同业务模块的逻辑隔离,一个Virtual Host里面可以有若干个Exchange和Queue,同一个VirtualHost里面不能有相同名称的Exchange或Queue

    默认是 / ,可以使用 /dev /test /pro


Docker 安装 RabbitMQ 

使用源码安装需要的依赖多、且版本和维护相对复杂,需要erlang环境、版本也有要求。

linux 上安装 docker

https://github.com/docker-library/docs/tree/master/rabbitmq

docker pull rabbitmq:management             

// 拉取远程镜像,management 带 后台管理页面的版本

docker images                                        

// 查看本机存在的镜像

docker run -d -h rabbitmq_1 --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management 

// 运行 docker 镜像

参数说明:

  • run -d :  run 运行镜像 -d 后台运行
  • -h :自定义容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中
  • --name:自定义容器名称
  • -p 15672:15672 :management 界面管理访问端口
  • -p 5672:5672     :amqp 访问端口
  • -e rabbitma参数

rabbitmq 访问地址:http://ip:15672                // 如果访问不了,请查看防火墙端口是否开放

rabbitmq 默认登录账号和密码:guest/guest

开机自动启动 rabbitmq

docker update 容器ID --restart=always

rabbitma 的主要端口

4369         erlang 发现口
5672         client 端通信口
15672       管理界面 ui 端口
25672       server 间内部通信口


RabbitMQ 控制台页面介绍

RabbitMQ控制面板介绍 - 简书


RabbitMQ 交换机 Exchange 介绍

  • 生产者将消息发送到 Exchange,交换器将消息路由到一个或者多个队列中,交换机有多个类型,队列和交换机是多对多的关系。
  • 交换机只负责转发消息,不具备存储消息的能力,如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失。
  • RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的不常用。
  • Direct Exchange 定向、直连交换机

  • 将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
  • 例子:如果一个队列绑定到该交换机上要求路由键 “aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb
  • 处理路由健
  • Fanout Exchange 发布/订阅、广播、扇形交换机

  • 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息
  • Fanout交换机转发消息是最快的,用于发布订阅,广播形式,中文是扇形
  • 不处理路由健
  • Topic Exchange 主题、通配符交换机

  • 主题交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
  • 将路由键和某模式进行匹配。此时队列需要绑定在一个模式上
  • 符号“#”匹配一个或多个词,符号“*”匹配只匹配一个词
  • 例子:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”
  • Headers Exchanges(少用)

  • 根据发送的消息内容中的headers属性进行匹配, 在绑定Queue与Exchange时指定一组键值对
  • 当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配
  • 如果完全匹配则消息会路由到该队列,否则不会路由到该队列
  • 不处理路由键

RabbitMQ 代码 Java版

maven项目中依赖rabbitmq的包

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.16.0</version>
  5. </dependency>

1:简单队列

官网教程:RabbitMQ tutorial - "Hello World!" — RabbitMQ

  • 发送消息
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.nio.charset.StandardCharsets;
  5. // 发送消息
  6. public class Send {
  7. // 队列名称
  8. private final static String QUEUE_NAME = "hello";
  9. public static void main(String[] argv) throws Exception {
  10. // 创建连接工厂对象
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("192.168.189.75"); // rabbit server 所在IP地址
  13. factory.setPort(5672); // rabbit server amqp端口号
  14. factory.setUsername("guest"); // rabbit server 登录账号
  15. factory.setPassword("guest"); // rabbit server 登录密码
  16. factory.setVirtualHost("/dev"); // 指定连接到哪个虚拟主机
  17. try (
  18. // 创建连接
  19. Connection connection = factory.newConnection();
  20. // 创建信道
  21. Channel channel = connection.createChannel()) {
  22. /*
  23. * queueDeclare:队列不存在时自动创建队列,如果存在使用存在的
  24. * 参数1:队列名称
  25. * 参数2:是否持久化
  26. * 参数3:是否独占
  27. * 参数4:没有消费者的时候是否自动删除队列
  28. * 参数5:其他
  29. */
  30. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  31. // 消息
  32. String message = "Hello World!";
  33. /*
  34. * 发布消息
  35. * 参数1:交换机
  36. * 参数2:队列
  37. * 参数3:其他额外的参数
  38. * 参数4:要发送的消息,byte[]类型
  39. */
  40. channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
  41. System.err.println(" [x] Sent '" + message + "'");
  42. }
  43. }
  44. }

执行代码,可以在 rabbitmq 控制台上看到队列已经被创建了,并且有一条未被消费的消息

  • 消费消息
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.nio.charset.StandardCharsets;
  6. // 消费消息
  7. public class Recv {
  8. // 队列名称
  9. private final static String QUEUE_NAME = "hello";
  10. public static void main(String[] argv) throws Exception {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("192.168.189.75");
  13. factory.setPort(5672);
  14. factory.setUsername("guest");
  15. factory.setPassword("guest");
  16. factory.setVirtualHost("/dev");
  17. Connection connection = factory.newConnection();
  18. Channel channel = connection.createChannel();
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. System.err.println(" [*] Waiting for messages. To exit press CTRL+C");
  21. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  22. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  23. System.out.println(" [x] Received '" + message + "'");
  24. };
  25. /*
  26. * basicConsume:监听队列
  27. * 参数1:监听的队列名称
  28. * 参数2:autoAck:是否在收到消息后自动确认(消费端拿到消息后,自动告诉 rabbitmq server 我已经收到消息了)
  29. * 参数3:回调,处理消息
  30. */
  31. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
  32. });
  33. // 第2个监听队列的方法
  34. // Consumer consumer = new DefaultConsumer(channel) {
  35. // @Override
  36. // public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  37. // String message = new String(body, StandardCharsets.UTF_8);
  38. // System.out.println(" [x] Received '" + message + "'");
  39. // }
  40. // };
  41. // channel.basicConsume(QUEUE_NAME, true, consumer);
  42. }
  43. }

 执行代码,可以在 rabbitmq 控制台上看到队列的消息已经被消费了,并且可以看到 连接信息


2:工作队列 Work Queues 

官网教程:RabbitMQ tutorial - Work Queues — RabbitMQ

例如:生产者一秒可以生产 一万个消息,消费者一秒可以消费 一千个消息,这种情况如果只有一个消费者,消息就会堆积在队列中。这时就需要部署多个消费者节点。

多个消费者负载均衡策略是 轮询。

  • 两个消费者
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.nio.charset.StandardCharsets;
  6. // 第一个消费节点
  7. public class Recv1 {
  8. // 队列名称
  9. private final static String QUEUE_NAME = "work_mq";
  10. public static void main(String[] argv) throws Exception {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("192.168.189.75");
  13. factory.setPort(5672);
  14. factory.setUsername("guest");
  15. factory.setPassword("guest");
  16. factory.setVirtualHost("/dev");
  17. Connection connection = factory.newConnection();
  18. Channel channel = connection.createChannel();
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. System.err.println(" [*] Waiting for messages. To exit press CTRL+C");
  21. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  22. try {
  23. Thread.sleep(1000);
  24. } catch (InterruptedException e) {
  25. throw new RuntimeException(e);
  26. }
  27. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  28. System.out.println(" [x] 消费者1: '" + message + "'");
  29. /*
  30. * 处理完消息后,手动确认 Ack
  31. * 参数1:消息标签
  32. * 参数2:是否批量 Ack
  33. */
  34. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  35. };
  36. // 参数2:关闭自动 ack 确认
  37. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
  38. });
  39. }
  40. }
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.nio.charset.StandardCharsets;
  6. // 第二个消费节点
  7. public class Recv2 {
  8. // 队列名称
  9. private final static String QUEUE_NAME = "work_mq";
  10. public static void main(String[] argv) throws Exception {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("192.168.189.75");
  13. factory.setPort(5672);
  14. factory.setUsername("guest");
  15. factory.setPassword("guest");
  16. factory.setVirtualHost("/dev");
  17. Connection connection = factory.newConnection();
  18. Channel channel = connection.createChannel();
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. System.err.println(" [*] Waiting for messages. To exit press CTRL+C");
  21. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  22. try {
  23. Thread.sleep(3000);
  24. } catch (InterruptedException e) {
  25. throw new RuntimeException(e);
  26. }
  27. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  28. System.out.println(" [x] 消费者2: '" + message + "'");
  29. /*
  30. * 处理完消息后,手动确认 Ack
  31. * 参数1:消息标签
  32. * 参数2:是否批量 Ack
  33. */
  34. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  35. };
  36. // 参数2:关闭自动 ack 确认
  37. channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
  38. });
  39. }
  40. }
  • 一个生产者
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.nio.charset.StandardCharsets;
  5. // 发送消息
  6. public class Send {
  7. // 队列名称
  8. private final static String QUEUE_NAME = "work_mq";
  9. public static void main(String[] argv) throws Exception {
  10. // 创建连接工厂对象
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("192.168.189.75");
  13. factory.setPort(5672);
  14. factory.setUsername("guest");
  15. factory.setPassword("guest");
  16. factory.setVirtualHost("/dev");
  17. try (
  18. // 创建连接
  19. Connection connection = factory.newConnection();
  20. // 创建信道
  21. Channel channel = connection.createChannel()) {
  22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  23. // 批量发送消息
  24. for (int i = 0; i < 10; i++) {
  25. // 消息
  26. String message = "Hello Work! ___ " + i;
  27. channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
  28. System.err.println(" [x] 生产者: '" + message + "'");
  29. }
  30. }
  31. }
  32. }

先启动2个消费者监听队列,再启动生产者生产消息。可以看到消息被轮询消费

设置 多节点消费者负载均衡策略为:公平策略 (能者多劳)

  1. Channel channel = connection.createChannel();
  2. // 消费者设置 qos为 1, 一个消费完后继续消费
  3. channel.basicQos(1);


3:Fanout 发布/订阅 交换机模式

官网教程:https://www.rabbitmq.com/tutorials/tutorial-three-python.html

作用:生产者发布消息后,所有监听广播类型指定交换机的的消费者都可以消费此消息。

  • 2个或多个消费者
  1. import com.rabbitmq.client.*;
  2. import java.nio.charset.StandardCharsets;
  3. // 第一个消费节点
  4. public class Recv1 {
  5. // 交换机名称
  6. private final static String EXCHANGE_NAME = "exchange_fanout";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.189.75");
  10. factory.setPort(5672);
  11. factory.setUsername("guest");
  12. factory.setPassword("guest");
  13. factory.setVirtualHost("/dev");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 绑定 广播类型 的交换机
  17. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  18. // 获取队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 队列和交换机进行绑定,fanout交换机不需要routingKey
  21. channel.queueBind(queueName, EXCHANGE_NAME, "");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. // 消费消息回调
  24. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  25. System.out.println(" [x] 消费者1: '" + message + "'");
  26. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  27. };
  28. // 监听消息队列
  29. channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
  30. });
  31. }
  32. }
  1. import com.rabbitmq.client.*;
  2. import java.nio.charset.StandardCharsets;
  3. // 第二个消费节点
  4. public class Recv2 {
  5. // 交换机名称
  6. private final static String EXCHANGE_NAME = "exchange_fanout";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.189.75");
  10. factory.setPort(5672);
  11. factory.setUsername("guest");
  12. factory.setPassword("guest");
  13. factory.setVirtualHost("/dev");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 绑定 广播类型 的交换机
  17. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  18. // 获取队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 队列和交换机进行绑定,fanout交换机不需要routingKey
  21. channel.queueBind(queueName, EXCHANGE_NAME, "");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. // 消费消息回调
  24. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  25. System.out.println(" [x] 消费者2: '" + message + "'");
  26. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  27. };
  28. // 监听消息队列
  29. channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
  30. });
  31. }
  32. }
  • 一个生产者
  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.nio.charset.StandardCharsets;
  6. // 发送消息
  7. public class Send {
  8. // 交换机名称
  9. private final static String EXCHANGE_NAME = "exchange_fanout";
  10. public static void main(String[] argv) throws Exception {
  11. // 创建连接工厂对象
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.189.75");
  14. factory.setPort(5672);
  15. factory.setUsername("guest");
  16. factory.setPassword("guest");
  17. factory.setVirtualHost("/dev");
  18. try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
  19. // 绑定 广播类型 的交换机
  20. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  21. // 消息
  22. String message = "广播消息。。。";
  23. // 发送消息
  24. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
  25. }
  26. }
  27. }

4:Routing 路由 交换机模式

官网教程:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

  • 队列和交换机(Direct)绑定,需要指定一个routingKey(也叫Bingding Key)
  • 消息生产者发送消息给交换机,需要指定routingKey
  • 交换机根据消息的routingKey,转发给对应的队列

示例:日志收集系统

  • 一个队列收集 error 日志
  • 一个队列收集 全部 日志
  • 2个消费者,一个消费 error 消息,一个消费 全部 消息
  1. import com.rabbitmq.client.*;
  2. import java.nio.charset.StandardCharsets;
  3. // 第一个队列,消费所有消息
  4. public class Recv1 {
  5. // 交换机名称
  6. private final static String EXCHANGE_NAME = "exchange_direct";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.31.71");
  10. factory.setPort(5672);
  11. factory.setUsername("guest");
  12. factory.setPassword("guest");
  13. factory.setVirtualHost("/dev");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 绑定 直连类型 的交换机
  17. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  18. // 获取队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 队列和交换机进行绑定,direct交换机 需要指定 routingKey
  21. channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");
  22. channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey");
  23. channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey");
  24. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  25. // 消费消息回调
  26. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  27. System.out.println(" [x] 消费者1: '" + message + "'");
  28. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  29. };
  30. // 监听消息队列
  31. channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
  32. });
  33. }
  34. }
  1. import com.rabbitmq.client.*;
  2. import java.nio.charset.StandardCharsets;
  3. // 第二个队列,消费error消息
  4. public class Recv2 {
  5. // 交换机名称
  6. private final static String EXCHANGE_NAME = "exchange_direct";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.31.71");
  10. factory.setPort(5672);
  11. factory.setUsername("guest");
  12. factory.setPassword("guest");
  13. factory.setVirtualHost("/dev");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 绑定 直连类型 的交换机
  17. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  18. // 获取队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 队列和交换机进行绑定,direct交换机 需要指定 routingKey
  21. channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. // 消费消息回调
  24. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  25. System.out.println(" [x] 消费者1: '" + message + "'");
  26. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  27. };
  28. // 监听消息队列
  29. channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
  30. });
  31. }
  32. }
  •  一个生产者
  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.nio.charset.StandardCharsets;
  6. // 发送消息
  7. public class Send {
  8. // 交换机名称
  9. private final static String EXCHANGE_NAME = "exchange_direct";
  10. public static void main(String[] argv) throws Exception {
  11. // 创建连接工厂对象
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.31.71");
  14. factory.setPort(5672);
  15. factory.setUsername("guest");
  16. factory.setPassword("guest");
  17. factory.setVirtualHost("/dev");
  18. try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
  19. // 绑定 广播类型 的交换机
  20. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  21. // 消息
  22. String errorMsg = "error消息";
  23. String infoMsg = "info消息";
  24. String debugMsg = "debug消息";
  25. // 发送消息
  26. channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, errorMsg.getBytes(StandardCharsets.UTF_8));
  27. channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, infoMsg.getBytes(StandardCharsets.UTF_8));
  28. channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debugMsg.getBytes(StandardCharsets.UTF_8));
  29. }
  30. }
  31. }


5:Topic 主题 交换机模式

官网教程:RabbitMQ tutorial - Topics — RabbitMQ

  • Topic 可以实现发布订阅模式Fanout 和 路由模式Direct 的功能,更加灵活,支持模式匹配,通配符等。
  • 交换机通过通配符进行转发到对应的队列,* 代表一个词,#代表1个或多个词,一般用#作为通配符居多,比如 #.order, 会匹配 info.order 、sys.error.order, 而 *.order ,只会匹配 info.order, 使用.进行分割多个词。
  • 注意:
    • 交换机和队列绑定时用的binding使用通配符的路由健
    • 生产者发送消息时需要使用具体的路由健

示例:日志收集系统

  • 一个队列收集 error 日志
  • 一个队列收集 全部 日志
  • 2个消费者,一个消费 error 消息,一个消费 全部 消息
  1. import com.rabbitmq.client.*;
  2. import java.nio.charset.StandardCharsets;
  3. // 第一个队列,消费所有消息
  4. public class Recv1 {
  5. // 交换机名称
  6. private final static String EXCHANGE_NAME = "exchange_topic";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.31.71");
  10. factory.setPort(5672);
  11. factory.setUsername("guest");
  12. factory.setPassword("guest");
  13. factory.setVirtualHost("/dev");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 绑定 主题类型 的交换机
  17. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  18. // 获取队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 队列和交换机进行绑定,topic交换机 需要指定 routingKey
  21. channel.queueBind(queueName, EXCHANGE_NAME, "*.log.*");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. // 消费消息回调
  24. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  25. System.out.println(" [x] 消费者1: '" + message + "'");
  26. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  27. };
  28. // 监听消息队列
  29. channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
  30. });
  31. }
  32. }
  1. import com.rabbitmq.client.*;
  2. import java.nio.charset.StandardCharsets;
  3. // 第二个队列,消费error消息
  4. public class Recv2 {
  5. // 交换机名称
  6. private final static String EXCHANGE_NAME = "exchange_topic";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.31.71");
  10. factory.setPort(5672);
  11. factory.setUsername("guest");
  12. factory.setPassword("guest");
  13. factory.setVirtualHost("/dev");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. // 绑定 主题类型 的交换机
  17. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  18. // 获取队列
  19. String queueName = channel.queueDeclare().getQueue();
  20. // 队列和交换机进行绑定,topic交换机 需要指定 routingKey
  21. channel.queueBind(queueName, EXCHANGE_NAME, "*.log.error");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. // 消费消息回调
  24. String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
  25. System.out.println(" [x] 消费者1: '" + message + "'");
  26. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  27. };
  28. // 监听消息队列
  29. channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
  30. });
  31. }
  32. }
  •  一个生产者
  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.nio.charset.StandardCharsets;
  6. // 发送消息
  7. public class Send {
  8. // 交换机名称
  9. private final static String EXCHANGE_NAME = "exchange_topic";
  10. public static void main(String[] argv) throws Exception {
  11. // 创建连接工厂对象
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.31.71");
  14. factory.setPort(5672);
  15. factory.setUsername("guest");
  16. factory.setPassword("guest");
  17. factory.setVirtualHost("/dev");
  18. try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
  19. // 绑定 主题类型 的交换机
  20. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  21. // 消息
  22. String errorMsg = "订单服务 error消息";
  23. String infoMsg = "订单服务 info消息";
  24. String debugMsg = "用户服务 debug消息";
  25. // 发送消息
  26. channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, errorMsg.getBytes(StandardCharsets.UTF_8));
  27. channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, infoMsg.getBytes(StandardCharsets.UTF_8));
  28. channel.basicPublish(EXCHANGE_NAME, "user.log.debug", null, debugMsg.getBytes(StandardCharsets.UTF_8));
  29. }
  30. }
  31. }


SpringBoot 整合 RabbitMQ

pom 文件中添加依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. <version>2.7.5</version>
  5. </dependency>

application.yml 文件配置 rabbitmq

注意:1:guest 账号只能连本机的mq服务,实际开发的时候请创建一个新的账号。2:rabbitmq集成在maven聚合组件中,然后这个组件被其他服务依赖以此达到整合mq的方式的时候,application 文件的后缀名要是 .properties (rabbitmq 读取不到 yml 后缀的配置)

  1. spring:
  2. rabbitmq:
  3. host: 192.168.31.71
  4. port: 5672
  5. username: guest
  6. password: guest
  7. virtual-host: /dev

配置 交换机和队列绑定的 Bean

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class RabbitmqConfig {
  7. // 自定义交换机名称
  8. public static final String EXCHANGE_NAME = "order_exchange";
  9. // 自定义队列名称
  10. public static final String QUEUE_NAME = "order_queue";
  11. /**
  12. * 创建 topic 交换机
  13. */
  14. @Bean(EXCHANGE_NAME) // 多个交换机时要指定交换机的Bean名称
  15. public Exchange orderExchange() {
  16. return ExchangeBuilder
  17. // 指定 主题类型的交换机 名称
  18. .topicExchange(EXCHANGE_NAME)
  19. // 是否持久化
  20. .durable(true)
  21. .build();
  22. }
  23. /**
  24. * 创建持久化队列
  25. */
  26. @Bean(QUEUE_NAME) // 多个队列时要指定队里的Bean名称
  27. public Queue orderQueue() {
  28. return QueueBuilder.durable(QUEUE_NAME).build();
  29. }
  30. /**
  31. * 队列和交换机绑定
  32. */
  33. @Bean
  34. public Binding orderBinding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
  35. return BindingBuilder
  36. // 绑定的队列
  37. .bind(queue)
  38. // 队列绑定到 指定的交换机
  39. .to(exchange)
  40. // 绑定的 routingKey
  41. .with("order.#")
  42. // 没有其他参数
  43. .noargs();
  44. }
  45. }

发送消息

  1. import com.lxx.rabbitmq.config.RabbitmqConfig;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. @SpringBootTest
  7. class RabbitmqTests {
  8. @Autowired
  9. private RabbitTemplate rabbitTemplate;
  10. @Test
  11. public void test() {
  12. /*
  13. * 发送消息
  14. * 参数1:要发送的交换机
  15. * 参数2:指定匹配的 routingKey
  16. * 参数3:要发送的消息
  17. */
  18. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "订单 error消息");
  19. }
  20. }

消费者监听队列

  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "order_queue") // 消费者监听这个队列
  7. public class OrderMqListener {
  8. /**
  9. * 监听到队列有消息后,RabbitHandler 处理指定类型的消息
  10. *
  11. * @param body 消息
  12. */
  13. @RabbitHandler
  14. public void messageHandler(String body, Message message) {
  15. System.err.println(message.getMessageProperties().getMessageId());
  16. System.err.println(message.getMessageProperties().getDeliveryTag());
  17. System.err.println(message.toString());
  18. System.err.println(" X 字符串消费者:" + body);
  19. }
  20. /**
  21. * 监听到队列有消息后,RabbitHandler 处理指定类型的消息
  22. *
  23. * @param body 消息
  24. */
  25. @RabbitHandler
  26. public void messageHandler(Integer body, Message message) {
  27. System.err.println(" X 数字消费者:" + body);
  28. }
  29. }

RabbitMQ 消息可靠性投递

什么是消息的可靠投递?

保证消息百分百发送到消息队列中

如果确保消息的可靠投递

消息生产者 需要接受到mq服务端 接受到消息的确认应答
完善的消息补偿机制,发送失败的消息可以再感知并二次处理

RabbitMQ消息投递路径:生产者->交换机->队列->消费者

通过两个的点控制消息的可靠性投递

  • 生产者到交换机
    • 通过confirmCallback
  • 交换机到队列
    • 通过returnCallback
  • 生产者到交换机 开启ACK确认可靠消息投递

appliction.yml 配置

  1. spring:
  2. rabbitmq:
  3. # 开启消息 confirm 二次确认
  4. publisher-confirm-type: correlated

消息监听代码没变化

发送消息,代码如下:

  1. import org.junit.jupiter.api.Test;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. @SpringBootTest
  7. class RabbitmqDemoApplicationTests {
  8. @Autowired
  9. private RabbitTemplate rabbitTemplate;
  10. @Test
  11. public void test() {
  12. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  13. @Override
  14. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  15. System.err.println("ConfirmCallback ================");
  16. System.err.println(" ================ correlationData = " + correlationData);
  17. System.err.println(" ================ ack = " + ack);
  18. System.err.println(" ================ cause = " + cause);
  19. if (ack) {
  20. System.out.println("发送成功");
  21. // 更新数据库消息状态为 成功
  22. } else {
  23. System.err.println("发送失败");
  24. // 更新数据库消息状态为 失败
  25. }
  26. }
  27. });
  28. // 发送消息之前 ,数据库新增一条消息记录状态,状态是 发送 TODO
  29. // 发送消息
  30. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "订单 ConfirmCallback 消息");
  31. // 模拟投递失败
  32. // rabbitTemplate.convertAndSend("不存在的交换机", "order.error", "订单 ConfirmCallback消息");
  33. }
  34. }
  • 交换机到队列 可靠消息投递

appliction.yml 配置

  1. spring:
  2. rabbitmq:
  3. # 开启 交换机到 队列
  4. publisher-returns: true
  5. template:
  6. # 为true,则交换机处理消息到路由失败后,则会返回给生产者。 或者代码 rabbitTemplate.setMandatory(true) 是一样的效果
  7. mandatory: true

消息监听代码没变化

发送消息,代码如下:

  1. package com.lxx.rabbitmq;
  2. import com.lxx.rabbitmq.config.RabbitmqConfig;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.core.ReturnedMessage;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9. @SpringBootTest
  10. class RabbitmqDemoApplicationTests {
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. @Test
  14. public void test() {
  15. rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  16. @Override
  17. public void returnedMessage(ReturnedMessage returned) {
  18. int replyCode = returned.getReplyCode();
  19. System.err.println("ReturnsCallback ================");
  20. System.err.println(" ================ code = " + replyCode);
  21. System.err.println(" ================ returned = " + returned.toString());
  22. }
  23. });
  24. // 发送消息
  25. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "订单 ReturnsCallback 消息");
  26. // 模拟投递失败
  27. // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "不存在的routingKey", "订单 ReturnsCallback 消息");
  28. }
  29. }

RabbitMQ 消息确认消费ACK

消费者从broker中监听到消息,要确保消息被正常处理。

RabbitMQ 消费者ACK介绍

  • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
  • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中Ready
  • 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是开启状态自动ACK,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked

  • 消息的可靠消费

appliction.yml 配置

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. #开启手动确认消息,如果消息重新入队,进行重试
  6. acknowledge-mode: manual
  7. retry:
  8. enabled: true #是否开启消费者重试
  9. max-attempts: 5 #最大重试次数
  10. initial-interval: 5000ms #重试间隔时间(单位毫秒)
  11. max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
  12. multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

发送消息代码没变化

消息监听,代码如下:

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. @RabbitListener(queues = "order_queue")
  9. public class OrderMqListener {
  10. @RabbitHandler
  11. public void messageHandler(String body, Message message, Channel channel) throws IOException {
  12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  13. System.err.println(deliveryTag);
  14. System.err.println(message.toString());
  15. System.err.println(" X 字符串消费者:" + body);
  16. // 告诉 broker 消息被正常消费 确认ACK
  17. channel.basicAck(deliveryTag, false);
  18. /*
  19. * 告诉 broker,消息被消费后 拒绝确认ACK
  20. * 参数一:deliveryTag,消息被投递的次数
  21. * 参数二:是否批量 拒绝ACK,false 一条一条的拒绝ack
  22. * 参数上:是否重新投递到队列中
  23. */
  24. //channel.basicNack(deliveryTag, false, true); // 一次可以拒绝接收0个或多个
  25. //channel.basicReject(deliveryTag, true); // 一次只能拒绝接收一个消息
  26. }
  27. }

SpringBoot 配置 RabbitMQ 广播 发布/订阅

一个消息生产者,多个消费者节点,共同消费同一条消息

配置 广播 交换机和队列绑定的 Bean

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. public class RabbitmqConfig {
  6. // 自定义交换机名称
  7. public static final String EXCHANGE_NAME = "order_exchange";
  8. // 自定义队列名称
  9. public static final String QUEUE_NAME = "order_queue";
  10. /**
  11. * 创建 广播 交换机
  12. */
  13. @Bean(EXCHANGE_NAME)
  14. public FanoutExchange orderExchange() {
  15. return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
  16. }
  17. /**
  18. * 创建持久化队列
  19. */
  20. @Bean(QUEUE_NAME)
  21. public Queue orderQueue() {
  22. return QueueBuilder.durable(QUEUE_NAME).build();
  23. }
  24. /**
  25. * 队列和交换机绑定
  26. */
  27. @Bean
  28. public Binding orderBinding() {
  29. return BindingBuilder.bind(orderQueue()).to(orderExchange());
  30. }
  31. }

发送消息

  1. import org.junit.jupiter.api.Test;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. @SpringBootTest
  6. class RabbitmqTests {
  7. @Autowired
  8. private RabbitTemplate rabbitTemplate;
  9. @Test
  10. public void test() {
  11. /*
  12. * 发送消息
  13. * 参数1:要发送的交换机
  14. * 参数2:广播不要指定路由key
  15. * 参数3:要发送的消息
  16. */
  17. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "", "广播消息");
  18. }
  19. }

消费者监听,消费者可以多节点/集群部署,多节点可以消费同一条消息

  1. import org.springframework.amqp.core.ExchangeTypes;
  2. import org.springframework.amqp.rabbit.annotation.*;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class OrderMqListener {
  6. /**
  7. * 监听到队列有消息后,RabbitHandler 处理指定类型的消息
  8. *
  9. * @param body 消息
  10. */
  11. @RabbitHandler
  12. @RabbitListener(bindings = @QueueBinding(
  13. value = @Queue(), // 注意:此处不能指定队列名称。 如果指定队列只能被一个消费者节点消费
  14. exchange = @Exchange(value = RabbitmqConfig.EXCHANGE_NAME, type = ExchangeTypes.FANOUT)))
  15. public void messageHandler(String body) {
  16. System.err.println(" X 消息 :" + body);
  17. }
  18. }


RabbitMQ TTL死信队列

什么是 TTL?

  • time to live 消息存活时间的意思。
  • 如果消息在存活时间内未被消费,则会被清除。
  • RabbitMQ支持两种ttl设置
    • 整个队列进行配置ttl(居多)
    • 单独消息进行配置ttl

什么是死信队列?

用来存放 在存活时间内未被消费消息 的队列        // 过期消息不清楚,存放在此队列 

什么是死信交换机?

Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机。

注意:死信队列和死信交换机 与 普通队列普通交换机没区别。

消息什么情况下会成为死信消息?

  • 消费者拒收消息(basic.reject/basic.nack),并且没有重新入队 requeue=false
  • 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
  • 队列的消息长度达到极限

成为死信的结果:如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

如何设置消息的TTL存活时间?

方式一:队列过期,对整个队列消息设置统一过期时间

x-message-ttl      单位:ms毫秒

方式二:消息过期,对单个消息进行设置

expiration           单位:ms毫秒

注意:两者都配置的话,时间短的先触发。


RabbitMQ 控制台 操作 死信队列绑定死信交换机

-- 代码操作和普通操作没有不同,这里学习控制面板的操作

创建死信交换机

创建死信队列

死信队列和死信交换机绑定

新建普通队列,设置队列的过期时间。指定普通队列对应的死信交换机

向普通队列 里发送消息,过期后,消息路由到 死信队列


RabbitMQ 延迟队列

什么是延迟队列?

一种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。

定时消息的使用场景

  • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
  • 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;
  • 消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略

RabbitMQ 实现延迟消息

RabbitMQ本身是不支持延迟队列的。需要结合死信队列的特性,达到延迟消息的目的。

  • 消息生产者
    • 消息投递到普通的交换机
    • 消息过期,进入死信队列
  • 消费消费者
    • 消费者监听死信交换机的队列

SpringBoot 实现延迟队列

配置 死信交换机和死信队列,配置普通交换机和普通队列,配置普通队列绑定到死信交换机

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class OrderTimeoutCloseConfig {
  7. // ==================================================死信队列 start========================================================
  8. /**
  9. * 死信交换机,订单超时关闭
  10. */
  11. public static final String ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE = "order_timeout_close_dead_exchange";
  12. /**
  13. * 死信队列,订单超时关闭
  14. */
  15. public static final String ORDER_TIMEOUT_CLOSE_DEAD_QUEUE = "order_timeout_close_dead_queue";
  16. /**
  17. * 进入死信队列的路由key,订单超时关闭
  18. */
  19. public static final String ORDER_TIMEOUT_CLOSE_ROUTING_KEY = "order_timeout_close_routing_key";
  20. /**
  21. * 创建 死信 交换机
  22. */
  23. @Bean(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE)
  24. public TopicExchange orderTimeoutCloseDeadExchange() {
  25. return ExchangeBuilder.topicExchange(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE).durable(true).build();
  26. }
  27. /**
  28. * 创建 死信 队列
  29. */
  30. @Bean(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE)
  31. public Queue orderTimeoutCloseDeadQueue() {
  32. return QueueBuilder.durable(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE).build();
  33. }
  34. /**
  35. * 死信队列和死信交换机绑定
  36. */
  37. @Bean
  38. public Binding deadOrderTimeoutBinding(@Qualifier(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE) Queue queue, @Qualifier(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE) Exchange exchange) {
  39. return BindingBuilder.bind(queue).to(exchange).with(ORDER_TIMEOUT_CLOSE_ROUTING_KEY).noargs();
  40. }
  41. /*
  42. * 死信队列和死信交换机绑定,方式二
  43. */
  44. // @Bean
  45. // public Binding deadOrderTimeoutBinding() {
  46. // return new Binding(
  47. // ORDER_TIMEOUT_CLOSE_DEAD_QUEUE,
  48. // Binding.DestinationType.QUEUE,
  49. // ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE,
  50. // ORDER_TIMEOUT_CLOSE_ROUTING_KEY,
  51. // null
  52. // );
  53. // }
  54. // ==================================================死信队列 end========================================================
  55. // ==================================================普通队列 start========================================================
  56. /**
  57. * 普通交换机,订单超时,用于进入死信队列
  58. */
  59. public static final String ORDER_TIMEOUT_INTO_DEAD_EXCHANGE = "order_timeout_into_dead_exchange";
  60. /**
  61. * 普通队列,订单超时,用于进入死信队列
  62. */
  63. public static final String ORDER_TIMEOUT_INTO_DEAD_QUEUE = "order_timeout_into_dead_queue";
  64. /**
  65. * 进入普通队列的路由key,订单超时关闭
  66. */
  67. public static final String ORDER_TIMEOUT_INTO_ROUTING_KEY = "order_timeout_into_routing_key";
  68. /**
  69. * 创建 普通 交换机
  70. */
  71. @Bean(ORDER_TIMEOUT_INTO_DEAD_EXCHANGE)
  72. public TopicExchange orderTimeoutIntoDeadExchange() {
  73. return ExchangeBuilder.topicExchange(ORDER_TIMEOUT_INTO_DEAD_EXCHANGE).durable(true).build();
  74. }
  75. /**
  76. * 创建 普通 队列,普通队列和死信队列进行绑定
  77. */
  78. @Bean(ORDER_TIMEOUT_INTO_DEAD_QUEUE)
  79. public Queue orderTimeoutIntoDeadQueue() {
  80. /* // 方式一
  81. Map<String, Object> args = new HashMap<>(3);
  82. args.put("x-dead-letter-exchange", ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE); // 要绑定的死信交换机
  83. args.put("x-dead-letter-routing-key", ORDER_TIMEOUT_CLOSE_ROUTING_KEY); // 要绑定的死信 binding key
  84. args.put("x-message-ttl", 10000); // 普通队列的消息过期时间,过期后 消息进入死信队列,单位:ms毫秒
  85. return QueueBuilder.durable(ORDER_TIMEOUT_INTO_DEAD_QUEUE).withArguments(args).build();*/
  86. // 方式二
  87. return QueueBuilder.durable(ORDER_TIMEOUT_INTO_DEAD_QUEUE)
  88. // 要绑定的死信交换机
  89. .deadLetterExchange(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE)
  90. // 要绑定的死信 binding key
  91. .deadLetterRoutingKey(ORDER_TIMEOUT_CLOSE_ROUTING_KEY)
  92. // 普通队列的消息过期时间,过期后 消息进入死信队列,单位:ms毫秒
  93. .ttl(10000) // 这里测试指定10秒,正式情况可以指定30分钟
  94. .build();
  95. }
  96. /*
  97. * 普通队列和普通交换机绑定
  98. */
  99. @Bean
  100. public Binding orderTimeoutBinding() {
  101. return new Binding(
  102. ORDER_TIMEOUT_INTO_DEAD_QUEUE,
  103. Binding.DestinationType.QUEUE,
  104. ORDER_TIMEOUT_INTO_DEAD_EXCHANGE,
  105. ORDER_TIMEOUT_INTO_ROUTING_KEY,
  106. null
  107. );
  108. }
  109. // ==================================================普通队列 end========================================================
  110. }

消费者,监听死信队列

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Component
  8. public class OrderTimeoutCloseMQListener {
  9. @RabbitHandler
  10. @RabbitListener(queues = OrderTimeoutCloseConfig.ORDER_TIMEOUT_CLOSE_DEAD_QUEUE) // 监听死信队列
  11. public void messageHandler(String body, Message message, Channel channel) throws IOException {
  12. // 1:监听到 订单消息,拿到订单id
  13. System.err.println(" X 监听死信队列收到消息 body = " + body);
  14. // 2:用订单id,查询数据库订单信息,如果订单状态是 已支付,这里不做操作
  15. // 3:如果订单状态是 未支付,把订单状态设置成 超时未支付
  16. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  17. }
  18. }

生产者,向普通队列发送消息

  1. import net.minidev.json.JSONObject;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.core.ReturnedMessage;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @SpringBootTest
  11. class RabbitmqDemoApplicationTests {
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. /**
  15. * 模拟下单成功
  16. */
  17. @Test
  18. public void testBuy() {
  19. // 1:用户下单把订单信息存入数据库,返回订单id
  20. // 2:发送订单id到 普通消息队列
  21. Map<String, String> map = new HashMap<>();
  22. map.put("orderId", "123456789");
  23. rabbitTemplate.convertAndSend(OrderTimeoutCloseConfig.ORDER_TIMEOUT_INTO_DEAD_EXCHANGE, OrderTimeoutCloseConfig.ORDER_TIMEOUT_INTO_ROUTING_KEY, JSONObject.toJSONString(map));
  24. }
  25. }

RabbitMQ 的集群环境

RabbitMQ 普通集群模式的介绍

        集群有 3 个节点,node1、node2、node3,三个节点有相同的元数据(交换机、队列结构),一个消息只存在一个节点上,不在其他节点同时存在。

例如:

        A消息,存在node1节点上。A消息在node2、node3节点上不存在。

消费者监听node1节点可以直接消费到 A消息。假如消费者监听的是node2节点,那么rabbitmq 会把A消息被消费的时候才从 node1 节点取出放入到node2节点,然后node2节点再把消息转发给消费者。

问题:

        1:假如node1节点故障,那么node2无法获取node1节点上未被消费的消息。

        2:如果node1持久化后发生故障,消息需要等到node1恢复正常后才可以正常消费。

        3:如果node1未做持久化发生故障,那么node1节点上的消息将会丢失。

应用场景:

        该模式适用于消息无需持久化的场景,例如日志传输队列。

注意:集群需要保证每个节点有相同的token令牌。

消息持久化


RabbitMQ 搭建普通集群环境

1:准备节点环境

3个节点的访问 web控制台访问端口分别是:15671、15672、15673

准备3个目录,用于放 3个节点

  1. /usr/local/rabbitmq/1
  2. /usr/local/rabbitmq/2
  3. /usr/local/rabbitmq/3

创建 节点1

  1. sudo docker run -d \
  2. --name rabbitmq_1 \
  3. -h rabbitmq_host1 \
  4. -p 4361:4369 \
  5. -p 5671:5672 \
  6. -p 15671:15672 \
  7. -p 25671:25672 \
  8. -e RABBITMQ_NODENAME='rabbit' \
  9. -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \
  10. --privileged=true \
  11. -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq/ \
  12. rabbitmq:management

创建 节点2

  1. sudo docker run -d \
  2. --name rabbitmq_2 \
  3. -h rabbitmq_host2 \
  4. -p 4362:4369 \
  5. -p 5672:5672 \
  6. -p 15672:15672 \
  7. -p 25672:25672 \
  8. -e RABBITMQ_NODENAME='rabbit' \
  9. -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \
  10. --link rabbitmq_1:rabbitmq_host1 \
  11. --privileged=true \
  12. -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq/ \
  13. rabbitmq:management

创建 节点3

  1. sudo docker run -d \
  2. --name rabbitmq_3 \
  3. -h rabbitmq_host3 \
  4. -p 4363:4369 \
  5. -p 5673:5672 \
  6. -p 15673:15672 \
  7. -p 25673:25672 \
  8. -e RABBITMQ_NODENAME='rabbit' \
  9. -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \
  10. --link rabbitmq_1:rabbitmq_host1 \
  11. --link rabbitmq_2:rabbitmq_host2 \
  12. --privileged=true \
  13. -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq/ \
  14. rabbitmq:management

参数说明:

  • -e RABBITMQ_ERLANG_COOKIE:指定集群节点的cookie,节点的cookie要配置相同。 
  • --link:容器互联,让容器之前可以相互ping通
  • --privileged:让容器内部的用户有root权限,不然用户对容器内部的文件没有操作权限permission denied
  • -v :让物理机路径与容器里的路径映射,容器里的路径的数据会存储在物理机上

节点完成后,可以访问 http://ip:端口,查看2个节点是否创建成功。

如果容器启动失败,可以使用 docker logs 容器id 命令查看启动日志。


2:节点配置成集群

配置之前查看节点状态,进入容器内,使用命令:rabbitmqctl cluster_status

配置节点1

  1. docker exec -it 节点1的容器名称 /bin/bash  // 进入启动的docker容器内
  2. rabbitmqctl stop_app                      // 停止 rabbitmq 服务,rabbitmqctl是rabbitmq的操作命令 
  3. rabbitmqctl reset // 重置 rabbitmq
  4. rabbitmqctl start_app // 启动 rabbitmq 服务
  5. exit // 退出 docker 容器

配置节点2,加入集群

  1. docker exec -it 节点2的容器名称 /bin/bash        // 进入启动的docker容器内
  2. rabbitmqctl stop_app                      // 停止 rabbitmq 服务,rabbitmqctl是rabbitmq的操作命令 
  3. rabbitmqctl reset // 重置 rabbitmq
  4. rabbitmqctl join_cluster --ram 节点1的hostname // 加入集群 --ram 参数是以内存的方式加入,不带此参数默认是磁盘的方式,节点1的hostname是:rabbit@rabbitmq_host1
  5. rabbitmqctl start_app // 启动 rabbitmq 服务
  6. exit // 退出 docker 容器

配置节点3,加入集群

  1. docker exec -it 节点3的容器名称 /bin/bash        // 进入启动的docker容器内
  2. rabbitmqctl stop_app                      // 停止 rabbitmq 服务,rabbitmqctl是rabbitmq的操作命令 
  3. rabbitmqctl reset // 重置 rabbitmq
  4. rabbitmqctl join_cluster --ram 节点1的hostname // 加入集群 --ram 参数是以内存的方式加入,不带此参数默认是磁盘的方式,节点1的hostname是:rabbit@rabbitmq_host1
  5. rabbitmqctl start_app // 启动 rabbitmq 服务
  6. exit // 退出 docker 容器

配置完成之后,可以在容器内使用命令:rabbitmqctl cluster_status,查看集群状态,可以看到集群现在有3个节点在运行。1个磁盘节点,2个内存节点

访问网页,可以看到有3个节点

消息队列和交换机在所有节点上存在。消息只在自己的节点上存在,当一个节点宕机后,宕机节点上的消息无法被消费(消息不可用)


3:SpringBoot 整合 RabbitMQ 普通集群

application.yml 文件配置 rabbitmq 集群地址,其他配置不变

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. acknowledge-mode: manual
  6. publisher-returns: true
  7. template:
  8. mandatory: true
  9. publisher-confirm-type: correlated
  10. # host: 192.168.189.75
  11. # port: 5672
  12. virtual-host: /dev
  13. password: guest
  14. username: guest
  15. ### 配置节点地址
  16. addresses: 192.168.189.75:5671,192.168.189.75:5672,192.168.189.75:5673

代码操作,和上面的单节点的一样正常的生产监听消息就行了,这里就不重复贴代码了


RabbitMQ 镜像集群模式的介绍(推荐)

        队列做成镜像队列,镜像队列中的消息在各个节点之间同步(A消息在各个节点中都存在)。

好处:
    实现了高可用,部分节点宕机后,不影响消息的正常消费。
    镜像集群模式可以保证消息100%不丢失。适用于高可用要求高的需求,例如订单服务。

缺点:
    消息数量过多,大量的消息同步会加大网络宽带的消耗。节点越多服务器性能受影响越大
    
注意:集群需要保证每个节点有相同的token令牌。

策略policy介绍

policy是用来控制和修改集群的vhost队列和Exchange复制行为。哪些Exchange或者queue的数据需要复制、同步,以及如何复制同步。

  • 创建一个policy策略

路径:进入rabbitmq控制台 -> Admin -> Policies -> Add / update a policy

参数介绍:

Name:自定义策略名称,建议不要使用空格

Pattern:用于匹配队列/交换机的正则表达式,^ 符号,表示匹配所有

Apply to:应用到交换机和队列

Priority:优先级。一个队列/交换机只会有一个生效的 Policy,如果匹配多个 Policy,则优先级数值最大的 Policy 生效。

Definition:JSON格式的一组键值对,表示设置的属性,会被注入匹配队列/交换机

  • ha-mode:
    • all:表示在集群中所有的节点上进行镜像同步(一般都用这个参数)
    • exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定
    • nodes:表示在指定的节点上进行镜像同步,节点名称通过ha-params指定
  • ha-sync-mode:镜像消息同步方式
    • automatic: 自动(默认)
    • manually:手动

policy策略创建完成后,镜像队列就配置成功了。可以看到队列发生了如下变化

镜像集群注意点:

  • 集群启动顺序:先启动磁盘节点 => 再启动内存节点
  • 集群关闭顺序:先关闭内存节点 => 再关闭磁盘节点
  • 最后关闭必须是磁盘节点,否则容易造成集群启动失败、数据丢失等异常情况

可以看到节点宕掉一个后,消息还是存在的

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/515807
推荐阅读
相关标签
  

闽ICP备14008679号