赞
踩
参考API : Overview (RabbitMQ Java Client 5.20.0 API)
参考文档: RabbitMQ: One broker to queue them all | RabbitMQ
目录
在此之前您需要了解生产者消费者模型... 学习的时候, 应该结合下面这张图一起看.
- package one;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 消费者
- */
- public class Consumer {
- public static final String QUEUE_NAME = "hello";
-
- // 接受消息
- public static void main(String[] args) throws IOException, TimeoutException {
- // 创建链接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("106.14.165.91");
- factory.setPassword("123");
- factory.setUsername("admin");
-
- // 进行连接
- Connection connection = factory.newConnection();
-
- // 链接成功之后创建一个信道
- Channel channel = connection.createChannel();
-
- // 消费者消费消息
- /**
- * 参数
- * 1.消费哪个队列
- * 2.消费成功之后,是否要自动应答,true表示自动应答, 否则false
- * 3.未消费成功的回调方法
- * 4.消费者取消消费的回调
- */
- channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
- @Override
- public void handle(String var1, Delivery var2) throws IOException {
- String msg = new String(var2.getBody());
- System.out.println(msg);
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- System.out.println("发生错误:" + s);
- }
- });
-
-
- }
- }
- package one;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static final String QUEUE_NAME = "hello";
-
- // 发消息
- public static void main(String[] args) throws IOException, TimeoutException {
- // 创建一个链接工厂
- ConnectionFactory factory = new ConnectionFactory();
-
- // 工厂的ip, 链接rabbit队列
- factory.setHost("106.14.165.91");
- factory.setUsername("admin");
- factory.setPassword("123");
-
- // 建立连接
- Connection connection = factory.newConnection();
- // 获取信道
- Channel channel1 = connection.createChannel();
-
- // 生成一个队列
- /**
- * 第一个参数: 队列名称
- * 第二个参数: 消息是否持久化, true表示存储在磁盘上, 否则表示存储在内存中(默认)
- * 第三个参数: 该队列是否消息共享, true表示可以多个消费者消费, 否则只能一个消费者消费
- * 第四个参数: 是否自动删除, 最后一个消费者断开连接之后, 该队列是否自动删除,true表示自动删除
- * 其他参数:
- */
- channel1.queueDeclare(QUEUE_NAME,false,false,false,null);
- long nextPublishSeqNo = channel1.getNextPublishSeqNo();
- System.out.println(nextPublishSeqNo);
- // 发送消息
- String msg = "hello world";
-
- /** 参数列表
- * 1 : 发送到哪个交换机
- * 2 : 路由的key值, 本次是队列名称
- * 3 : 其他参数
- * 4 : 消息体
- */
- channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());
- long nextPublishSeqNo2 = channel1.getNextPublishSeqNo();
- System.out.println(nextPublishSeqNo);
- channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());
- long nextPublishSeqNo3 = channel1.getNextPublishSeqNo();
-
- System.out.println("消息发送完毕over");
-
- }
- }
Connection:publisher/consumer和broker之间的TCP连接, Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以channel之间是完全隔离的。
Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销.
创建一个连接工厂之后, 设置对应Rabbitmq在哪个服务器上面, 并提供安全访问的验证.
在建立连接工厂之后进行连接, 就可以使用工厂创建连接.
- ConnectionFactory factory = new ConnectionFactory();
- // 工厂的ip, 链接rabbit队列
- factory.setHost("106.14.165.11");
- factory.setUsername("usr");
- factory.setPassword("123456");
-
- Connection connection = factory.newConnection();
-
- Channel channel = connection.createChannel();
创建链接之后就可以使用这个链接的对象来创建channel.
生产者生产消息, 然后通过channel发送给队列. 通过创建的channel对象, 调用其中的basicPublish方法来将消息发送给队列.
basicPublish
是 RabbitMQ 中用于发布消息到指定交换机的方法。它的主要作用是允许生产者将消息发送到 RabbitMQ 的交换机,然后交换机根据路由规则将消息发送到相应的队列中,以供消费者消费。
basicPublish参数解析:
basicPublish有三个重载版本:
void basicPublish(String exchange , String routingKey , AMQP.BasicProperties props, byte[] body ) throws IOException;
exchange这个参数, 如果指定默认的交换机, 也就是如下图所示:
void basicPublish(String exchange, String routingKey, boolean var3, AMQP.BasicProperties props, byte[] body) throws IOException;
routingKey
:与第一个方法中的意义相同,分别是交换机名称和路由键。var3
(boolean):是否强制路由(mandatory routing)。如果设置为true
,并且消息无法路由到任何队列(没有匹配的绑定),那么RabbitMQ会返回一个错误给生产者。如果设置为false
,消息将被丢弃。void basicPublish(String var1, String var2, boolean var3, boolean var4, AMQP.BasicProperties var5, byte[] var6) throws IOException;
var1
和 var2
:与前两个方法中的意义相同,分别是交换机名称和路由键。var3
(boolean):是否强制路由,与第二个方法中的意义相同。var4
(boolean):是否立即发布(immediate flag)。如果设置为true
,并且消息无法路由到任何消费者(没有匹配的队列或消费者不在线),那么RabbitMQ会返回一个错误给生产者。如果设置为false
,消息将被存储在队列中等待消费者。var5
和 var6
:与第一个方法中的意义相同,分别是消息属性和消息体。需要注意的是, 如果你指定默认的交换机, 也就是default交换机, 那么第二个参数routingKey的意思就变成了queue了, 也就是第二个参数改为 对应的队列的名称.
消费者消费消息的方法为basicConsume() 这个方法有很多个重载, 如下:
地址: Channel (RabbitMQ Java Client 5.20.0 API)
这里只讲解最常见的, 也是初学者最常用的一个方法:
- basicConsume(
- String queue,
- boolean autoAck,
- Map<String,Object> arguments,
- DeliverCallback deliverCallback,
- CancelCallback cancelCallback
- )
参数解析:
true
,则一旦消息被交付给消费者,RabbitMQ 会自动将其标记为已确认,即使消费者还没有实际处理完这条消息。这种模式下,如果消费者在处理消息时崩溃或发生错误,那么这条消息就会丢失,因为 RabbitMQ 认为它已经被成功处理了。false
,则消费者需要显式地调用 basicAck
方法来确认消息已被成功处理。这样,如果消费者在处理消息时崩溃,RabbitMQ 会重新将这条消息放回队列中,等待其他消费者处理,从而保证了消息的可靠性。basicCancel
)时,会自动调用这个回调。这个回调可以用于执行清理工作,比如释放资源、记录日志等。下面是 DeliverCallback 和CancelCallback 两个接口的代码:
- @FunctionalInterface
- public interface DeliverCallback {
- void handle(String var1, Delivery var2) throws IOException;
- }
- @FunctionalInterface
- public interface CancelCallback {
- void handle(String var1) throws IOException;
- }
我们需要重写里面的handle方法, 示例如下:
- channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
- @Override
- public void handle(String var1, Delivery var2) throws IOException {
- // ...
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- // ...
- }
- });
当然你也可以使用lambda表达式;
生产者使用的是basicPublish来将消息推送至队列, 也就是:
channel1.basicPublish("",QUEUE_NAME,null,msg.getBytes());
但是存在一个问题, 如果你basicPublish指定的交换机不存在? 那么你推送消息到你指定的交换机, 就会发生异常, 所以除非你的RabbitMQ-server本地已经创建了这个交换机, 那么就不需要其他操作, 但是如果你没有你指定的名称的交换机, 那么就应该去声明一个交换机.
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
queueDeclare()
也就是方法: queueDeclare() , 它可以指定参数, 也可以不指定, 下面是他们的解释:
queueDeclare
是 RabbitMQ Java 客户端库中用于声明队列的方法。这个方法有两个版本,一个不带参数,另一个带有多个参数以提供队列的详细配置。下面我将详细解释这两个方法及其参数的作用。
第一个方法:queueDeclare()
Actively declare a server-named exclusive, autodelete, non-durable queue.
这个方法不带任何参数。当你调用这个方法时,RabbitMQ 会为你创建一个新的队列,该队列的名称将由 RabbitMQ 自动生成,并且这个队列是非持久的、排他的、自动删除的,且不带任何额外的参数。
由于没有指定队列名称,你通常无法预先知道队列的确切名称,这可能会在某些场景下造成不便,比如当你需要多个消费者共享同一个队列时。此外,由于队列是非持久的,如果 RabbitMQ 服务器重启,这个队列将会丢失,所有在队列中的消息也会丢失。
第二个方法:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
这个方法允许你更细致地配置队列的属性。下面是每个参数的解释:
true
,队列会在 RabbitMQ 服务器重启后依然存在。如果设置为 false
,队列则是非持久的,服务器重启后队列将不存在。true
,队列只能被声明它的连接使用,并且当连接关闭时,队列会被自动删除。这通常用于临时队列。true
,当最后一个消费者断开连接后,队列会自动删除。如果设置为 false
,则不会自动删除队列。对比两个方法
第一个方法(无参数版本)非常简单易用,但功能有限。它适用于那些不需要复杂队列配置的场景,比如临时测试或简单应用。然而,由于它创建的队列是非持久的,且名称不可预知,因此它可能不适用于需要持久化存储或精确控制队列名称的场景。
第二个方法(带参数版本)提供了更丰富的队列配置选项,使得你可以更精确地控制队列的行为。通过设置不同的参数,你可以创建持久化队列、排他队列、自动删除队列,以及带有额外属性的队列。这使得这个方法适用于那些需要复杂队列配置和高级特性的场景。
在实际应用中,你应该根据应用的需求来选择使用哪个方法。如果你只是需要一个简单的、临时的队列来传递消息,那么无参数版本可能足够了。但如果你需要确保队列的持久性、控制队列的名称、设置队列的额外属性等,那么你应该使用带参数版本。
对于消费者同样如此
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
这个方法确实是用于声明(或创建)一个队列的。在RabbitMQ中,队列的声明是一个幂等操作,这意味着即使队列已经存在,再次声明它也不会产生错误或导致任何不期望的行为。
当调用这个方法时,RabbitMQ会检查是否已经存在具有相同名称的队列:
如果队列不存在:RabbitMQ会根据提供的参数(如durable
、exclusive
、autoDelete
和arguments
)创建一个新的队列。
如果队列已经存在:RabbitMQ会忽略声明请求中的大多数参数(除了exclusive
和autoDelete
,这两个参数仅在首次声明队列时生效),并返回队列的当前属性。重要的是要注意,即使队列已经存在,durable
标志也不会影响现有队列的持久性。如果队列在原始声明时是持久的,那么它将继续是持久的,即使后续的声明将其标记为非持久的。
因此,如果你尝试声明一个已经存在的队列,RabbitMQ不会报错或采取任何特别的行动,除了验证提供的exclusive
和autoDelete
标志是否与原始声明一致(如果不一致,操作会失败)。其他参数(如durable
)将不会影响已存在的队列(但是不报错并不是绝对的, 这个需要根据版本说明去判断, 不能肯定它不报错)。
最后,需要注意的是,虽然声明队列本身不会抛出IOException
,但如果在与RabbitMQ服务器通信时发生网络问题或其他I/O问题,这个方法可能会抛出IOException
。因此,在实际使用中,你应该妥善处理这些潜在的异常。
工作队列, 主要是避免立即执行资源密集型任务, 而不得不等待它完成, 相反我们安装任务之后执行, 我们把任务封装为消息并将其发送给队列, 在后台运行的工作进程将弹出任务并最终执行作业, 当有多个线程工作时, 这些工作线程讲义气处理这些任务.
RabbitMQ的工作队列(Work Queue)是一种消息队列模式,它允许你将任务(通常表示为消息)分发给多个消费者(工作进程)进行并行处理。这种模式特别适用于那些可以并行处理且不需要按照特定顺序完成的任务。
在工作队列模式中,生产者发送消息到队列中,一个或多个消费者从队列中接收并处理这些消息。每个消息都会被一个消费者处理,并且通常不会被多个消费者处理(除非有明确的路由或复制逻辑)。这种模式非常适合用于处理后台任务,如批量电子邮件发送、日志处理、图像处理等。
RabbitMQ的工作队列模式有以下几个关键特点:
任务分发:生产者将任务作为消息发送到队列中。RabbitMQ负责将消息从队列中取出并分发给一个或多个消费者。分发通常基于消息的先进先出(FIFO)顺序,但也可以通过其他策略(如优先级队列)进行定制。
并行处理:多个消费者可以同时从队列中接收消息并处理任务。这使得任务可以并行执行,从而提高了整体的处理速度。
消息确认:为了确保消息的可靠处理,消费者通常在处理完消息后会向RabbitMQ发送一个确认信号(ack)。这样,即使消费者在处理消息时崩溃,RabbitMQ也可以将未确认的消息重新放回队列中,等待其他消费者处理。这种机制保证了消息的可靠性。
持久化:通过配置队列和消息的持久化属性,可以确保即使在RabbitMQ服务器重启后,消息也不会丢失。这对于处理重要任务至关重要。
扩展性:工作队列模式具有很好的扩展性。你可以根据需要添加更多的消费者来处理更多的任务,从而轻松应对负载的增加。
使用RabbitMQ的工作队列模式,你可以构建高效、可靠且可扩展的后台任务处理系统,以满足各种应用场景的需求。
下面我们来一一列举出案例来解析工作队列的特性....
RabbitMQ 在默认情况下,其分发机制是公平的,它试图将消息平均地分发给各个消费者,确保每个消费者都有机会处理大致相同数量的消息。这种分发并不是随机的,而是按照一定的顺序或规则进行。
但是这种分发模式会有一个很大的问题, 那么就是如果一个消费者处理消息的速度慢, 一个快, 那么就会有一个消费者产生饥饿的情况, 而另外一个消费者非常忙碌, 严重的队列会出现消息积压的情况. 此时产生饥饿的消费者没有完全利用cpu来消费消息, 所以就产生了资源的浪费, 为了避免这个情况
在 RabbitMQ 中,如果消息被平均分发到多个消费者(如消费者a和b),但消费者的处理速度不同(如a处理速度很快,b处理速度很慢),那么未被消费的消息会继续保留在队列中,等待消费者处理。具体来说,当消费者a迅速处理完自己的消息后,它会继续从队列中获取并处理新的消息(如果有的话)。而消费者b由于处理速度慢,它还未消费完的消息会留在队列中,等待其逐渐处理。
RabbitMQ 本身并没有为每个消费者设置单独的缓存来存储未处理的消息。消息的处理和存储都是在队列层面进行的。队列是消息的缓冲区,它负责存储和分发消息给消费者。消费者按照自己的速度从队列中拉取(或在某些配置下由队列推送)消息进行处理。
为了解决这种情况, 可以使用basicQos(1)方法来设置每个消费者同时只能消费一个消息, 这个设置将会告诉队列, 给我发送的消息, 同时不能超过一个 , 或者说是"别给我发送消息, 除非我上一个消息已经处理并应答", 同时, 他会将第二个消息发送给另外一个空闲的消费者来处理.
- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
但是如果所有的消费者都处于忙碌状态, 消息无法即使处理, 那么如果你还有必要维护这个队列, 那么推荐您多创建几个消费者去消费.
首先创建两个消费者, 创建一个生产者, 看看他们之间的任务是如何分配的:
消费者1 :
- package MutiThreadWorkQueue;
-
- import Util.RabbitMQUtil;
- import com.rabbitmq.client.CancelCallback;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
- import com.rabbitmq.client.Delivery;
-
- import java.io.IOException;
-
-
- /**
- * 工作队列1
- * 也就是消费者1
- */
- public class Worker1 {
- // 接收消息
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitMQUtil.getChannel();
-
- // sout
- System.out.println("worker1 : ");
- // 接收消息
- channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- System.out.println("worker1: " + new String(delivery.getBody()));
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- System.out.println("worker1 发生错误");
- }
- });
- }
- }
消费者2同消费者1一样, 只不过里面的一些向控制台输出的提示信息发生了一些修改, 例如:
System.out.println("worker2 : ");
生产者
- package MutiThreadWorkQueue;
-
- import Util.RabbitMQUtil;
- import com.rabbitmq.client.Channel;
-
- import java.io.IOException;
- import java.util.Scanner;
-
- public class Producer {
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitMQUtil.getChannel();
-
- // 从控制台输入 接收信息
- Scanner scanner = new Scanner(System.in);
- while(scanner.hasNext()) {
- String msg = scanner.next();
- channel.basicPublish("",RabbitMQUtil.QUEUE_NAME,null,msg.getBytes());
- System.out.println("发送消息: " + msg + ",发送完毕");
- }
- }
- }
RabbitMQUtil :
- package Util;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class RabbitMQUtil {
- public static final String QUEUE_NAME = "hello";
-
-
-
- public static Channel getChannel() {
- ConnectionFactory factory = new ConnectionFactory();
-
- // 工厂的ip, 链接rabbit队列
- factory.setHost("106.14.165.91");
- factory.setUsername("admin");
- factory.setPassword("123");
-
- // 建立连接
- Connection connection = null;
- try {
- connection = factory.newConnection();
- return connection.createChannel();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- throw new RuntimeException(e);
- }
- // 获取信道
- }
-
-
-
- }
查看rabbitmq是否已经存在RabbitMQUtil.QUEUE_NAME这个字符串对应的队列:
已经存在, 直接启动生产者和消费者, 然后在生产者中多次输入信息:
查看消费者1和消费者2 :
如果你多次重复的去实验你就会发现, 总是奇数的在woker1或者是woker2.
为什么?
我们首先看看消费者的消费的代码:
- channel.basicConsume(RabbitMQUtil.QUEUE_NAME, true, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- System.out.println("worker1: " + new String(delivery.getBody()));
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- System.out.println("worker1 发生错误");
- }
- });
然后根据上面的参数解析, 可以发现, 其实他是使用的这个构造方法:
这个构造方法有什么特别之处? 那就是它没有指定exclusive, 也就是没有指定它是否是排他的, 但是不设置就不代表没有隐式设置.
官方文档给出的这个构造方法的描述是:
也就是non-sxclusive, 我们知道exclusive是排他的意思, 那么non-exclusive就是不拍他的, 也就是说, 消费的时候允许其他消费者一起共享处理. 但是每一个任务只能分发给一个消费者.
首先你得确认一个东西, 那就是, 消息从生产者这里发送出去, 就可以不管了吗, 队列将消息分配给消费者, 就可以不管了吗, 当然不是, 还需要使用一种应答机制, 你可以将它和TCP协议的应答报文机制和超时重传进行一个对比.
RabbitMQ 的消息应答机制是一个确保消息在发送和接收过程中可靠性的重要手段。这种机制主要用于处理消费者在处理消息时可能出现的异常情况,如消费者在处理消息过程中宕机,导致消息丢失。
RabbitMQ 一旦向消费者传递了一条消息,通常会将该消息标记为已发送。然而,如果消费者在处理消息的过程中发生宕机,未处理的消息可能会丢失。为了保证消息在发送过程中不丢失,RabbitMQ 引入了消息应答机制。
消息应答机制的工作原理是:消费者在接收到消息并且处理完该消息之后,会向 RabbitMQ 发送一个确认信号,告诉 RabbitMQ 它已经处理了该消息,此时 RabbitMQ 可以安全地将该消息从队列中删除。
RabbitMQ 提供了两种消息应答模式:
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
-
- System.out.println(" [x] Received '" + message + "'");
- try {
- doWork(message);
- } finally {
- System.out.println(" [x] Done");
- }
- };
- boolean autoAck = true; // acknowledgment is covered below
- channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
在实际使用中,可以根据应用的需求和消息的重要性来选择适合的消息应答模式。对于要求消息可靠传递的场景,建议使用手动应答模式;而对于对消息丢失容忍度较高的场景,可以选择自动应答模式以提高处理效率。
对于自动应答, 你想想,你有没有在什么地方见过?? 那肯定见过呀, 就在我们的api解析中, basicConsume的构造方法中, 存在一个参数, 名为AutoAck, 也就是Auto acknowledgement, 自动确认. 此时, 如果你设置为true, 那么就表明他是自动确认, 当队列将消息发送给消费者, 只要消费者接收到消息之后, RabbitMQ会立即将消息标记为已消费, 然后删除.
但是这样不安全, 得换一个更安全的方法 : 手动应答.
开启消息应答(手动) 你首先需要设置消费者的消费autoack为false :
官方有这样一句话描述basicAck:
简而言之就是, 如果你没有进行手动应答, 虽然是一个很容易犯的错误, 但是他会造成严重的后果, 也就是当你的客户端退出的时候消息会被重新推送(就像一些消息被无规则的推送), 但是RabbitMQ将会占用越来越多的内存, 这是因为这些消息没有得到正确的处理.
接下来我们看看basicAck这个方法的声明:
- // Acknowledge one or several received messages.
- basicAck(long deliveryTag, boolean multiple)
参数解析:
long
)参数,代表要确认的消息的投递标签(delivery tag)。投递标签是 RabbitMQ 在发送消息给消费者时附带的,用于唯一标识这个消息。通过确认特定的投递标签,消费者可以告诉 RabbitMQ 它已经处理了哪个消息。boolean
)参数,指示是否确认一个投递标签范围内的多个消息。如果 multiple
设置为 true
,则 RabbitMQ 会将投递标签小于或等于指定 deliveryTag
的所有未确认消息标记为已确认。如果 multiple
设置为 false
,则仅确认具有指定 deliveryTag
的单个消息。那么, 这个deliveryTag和multiple从哪里来? 还记得处理接口DeliverCallback 吗, 每次消息队列向这个消费者发送消息, 消费者就会调用这个接口.
DeliverCallback
在 RabbitMQ 的 Java 客户端中是一个回调接口,用于处理从 RabbitMQ 队列接收到的消息。当 RabbitMQ 服务器向消费者发送消息时,它会调用这个回调接口,并将消息作为参数传递给 DeliverCallback
的实现方法。
具体来说,DeliverCallback
的实现方法接收两个参数:
consumerTag
:这是一个唯一标识消费者的标签,用于在多个消费者之间区分不同的消费实例, 是队列发送给消费者的时候自动为消费者分配的。delivery
:这是一个 Delivery
对象,它包含了从 RabbitMQ 接收到的消息的内容以及其他相关信息,如消息的包体(body)、消息的头部(headers)、消息的投递标签(delivery tag)等。 DeliverCallback
不是一个缓存。它仅仅是一个回调函数,用于实时处理从 RabbitMQ 服务器接收到的消息。每当有新消息到达时,RabbitMQ 就会调用这个回调函数,并将消息传递给它。因此,你的消费者代码需要在 DeliverCallback
的实现中编写处理消息的逻辑。
例如,在上面的代码示例中,当接收到消息时,DeliverCallback
的实现会打印出消息内容,模拟一些处理过程(在这个例子中是等待两秒),然后发送一个确认信号给 RabbitMQ,告诉它消息已经被成功处理。
需要注意的是,DeliverCallback
的实现应该尽可能快地处理消息并发送确认信号,以避免消息在队列中堆积。如果处理消息的过程非常耗时,或者有可能失败,你可能需要考虑使用更复杂的错误处理机制,比如重试逻辑、死信队列等。
- channel.basicConsume(TASK_QUEUEN_NAME, false, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
-
- System.out.println(" [x] Received '" + message + "'");
- try {
- doWork(message);
- } finally {
- System.out.println(" [x] Done");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- System.out.println("c1 消费者接收到 取消接口消费回调逻辑");
- }
- });
至于multiple, 批量应答以减少网络拥堵:
处理消息:
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息
为了保证消息被安全的发送给broker, 也就是RabbitMQ队列, 你应该需要使用到一些策略, 来让发布确认生效.
开启发布确认模式
- Channel channel = connection.createChannel();
- channel.confirmSelect();
发布确认模式是, AMQP 0.9.1 协议对于RabbitMQ的扩展, 所以发布确认模式不是默认启动的, 发布确认需要再channel 频道开启, 使用上述的confirmSelect()方法来开启发布确认.
开启发布确认之后, producer每次发送消息之后, 都会遵循相应的确认策略, 可以单个确认, 也可以批量确认, 下面是发布确认的一些常用确认方法:
这个方法会阻塞当前线程,直到自上次调用此方法以来发布的所有消息都被Broker确认(ack)或拒绝(nack)。如果没有设置超时时间,它可能会无限期地等待,直到所有消息都被处理。
返回类型是boolean
,但在大多数情况下,此方法可能会因为阻塞而不返回任何值。实际上,其返回值的意义可能取决于具体的RabbitMQ客户端库实现,但通常这种同步等待方法不会使用其返回值来进行流控制或错误处理。
2. waitForConfirms(long timeout)
与上一个方法类似,这个方法也会阻塞当前线程,等待Broker对消息的确认或拒绝。但是,它接受一个超时参数timeout
,表示等待的最大时间(以毫秒为单位)。如果在指定的超时时间内Broker没有对所有消息进行确认或拒绝,那么该方法将停止等待并返回。
返回类型是boolean
,但同样,返回值的意义可能取决于具体的RabbitMQ客户端库实现。通常,如果所有消息都在超时前得到了确认,则返回true
;如果超时了,则返回false
。
3. waitForConfirmsOrDie()
这个方法的行为与waitForConfirms()
类似,也会阻塞当前线程,等待Broker对所有消息进行确认或拒绝。但是,如果Broker没有对所有消息进行确认或拒绝,那么这个方法不会返回,而是会抛出异常(通常是运行时异常),导致当前线程终止。
由于这个方法可能导致线程终止,因此它通常用于那些对消息确认有严格要求的场景,并且愿意在消息未得到确认时让整个程序失败。
4. waitForConfirmsOrDie(long timeout)
这个方法结合了waitForConfirms(long timeout)
和waitForConfirmsOrDie()
的特点。它会在指定的超时时间内等待Broker对所有消息进行确认或拒绝。如果超时时间到了,而Broker还没有对所有消息进行确认或拒绝,那么这个方法会抛出异常,导致当前线程终止。
这种方法在需要确保消息被处理但又不想无限期等待的情况下非常有用。它允许设置一个合理的超时时间,以便在消息处理失败时能够及时地采取其他措施。
单独的发送消息
- while (thereAreMessagesToPublish()) {
- byte[] body = ...;
- BasicProperties properties = ...;
- channel.basicPublish(exchange, queue, properties, body);
- // uses a 5 second timeout
- channel.waitForConfirmsOrDie(5_000);
- }
上面的例子中我们发送了一个消息, 然后等待他的确认(waitForConfirmsOrDie(5_000)), 这个方法将会再消息得到队列的确认之后返回, 如果消息没有在指定time内确认, 或者是由于某些原因队列无法返回确认消息(比如网络原因) , 那么该方法就会抛出异常, 这种异常的处理一般是记录日志, 或者重新将消息发送.
不同的客户端库拥有不同的方法区同步处理发布者确认模式, 所以确保仔细阅读你所使用的客户端的文件.
缺点:
这种方法虽然是很简便的, 但是也有一些主要的缺点, 它大大降低了发布者发布的效率, 因为一个消息的确认, 阻止了发布随后将要发布的所有消息.这种方法, 将不能提供每秒发送几百条消息的吞吐量, 但是这种方法对于某一些应用来说, 还是很不错的, 足够支持一个应用了.
批量发布
- int batchSize = 100;
- int outstandingMessageCount = 0;
- while (thereAreMessagesToPublish()) {
- byte[] body = ...;
- BasicProperties properties = ...;
- channel.basicPublish(exchange, queue, properties, body);
- outstandingMessageCount++;
- if (outstandingMessageCount == batchSize) {
- channel.waitForConfirmsOrDie(5_000);
- outstandingMessageCount = 0;
- }
- }
- if (outstandingMessageCount > 0) {
- channel.waitForConfirmsOrDie(5_000);
- }
对比上面的一次性发布确认, 可以看到这个java代码有很大的不同, 首先, 他不并不是每一次循环都进行一个等待确认, 而是当 outstandingMessageCount == batchSize 这个条件成立再进行确认.
等待批量发送的消息被确认, 这个提高了吞吐量(对比于单独确认), 差不多时单独确认的20 ~ 30倍的效率提升, 但是他的一个缺点就是, 我们不能明确知道在失败的情况中, 是什么原因造成这种失败. 所以我们需要让整个批量发送维护在内存中来记录一些有用的东西, 或者重新发送该消息, 并且这种方法依然是同步的, 也就是在等待确认的时候, 会阻塞当前线程, 也就会阻止当前线程继续publish消息.
异步确认
- Channel channel = connection.createChannel();
- channel.confirmSelect();
- channel.addConfirmListener((sequenceNumber, multiple) -> {
- // code when message is confirmed
- }, (sequenceNumber, multiple) -> {
- // code when message is nack-ed
- });
broker异步的确认发送过来的消息, 仅仅只需要在客户端上注册一个回调函数, 来监视这些确认信息.
这里有两个回调, 一个是已经确认的消息, 一个是被拒绝的消息(你可以理解为被RabbitMQ丢弃的消息), 每一次回调都有两个参数:
每个消息在发布之前, 你可以通过下面的方法来获取到序列号:
- int sequenceNumber = channel.getNextPublishSeqNo());
- ch.basicPublish(exchange, queue, properties, body);
你可以使用这个序列号来找到对应的被拒绝或者是被确认的消息, 然后做出相关的处理操作. 但是在此之前, 你应该首先维护一个 key -value 的map, 以便记录sequenceNumber和对应消息的关联.
下面是一些代码案例:
- ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
- // ... code for confirm callbacks will come later
- String body = "...";
- outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
- channel.basicPublish(exchange, queue, properties, body.getBytes());
如何使用这个ConcurrentSkipListMap?
- ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
- ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
- if (multiple) {
- ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
- sequenceNumber, true
- );
- confirmed.clear();
- } else {
- outstandingConfirms.remove(sequenceNumber);
- }
- };
-
- channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
- String body = outstandingConfirms.get(sequenceNumber);
- System.err.format(
- "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
- body, sequenceNumber, multiple
- );
- cleanOutstandingConfirms.handle(sequenceNumber, multiple);
- });
- // ... publishing code
源码:
- public class Delivery {
- private final Envelope _envelope;
- private final AMQP.BasicProperties _properties;
- private final byte[] _body;
-
- public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
- this._envelope = envelope;
- this._properties = properties;
- this._body = body;
- }
-
- public Envelope getEnvelope() {
- return this._envelope;
- }
-
- public AMQP.BasicProperties getProperties() {
- return this._properties;
- }
-
- public byte[] getBody() {
- return this._body;
- }
- }
解析:
这个Delivery
类是在RabbitMQ的Java客户端中使用的,用于封装从RabbitMQ服务器接收到的消息。下面我将详细解释类中的参数和方法的作用:
参数:
Envelope envelope
:
deliveryTag
(投递标签)、exchange
(交换机)名称、routingKey
(路由键)等。deliveryTag
是一个唯一的标识符,用于确认(ack)或拒绝(nack)特定的消息。AMQP.BasicProperties properties
:
contentType
表示消息的内容类型(例如,text/plain
或application/json
),headers
可以包含自定义的键值对,用于传递额外的信息。byte[] body
:
properties
中的contentType
字段来确定如何解析这个字节数组。例如,如果contentType
是text/plain
,你可能需要将其转换为字符串;如果是application/json
,你可能需要将其解析为JSON对象。方法:
public Envelope getEnvelope()
:
Envelope
对象)。通过这个方法,你可以获取到消息的deliveryTag
,进而在处理完消息后进行确认或拒绝操作。public AMQP.BasicProperties getProperties()
:
AMQP.BasicProperties
对象)。你可以使用这个方法获取到消息的contentType
、headers
等字段,以便正确地解析和处理消息内容。public byte[] getBody()
:
getProperties()
返回的属性来确定如何解析这个字节数组。使用场景:
当你在RabbitMQ的Java客户端中消费消息时,RabbitMQ服务器会将消息封装为一个Delivery
对象,并通过DeliverCallback
回调给你。你可以在回调中处理这个消息,例如解析消息内容、执行业务逻辑,并在处理完后通过channel.basicAck
方法发送确认。
总之,Delivery
类及其参数和方法在RabbitMQ的Java客户端中起到了封装和传递消息的作用,使得开发者能够方便地获取和处理从RabbitMQ服务器接收到的消息。
Envelope元数据
- //
- // Source code recreated from a .class file by IntelliJ IDEA
- // (powered by FernFlower decompiler)
- //
-
- package com.rabbitmq.client;
-
- public class Envelope {
- private final long _deliveryTag;
- private final boolean _redeliver;
- private final String _exchange;
- private final String _routingKey;
-
- public Envelope(long deliveryTag, boolean redeliver, String exchange, String routingKey) {
- this._deliveryTag = deliveryTag;
- this._redeliver = redeliver;
- this._exchange = exchange;
- this._routingKey = routingKey;
- }
-
- public long getDeliveryTag() {
- return this._deliveryTag;
- }
-
- public boolean isRedeliver() {
- return this._redeliver;
- }
-
- public String getExchange() {
- return this._exchange;
- }
-
- public String getRoutingKey() {
- return this._routingKey;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Envelope(deliveryTag=").append(this._deliveryTag);
- sb.append(", redeliver=").append(this._redeliver);
- sb.append(", exchange=").append(this._exchange);
- sb.append(", routingKey=").append(this._routingKey);
- sb.append(")");
- return sb.toString();
- }
- }
Envelope
类是 RabbitMQ Java 客户端库中的一个类,它用于封装从 RabbitMQ 服务器接收到的消息的元数据。这个类包含了关于消息的一些重要信息,比如投递标签(deliveryTag
)、是否重新投递(redeliver
)、交换机名称(exchange
)和路由键(routingKey
)。
下面是 Envelope
类中每个字段和方法的详细解释:
字段:
_deliveryTag
:
long
_redeliver
:
boolean
true
。_exchange
:
String
_routingKey
:
String
方法:
getDeliveryTag()
:
long
isRedeliver()
:
boolean
getExchange()
:
String
getRoutingKey()
:
String
toString()
:
String
Object
类中的 toString
方法,用于返回 Envelope
对象的字符串表示形式,方便调试和日志记录。使用场景:
当消费者从 RabbitMQ 接收消息时,每条消息都会附带一个 Envelope
对象。消费者可以使用 Envelope
对象中的方法来获取消息的元数据,并根据这些信息来决定如何处理消息。例如,消费者可以使用 getDeliveryTag
方法获取投递标签,以便在处理完消息后发送确认。
我们上面已经了解如何保证任务不会丢失, 即使消费者连接丢失. 但是我们的任务依然会有丢失的风险, 例如RabbitMQ服务器崩掉.
当RabbitMQ服务器退出或者崩溃的时候, 他将会清除队列和消息, 除非你指定它不清除. 我们需要做两件事情, 来保证即使是服务器崩溃也不会丢失数据.
首先我们需要确保队列会在RabbitMQ节点重启之后存活, 要想做到这样, 就需要声明这个队列为持久化模式
- boolean durable = true;
- channel.queueDeclare("hello", durable, false, false, null);
但是前面我们讲到, 我们应该避免对一个已经存在的队列重新定义, 因为他不会生效, RabbitMQ是不允许使用不同的参数(durable, autoDelete,exclusive等)重新定义一个已经存在的queue的. 即使这个语句本身是正确的. 如果你这样做将会返回一个错误信息.
你可以声明一个不同名称的queue:
- boolean durable = true;
- channel.queueDeclare("task_queue", durable, false, false, null);
设置了以上的信息之后, 就可以保证此时这个队列将不会在RabbitMQ重启的时候丢失了, 但是这并不意味着RabbitMQ重启之后, 消息不会丢失, 因为你仅仅只是持久化了queue, 而不是消息, 现在我们需要将我们的消息同时也标记为持久化模式.
如何将消息体设置为durable? 我们思考一下, 首先消息是从producer那边publish过来的, 那么我们可不可以从basicPublish这个方法中找线索?? 还真被你找到了, 如下:
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
我们在推送消息的时候, 可以设置一个属性AMQP.BasicProperties props, 这个属性定义如下:
- public static class BasicProperties extends AMQBasicProperties {
- private String contentType;
- private String contentEncoding;
- private Map<String, Object> headers;
- private Integer deliveryMode;
- private Integer priority;
- private String correlationId;
- private String replyTo;
- private String expiration;
- private String messageId;
- private Date timestamp;
- private String type;
- private String userId;
- private String appId;
- private String clusterId;
-
- // 方法体 ... 省略
-
- }
这个BasicProperties
类继承自AMQBasicProperties
,它扩展了AMQP协议中消息属性的基础定义。AMQP(高级消息队列协议)是一个开放、可靠、面向消息的中间件协议,它支持多种消息传递模式,包括发布/订阅、点对点、请求/响应等。下面是对该类中一些属性和方法的基本解释:
属性:
text/plain
或application/json
。这有助于接收方知道如何解析消息内容。UTF-8
。方法:
通常,该类还会包含一些用于获取和设置这些属性的getter和setter方法,以及可能的其他方法用于序列化、反序列化或比较属性等。具体的方法实现取决于这个类的完整源代码。
使用场景:
这些属性通常用于确保消息的正确路由、处理和持久化。例如,发送方可能会设置replyTo
和correlationId
以接收RPC回复;或者设置priority
来确保某些关键消息优先被处理。接收方则会使用这些属性来正确地处理或路由消息。
其中有一个deliveryMode, 这个表示消息的持久化 .
所以我们第一个想到的就是通过构建一个BasicProperties对象,然后设置里面的属性,然后传入给basicPublish, 如下:
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.Channel;
- import java.nio.charset.StandardCharsets;
- import java.util.HashMap;
- import java.util.Map;
-
- // ... 其他代码 ...
-
- // 创建消息属性
- Map<String, Object> headers = new HashMap<>();
- headers.put("custom-header", "some-value");
-
- BasicProperties properties = new BasicProperties.Builder()
- .contentType("text/plain")
- .contentEncoding("UTF-8")
- .headers(headers)
- .deliveryMode(2) // 设置为持久化消息
- .priority(1)
- .correlationId("my-correlation-id")
- .replyTo("my-reply-queue")
- .expiration("60000") // 消息将在60秒后过期
- .messageId("my-message-id")
- .timestamp(new java.util.Date())
- .type("my-message-type")
- .userId("my-user-id")
- .appId("my-app-id")
- .clusterId("my-cluster-id")
- .build();
-
- // 获取RabbitMQ的Channel
- Channel channel = connection.createChannel();
-
- // 发布消息到指定的交换机和路由键,并带上属性
- String exchange = "my-exchange";
- String routingKey = "my.routing.key";
- String messageBody = "Hello, RabbitMQ!";
- channel.basicPublish(exchange, routingKey, properties, messageBody.getBytes(StandardCharsets.UTF_8));
-
- // ... 其他代码 ...
除此之外, 官方还提供了第二种方法, 你可以不用build一个BasicProperties,而是直接使用封装好的AMQP.BasicProperties实例对象MessageProperties来直接传入:
- import com.rabbitmq.client.MessageProperties;
-
- channel.basicPublish("", "task_queue",
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- message.getBytes());
下面是MessageProperties的原码:
- //
- // Source code recreated from a .class file by IntelliJ IDEA
- // (powered by FernFlower decompiler)
- //
-
- package com.rabbitmq.client;
-
- import java.util.Date;
- import java.util.Map;
-
- public class MessageProperties {
- public static final AMQP.BasicProperties MINIMAL_BASIC = new AMQP.BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
- public static final AMQP.BasicProperties MINIMAL_PERSISTENT_BASIC = new AMQP.BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
- public static final AMQP.BasicProperties BASIC = new AMQP.BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
- public static final AMQP.BasicProperties PERSISTENT_BASIC = new AMQP.BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
- public static final AMQP.BasicProperties TEXT_PLAIN = new AMQP.BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
- public static final AMQP.BasicProperties PERSISTENT_TEXT_PLAIN = new AMQP.BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
-
- public MessageProperties() {
- }
- }
此处的消息持久化为最后一个PERSISTENT_TEXT_PLAIN, 使用的构造方法为:
- public BasicProperties(String contentType, String contentEncoding, Map<String, Object> headers, Integer deliveryMode, Integer priority, String correlationId, String replyTo, String expiration, String messageId, Date timestamp, String type, String userId, String appId, String clusterId) {
- this.contentType = contentType;
- this.contentEncoding = contentEncoding;
- this.headers = headers == null ? null : Collections.unmodifiableMap(new HashMap(headers));
- this.deliveryMode = deliveryMode;
- this.priority = priority;
- this.correlationId = correlationId;
- this.replyTo = replyTo;
- this.expiration = expiration;
- this.messageId = messageId;
- this.timestamp = timestamp;
- this.type = type;
- this.userId = userId;
- this.appId = appId;
- this.clusterId = clusterId;
- }
关于消息持久化的说明:
将一个消息设置为持久化, 并不能完全保证消息不会丢失. 尽管它告诉RabbitMQ将message保存在硬硬盘, 当RabbitMQ已经接收一个消息, 但是还没有被保存的时候, 仍然会有一段很短的时间窗口, 这段时间窗口如果发生事故, 也可能导致消息丢失.
如果你想保证message的durable的正确性和有效性, 你可以参考Publish and confirm模式 : Consumer Acknowledgements and Publisher Confirms | RabbitMQ
这个部分我们将做一些完全不一样的事情 -- 我们将会把一个消息发送给多个消费者, 这个模式就被称为发布订阅模式.
为了用图解寿命这个模式, 我们将会建立一个简单的日志系统, 他将会包含两个项目, 第一个会发送日志消息, 第二个会接受然后打印这些消息.
在这个日志系统中, 每一个接受者的副本都会收到消息, 因此我们可以启动一个接受者, 也可以称为消费者, and将这些log消息导向硬盘, 与此同时, 我们将会跑起另外一个消费者并且看到这些日志打印到屏幕上.
其实一个消息并不是直接传递给队列的, 而是指定交换机, 然后由交换机传递给对应的队列.
我们之前所构造的例子中, 包含这三个部分:
RabbitMQ的核心消息模式, 是生产者永远都不会直接给队列发送任何消息, 事实上大多数情况下, 生产者会并不知道它生产的消息将会被发送到哪个队列.
相反, 生产者仅仅只能发送消息给交换机, 一个交换机是一个很简单的实现, 一方面它接受来自生产者的消息, 另外一方面,它将这些消息转发给队列. 交换机必定确切的知道它收到消息之后, 这个消息将会被发送到哪个队列. 比如说它是否会被添加到一个指定的队列, 或者是是其他的队列. 亦或是将其丢弃. 不管是哪种, 这些规则都是由交换机的类型决定
首先创建一个交换机:
然后给这个交换机绑定一个队列, 如下:
可以看到这个test交换机绑定了一个test队列, 绑定之后指定routingKey, 后期producer发送消息的时候可以通过exchangeName来指定交换机, 然后通过routingKey来指定要传入哪个队列.
那我可以将两个交换机绑定的队列, 并且将其指定的routingKey的值设置为一样的吗?
一个交换机确实可以绑定两个队列,并且这两个绑定队列的routingKey可以设置为一样。但是,这样做的话,当消息使用这个特定的routingKey发送到交换机时,交换机会将消息路由到这两个队列中,实现消息的广播效果。
在实际应用中,是否使用相同的routingKey取决于你的业务需求。如果你希望消息被发送到多个队列进行处理,那么可以设置相同的routingKey。但如果你希望根据不同的routingKey将消息路由到不同的队列,以实现更细粒度的控制,那么就应该为每个队列设置不同的routingKey。
此外,需要注意的是,routingKey的匹配规则还受到交换机类型的影响。例如,在Direct Exchange中,routingKey必须与队列的绑定键完全匹配;而在Topic Exchange中,routingKey可以与绑定键进行模式匹配。因此,在设置routingKey时,还需要考虑你使用的交换机类型。
交换机的类型:
在这个设置中,我们可以看到direct exchange X绑定了两个队列。第一个队列用绑定密钥橙色绑定,第二个队列有两个绑定,一个绑定密钥黑色,另一个绑定密钥绿色。
在这样的设置中,发布到交换机的带有路由关键字橙色的消息将被路由到队列Q1。路由关键字为黑色或绿色的消息将发送到Q2。所有其他消息都将被丢弃。
多次绑定:
使用同一个RoutingKey绑定多个队列是完全合法的, 例如上图, 我们可以给c1和c2这两个队列绑定同一个direct类型的交换机, 并且使用同一个RoutingKey : black. 上图这个案例中, 这个direct交换机的作用就类似于一个fanout交换机.
.
和*
),使得RoutingKey可以与多个BindingKey匹配。这样,一个消息可以被路由到多个队列中。
创建交换机, 可以通过RabbitMQ提供的web插件来生成 :
可以通过java client来生成:
channel.exchangeDeclare("logs", "fanout");
其声明如下:
现在我们就可以通过这个交换机来推送消息:
channel.basicPublish( "logs", "", null, message.getBytes());
但是有人可能会想起来, 这和我们之前写的不一样, 我们之前没有指定这个交换机name啊, 或者是指定了一个空字符串, 如下:
channel.basicPublish("", "hello", null, message.getBytes());
为什么它还是能够指定到hello这个队列??
那是因为:
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
你指定的空串代表它的交换机为默认交换机, 默认交换机是队列在创建的时候, 已经和队列进行绑定了, 这样保证每个队列能有一个初始化的默认的交换机. 如果你指定的是默认交换机, 那么这个routingKey就为你指定的队列名字了.
并且你不能显示的去让队列绑定默认交换机, 也不能让队列和默认交换机解绑, 当然, 默认交换机也不能被删除.
案例
实现一个fanout交换机, 实现一个生产者, 两个队列, 两个队列bind到这个fanout交换机, 创建两个消费者, 分别接受两个队列的消息.
实现一个生产者, 可以不断地输入数据 :
- package fanoutExchangeTest;
-
- import Util.RabbitMQUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConfirmCallback;
-
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.ConcurrentNavigableMap;
- import java.util.concurrent.ConcurrentSkipListMap;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
- Channel channel = RabbitMQUtil.getChannel();
-
- // declaring an exchange named logs and its type is fanout
- channel.exchangeDeclare("logs","fanout");
-
- // bind queue
- channel.queueBind("queue1","logs", "logsToQueue1");
- channel.queueBind("queue2","logs", "logsToQueue2");
-
- // declaring two queues the one named queue1 and the other one named queue2
- // channel.queueDeclare("queue1",true,true,false,null);
- // channel.queueDeclare("queue2",true,true,false,null);
- // manage message
- ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();
- // publish and confirm
- channel.confirmSelect();
-
- // callback : success
- ConfirmCallback success = (sequenceNumber,multiple) -> {
- if (multiple) {
- ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(sequenceNumber,true);
- longStringConcurrentNavigableMap.clear();
- } else {
- map.remove(sequenceNumber);
- }
- };
- // callback : fail
- ConfirmCallback fail = (sequenceNumber,multiple) -> {
- String body = map.get(sequenceNumber);
- System.err.format(
- "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
- body, sequenceNumber, multiple
- );
- success.handle(sequenceNumber,multiple);
- };
-
- // add non - sycn listener
- channel.addConfirmListener(success,fail);
-
- // publis code
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNext()) {
- String msg = scanner.next();
- channel.basicPublish("logs", "testFanout",null, msg.getBytes());
- channel.waitForConfirmsOrDie(3000L);
- }
-
- }
- }
创建两个消费者:
- package fanoutExchangeTest;
-
- import Util.RabbitMQUtil;
- import com.rabbitmq.client.Channel;
-
- import java.io.IOException;
-
- public class Consumer1 {
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitMQUtil.getChannel();
- channel.queueDeclare("queue1",false,false,false,null);
-
- channel.basicConsume("queue1", false, (s, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
-
- System.out.println(" [Consumer1] Received '" + message + "'");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }, s -> {
- System.out.println("nothing");
- });
- }
- }
- package fanoutExchangeTest;
-
- import Util.RabbitMQUtil;
- import com.rabbitmq.client.Channel;
-
- import java.io.IOException;
-
- public class Consumer2 {
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitMQUtil.getChannel();
- channel.queueDeclare("queue2",false,false,false,null);
-
- channel.basicConsume("queue2", false, (s, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
-
- System.out.println(" [Consumer2] Received '" + message + "'");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }, s -> {
- System.out.println("nothing");
- });
- }
- }
首先启动两个消费者, 然后启动生产者, 随后输入数据, 输出:
有时候我们需要一些流动性, 变化性很强的数据, 就可以创建临时队列, 他有如下特性:
下面是如何进行获取一个临时队列:
String queueName = channel.queueDeclare().getQueue();
死信队列(Dead-Letter Queue,DLQ)是一种特殊的队列,用于存放无法被正常处理的消息。这些消息可能由于各种原因,如消息被拒绝、消息过期、队列达到最大长度、消息格式错误或处理过程中抛出异常等,无法被消费者正常消费。通过将这些无法处理的消息放入死信队列,可以防止它们阻塞正常的消息处理流程,同时也方便进行后续的问题排查和处理。
死信队列在消息中间件中是一个重要的概念,它增强了消息的可靠性,有效避免了因消息处理失败而引起的数据丢失和系统异常。此外,死信队列中的消息可以进行特殊处理,如记录日志、统计失败次数、发送告警通知等,有助于监控系统的健康状况,并对处理失败的消息进行进一步的分析和处理。
值得注意的是,死信队列通常需要手动维护,而不是自动清空,因为死信消息往往需要人工分析和处理。在实际应用中,可以通过查询、导出和重新发送进入死信队列的死信消息,按需管理死信消息,避免消息漏处理。
消费者C1代码:
- package DeadQueue;
-
- import Util.RabbitMQUtil;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- import java.sql.SQLOutput;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 消费者1
- */
- public class Consumer1 {
- // 有两个交换机
- public static final String normal_exchange = "normal_exchange";
-
- // 死信交换机
- public static final String dead_exchange = "dead_exchange";
-
- // 普通队列
- public static final String normal_queue = "normal_queue";
-
- // 死信队列
- public static final String dead_queue = "dead_queue";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMQUtil.getChannel();
-
- // 声明两个交换机: 死信交换机和普通交换机
- channel.exchangeDeclare(normal_exchange, BuiltinExchangeType.DIRECT);
- channel.exchangeDeclare(dead_exchange,BuiltinExchangeType.DIRECT);
-
- Map<String, Object> map = new HashMap<>();
- // 过期时间
- map.put("x-message-ttl",10000);
- // 正常队列设置死信交换机
- map.put("x-dead-letter-exchange",dead_exchange);
- // 设置死信消息的RoutingKey
- map.put("x-dead-letter-routing-key", "lisi");
-
- // 声明两个队列
- channel.queueDeclare(normal_queue,false,false,false,map); // 声明将死信发送给死信交换机
- channel.queueDeclare(dead_queue,false,false,false,null);
-
- // 绑定交换机和队列
- // 绑定普通队列和消费者1
- channel.queueBind(normal_queue,normal_exchange,"zhangsan");
- channel.queueBind(dead_queue,dead_exchange,"lisi");
-
- DeliverCallback deliverCallback = (tag,msg) -> {
- System.out.println("consumer1接收到消息: " + new String(msg.getBody(),"UTF-8"));
- };
- channel.basicConsume(normal_queue,true, deliverCallback, tag-> {});
- }
- }
消费者C2代码:
- package DeadQueue;
-
- import Util.RabbitMQUtil;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.DeliverCallback;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 消费者1
- */
- public class Consumer2 {
- // 死信队列
- public static final String dead_queue = "dead_queue";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMQUtil.getChannel();
-
-
- DeliverCallback deliverCallback = (tag,msg) -> {
- System.out.println("consumer2接收到消息: " + new String(msg.getBody(),"UTF-8"));
- };
- channel.basicConsume(dead_queue,true, deliverCallback, tag-> {});
- }
- }
生产者代码:
- package DeadQueue;
-
- import Util.RabbitMQUtil;
- import com.rabbitmq.client.Channel;
-
- public class Producer {
- // 定义一个普通交换机即可
- public static final String normal_exchange = "normal_exchange";
-
- public static void main(String[] args) throws Exception{
- Channel channel = RabbitMQUtil.getChannel();
- // 死信消息
- for (int i = 0; i < 10; i++) {
- String msg = "info" + i;
- channel.basicPublish(normal_exchange,"zhangsan", null/* 这里消息的过期时间已经在队列声明的时候设置*/, msg.getBytes());
- }
- }
- }
首先启动消费者C1, 让其创建相关队列和交换机, 随后关闭消费者C1模拟其崩溃, 然后开启生产者, 发现normal队列里面产生了10条无法被消费消息;
随后开启消费者C2, 来消费死信队列的消息:
当然, 一个消息被放入死信队列当然不止 设置过期时间这一种, 还可以设置队列最大长度, 当普通队列的长度到达最大值的时候, 这个时候额外的消息会被放入死信队列
- Map<String, Object> props = new HashMap<>();
- // 过期时间
- // props.put("x-message-ttl",10000);
- // 设置最大长度为6
- props.put("x-max-length",6);
当然你也可以主动拒绝消息, 而不是被动的触发转发给死信队列.
如何设置主动拒绝?
- // 其他代码
- DeliverCallback deliverCallback = (tag,msg) -> {
- String getMsg = new String(msg.getBody(), StandardCharsets.UTF_8);
- if (getMsg.equals("info")) {
- System.out.println("消息:" + getMsg + " 被拒绝");
-
- // 拒绝策略需要开启手动应答
- // 第二个参数设置为false表示 不会重新将此消息返回原来的队列.
- channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);
- } else {
- System.out.println("consumer1接收到消息: " + getMsg);
- }
- };
- channel.basicConsume(normal_queue,false, deliverCallback, tag-> {});
有些时候, 我们并不是需要立即就将消息拿出来处理, 而是需要等待特定的时间, 然后再对它进行处理, 延迟队列就实现了这一点.
RabbitMQ中的延迟队列(Delay Queue)是一种特殊的队列,其中的消息不会立即被消费,而是会被延迟一段时间后才进行处理。这种队列主
要用于那些需要在未来某个时间点被处理的消息,比如定时任务、订单超时未支付自动取消等场景。
延迟队列的底层原理通常是由消息过期时间(TTL)和交换机组成。生产者发送消息到交换机时,会设置消息的过期时间。当这个时间到达后,消息会从交换机发送到真正的队列中,等待消费者进行消费。
RabbitMQ中的延迟队列提供了一种自动化的方式,可以延迟处理特定的业务逻辑,而无需进行额外的预处理操作。
需要注意的是,RabbitMQ本身并不直接支持延迟队列,但可以通过一些插件或者特定的消息属性和交换机类型来实现延迟队列的功能。例如,可以使用RabbitMQ的延迟消息插件(rabbitmq-delayed-message-exchange)来实现延迟队列。
首先创建SpringBoot项目, 然后引入依赖:
- <!-- rabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
写入配置文件:
- spring:
- rabbitmq:
- host: your-rabbitMQ-host
- port: your-port
- username: username
- password: password
声明配置类:
- package com.example.rabbitmqtest.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitMQConfig {
- // 确定交换机名
- public static final String EXCHANGE_NAME = "TEST_EXCHANGE";
- // 确定队列名
- public static final String QUEUE_NAME = "QUEUE_NAME";
-
- // 确定RoutingKey
- public static final String ROUTING_KEY = "ROUTING_KEY";
-
- // 声明交换机
- @Bean(EXCHANGE_NAME)
- public DirectExchange getExchange() {
- return new DirectExchange(EXCHANGE_NAME);
- }
-
- // 声明队列
- @Bean(QUEUE_NAME)
- public Queue getQueue() {
- return new Queue(QUEUE_NAME);
- }
-
- // 绑定交换机和队列
- @Bean
- public Binding queueBindingExchange(
- @Qualifier(QUEUE_NAME) Queue queue,
- @Qualifier(EXCHANGE_NAME) DirectExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
- }
- }
建立controller接口接收消息, 并将消息以生产者的身份发送给交换机:
- @ResponseBody
- @Controller
- @RequestMapping("/your-path")
- public class SendMsgController {
- // 使用这个模板类来来对RabbitMQ进行操作
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- // 添加路径
- @GetMapping("/child-path/{message}")
- public void sendMSG(@PathVariable String message) {
- System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}");
- // 发送消息
- rabbitTemplate.convertAndSend(ExchangeName, "RoutingKey", message);
- }
- }
声明消费者:
- package com.example.rabbitmqtest.consumer;
-
- import com.example.rabbitmqtest.config.TTLQueueConfig;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.nio.charset.StandardCharsets;
- import java.util.Date;
-
- /**
- * 队列ttl 的消费者
- */
-
- @Component
- public class DeadLetterQueueConsumer {
- // 接收消息, 添加监听器, 监听对应queue中的消息, 可以包含多个queue, 多个queue之间使用逗号隔开
- @RabbitListener(queues = {"queue1","queue2","queue3"})
- public void recieveD(Message message, Channel channel) throws Exception {
- String msg = new String(message.getBody(), StandardCharsets.UTF_8);
- System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+msg+"}");
-
- }
- }
此时如果消息 发布出去, 但是由于某种原因接受失败:
如果生产者发的, 交换机没有回应, 那么就应该调用回调接口来确认消息是否发送失败. 如果消费者发的消息没有发送过去, 那么就会触发回调接口, 让消息被缓存在内存中.
- @FunctionalInterface
- public interface ConfirmCallback {
- void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
- }
参数:
实现类:
- package com.example.rabbitmqtest.config;
-
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
-
- @Component
- public class MyCallBack implements RabbitTemplate.ConfirmCallback {
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init() {
- rabbitTemplate.setConfirmCallback(this);
- }
-
- /**
- * 交换机确认回调方法
- * @param correlationData 保存回调函数的消息的id以及其相关信息
- * @param b 交换机收到消息为true, 否则为false
- * @param s 失败的原因
- *
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- if(b) {
- System.out.println("success: 已接收到的消息的id>" + (correlationData!=null ? correlationData.getId() : "null"));
- } else {
- System.out.println("false:" + s);
- }
- }
- }
@PostConstruct
注解标记的方法会在依赖注入完成后,自动被Spring框架调用。因此,在init
方法中,您可以安全地访问rabbitTemplate
字段,并将其确认回调设置为当前实例(this
)。这样,当RabbitMQ确认消息时,就会调用MyCallBack
中的confirm
方法。
关联给生产者:
- @ResponseBody
- @Controller
- @RequestMapping("/your-path")
- public class SendMsgController {
- // 使用这个模板类来来对RabbitMQ进行操作
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- // 添加路径
- @GetMapping("/child-path/{message}")
- public void sendMSG(@PathVariable String message) {
- CorrelationData correlationData = new CorrelationData();
- System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}");
- // 发送消息
- rabbitTemplate.convertAndSend(ExchangeName, "RoutingKey", message,correlationData );
- }
- }
配置文件 启动确认机制
- spring:
- rabbitmq:
- publisher-confirm-type: correlated
同时, 不只是correlated, 还有:
⚫ NONE
禁用发布确认模式,是默认值
⚫ CORRELATED
发布消息成功到交换器后会触发回调方法
⚫ SIMPLE
单个确认
CallBack接口, 如果RoutingKey是正确的, 可以路由到对应的队列, 那么当消息正确被接受的时候, RabbitMQ调用回调并返回true, 失败则返回false, 但是如果指定的RoutingKey不存在, 那么消息就会被直接丢弃. 但是生产者是不知道的, 所以 我们应该想办法处理这个被丢弃的消息.
- package com.example.rabbitmqtest.config;
-
- import org.springframework.amqp.core.ReturnedMessage;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
- import java.nio.charset.StandardCharsets;
-
-
- public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- private void init() {
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setReturnsCallback(this);
- }
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- System.out.println("被回退的消息内容为: " + new String(returnedMessage.getMessage().getBody(), StandardCharsets.UTF_8));
-
- }
- }
在上述代码中, 首先需要让一个类实现RabbitTemplate.ReturnsCallback这个接口, 然后重写其中的returnedMessage方法, 如下, 消息中接受一个ReturnedMessage类型的参数, 它的具体源码如下:
- package org.springframework.amqp.core;
-
- public class ReturnedMessage {
- private final Message message;
- private final int replyCode;
- private final String replyText;
- private final String exchange;
- private final String routingKey;
-
- // 下面是一些构造方法 和 其对应的get方法, 以及toString方法, 这里省略
- }
这个类是Spring AMQP中的一个类, 用于封装RabbitMQ返回给生产者的消息信息. 当RabbitMQ的交换机无法将路由器路由到任何队列的时候, 它就会将消息返回给生产者. 并附带一些额外的信息. 例如上图所示的源码. 下面是对该类中的每一个成员变量的一个解释:
Message message
:这是原始的消息对象,包含了消息的体和其他属性,比如消息头、内容类型等。int replyCode
:这是RabbitMQ返回的回复码。它是一个整数,通常用于指示返回的原因或状态。具体的值可能依赖于RabbitMQ的配置或文档。String replyText
:这是RabbitMQ返回的回复文本。它通常是一个描述性的字符串,用于解释为什么消息被返回。String exchange
:这是发送消息时使用的交换器名称。当消息返回时,这个交换器名称可以帮助你定位问题,了解是哪个交换器未能正确路由消息。String routingKey
:这是发送消息时使用的路由键。路由键用于决定消息应该被路由到哪个队列。当消息返回时,路由键可以帮助你理解为何消息没有被路由到预期的队列。上述讲述了, 如何接收到被回退的消息, 然后手动做出相关处理, 但是如果消息内容多了,复杂了,手动处理的成本也会很高, 所以这个时候, 我们就需要另外一种方法来自动处理没有被正确路由并被返回的消息.
可以使用备份交换机,在正常交换机无法正确路由某个消息的时候, 这个交换机就会将这个消息发送给备用交换机, 备用交换机连接着用来处理这些被回退的消息的队列和消费者(你也可以理解为一种监察者, 也就是warning consumer or backup consumer).
- package com.example.rabbitmqtest.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class ConfirmConfig {
- public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
- public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
- public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
- public static final String BACKUP_QUEUE_NAME = "backup.queue";
- public static final String WARNING_QUEUE_NAME = "warning.queue"; // 声明确认队列
-
- // 声明确认队列
- @Bean("confirmQueue")
- public Queue confirmQueue() {
- return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
- }
-
- //声明确认队列绑定关系
- @Bean
- public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange).with("key1");
- }
-
- //声明备份Exchange
- @Bean("backupExchange")
- public FanoutExchange backupExchange() {
- return new FanoutExchange(BACKUP_EXCHANGE_NAME);
- }
-
- //声明确认Exchange交换机的备份交换机
- @Bean("confirmExchange")
- public DirectExchange confirmExchange() {
- ExchangeBuilder exchangeBuilder =
- ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
- .durable(true)
- // 设置备份交换机
- .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
- return (DirectExchange) exchangeBuilder.build();
- }
-
- // 声明警告队列
- @Bean("warningQueue")
- public Queue warningQueue() {
- return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
- }
-
- // 声明报警队列绑定关系
- @Bean
- public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
- return BindingBuilder.bind(queue).to(backupExchange);
- }
-
- // 声明备份队列
- @Bean("backQueue")
- public Queue backQueue() {
- return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
- }
-
- // 声明备份队列绑定关系
- @Bean
- public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
- return BindingBuilder.bind(queue).to(backupExchange);
- }
- }
然后添加一个报警消费者即可
- @Component
- @Slf4j // 这里你也可以不使用slf4j
- public class WarningConsumer {
- public static final String WARNING_QUEUE_NAME = "warning.queue";
- @RabbitListener(queues = WARNING_QUEUE_NAME)
- public void receiveWarningMsg(Message message) {
- String msg = new String(message.getBody());
- log.error("报警发现不可路由消息:{}", msg);
- }
- }
创建两个队列 QA和QB, QA的消息过期时间设置为10s, QB为40s, 然后创建一个直接交换机X, 和死信交换机Y, 也是direct类型, 然后创建一个死信队列QD, c为死信消费者.
按照上面的模式, 进行书写配置类:
- package com.example.rabbitmqtest.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Objects;
-
- /**
- * 声明一个ttl队列
- */
- @Configuration
- public class TTLQueueConfig {
- // 普通交换机X
- public static final String X_EXCHANGE = "X";
- // 死信交换机Y
- public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
-
- // 普通队列QA, QB
- public static final String QUEUE_A = "QA";
- public static final String QUEUE_B = "QB";
- // 死信队列QD
- public static final String DEAD_LETTER_QUEUE_QD = "QD";
-
- // 创建普通交换机
- @Bean(X_EXCHANGE)
- public DirectExchange x_exchage() {
- return new DirectExchange(X_EXCHANGE);
- }
- @Bean(Y_DEAD_LETTER_EXCHANGE)
- public DirectExchange Y_DEAD_LETTER_EXCHANGE() {
- return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
- }
- // 声明普通队列
- // QA
- @Bean(QUEUE_A)
- public Queue queue_a() {
- Map<String, Object> map = new HashMap<>();
- // 设置死信交换机
- map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
- map.put("x-dead-letter-routing-key", "YD");
-
- // 设置ttl 单位是ms
- map.put("x-message-ttl",10000);
- return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
- }
- // QB
- @Bean(QUEUE_B)
- public Queue queue_b() {
- Map<String, Object> map = new HashMap<>();
- // 设置死信交换机
- map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
- map.put("x-dead-letter-routing-key", "YD");
-
- // 设置ttl 单位是ms
- map.put("x-message-ttl",40000);
- return QueueBuilder.durable(QUEUE_B).withArguments(map).build();
- }
-
- // 死信队列
- @Bean(DEAD_LETTER_QUEUE_QD)
- public Queue queue_d () {
- return QueueBuilder.durable(DEAD_LETTER_QUEUE_QD).build();
- }
-
- // 绑定交换机
- // QA QB 绑定普通交换机X
- @Bean // X板顶QA, RoutingKey为XA
- public Binding queueABindingToX(
- @Qualifier(QUEUE_A) Queue QA,
- @Qualifier(X_EXCHANGE) DirectExchange X) {
- return BindingBuilder.bind(QA).to(X).with("XA");
- }
- @Bean // X板顶QB, RoutingKey为XB
- public Binding queueBBindingToX(
- @Qualifier(QUEUE_B) Queue QB,
- @Qualifier(X_EXCHANGE) DirectExchange X) {
- return BindingBuilder.bind(QB).to(X).with("XB");
- }
-
-
- }
首先我们需要接收来自接口的消息, 然后将消息通过交换机X发送给队列QB和QA :
- /**
- * 发送延迟消息
- *
- * 发送消息至 localhost ..... /ttl/sendMSG/test
- */
-
- @ResponseBody
- @Controller
- @RequestMapping("/ttl")
- public class SendMsgController {
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @GetMapping("/sendMSG/{message}")
- public void sendMSG(@PathVariable String message) {
- System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+message+"}");
-
- rabbitTemplate.convertAndSend(TTLQueueConfig.X_EXCHANGE, "XA","消息来则ttl为10s的队列" + message);
- rabbitTemplate.convertAndSend(TTLQueueConfig.X_EXCHANGE, "XB","消息来则ttl为40s的队列" + message);
- }
- }
- package com.example.rabbitmqtest.consumer;
-
- import com.example.rabbitmqtest.config.TTLQueueConfig;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.nio.charset.StandardCharsets;
- import java.util.Date;
-
- /**
- * 队列ttl 的消费者
- */
-
- @Component
- public class DeadLetterQueueConsumer {
- // 接收消息
- @RabbitListener(queues = {TTLQueueConfig.DEAD_LETTER_QUEUE_QD})
- public void recieveD(Message message, Channel channel) throws Exception {
- String msg = new String(message.getBody(), StandardCharsets.UTF_8);
- System.out.println("当前系统的时间:{" + new Date().toString() +"},发送一条消息给两个ttl队列:{"+msg+"}");
-
- }
- }
展示:
查看后端:
如图:
此处新增了一个Queue, 名为QC, 该队列不需要设置ttl时间.
我们首先发送一条过期时间为20000ms的消息给QC, 然后发送一个2000ms的消息给QC,会发现:
消息2虽然过期时间段, 但是它并没有优先发送给死信队列, 反而是过期时间长的消息1先发送给死信队列处理..
因为RabbitMQ只会检查第一个消息是否过期, 如果过期则丢弃到死信队列. 然后再去检查第二个消息.
想象一下, 一个队列可以存储转发很多个消息来让特定的消费者消费,但是在复杂的情况中,消息也是分优先级的,比如说, 加入天猫是我创建的后台也是我写的, 那么它里面有很多商户,在收到顾客的消费请求的时候会创建很多订单, 但是不同的商家订单是不一样的. 我们必须对这些商家做出区分, 比如我们优先处理那些订单量大的商家的订单,给他们做一个优先处理.
考虑到使用redis来做消息队列,但是redis只能使用一个list作为一个简单的消息队列.并不能实现一个优先级的场景, 所以考虑到我们RabbitMQ的优先级队列.
登录到web插件:
添加新队列:
这里的10意味着,队列接受的消息的优先级的范围是0~10,包括0和10也就是[0,10],你发送消息的时候可以将发送到该优先级队列的消息设置一个0~10的优先级.
RabbitMQ会确保具有更高级别的消息优先于较低等级的消息被消费,如果有多个消费者,并且他们都是空闲的,那么具有最高优先级的消息将被传递给其中一个消费者.
但是需要主义的是,如果你不设置x-max-priority,那么默认的最大优先级就是0,这就意味着所有的消息都具有相同的优先级,那么就和普通队列一样了.设置优先级的时候确保在优先级范围内,否则RabbitMQ会拒绝该消息.
- package PriorityQueue;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.nio.charset.StandardCharsets;
-
- public class Producer {
- public static void main(String[] args) throws Exception{
- // 建立连接
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 工厂的ip, 链接rabbit队列
- connectionFactory.setHost("106.14.165.91");
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("***");
-
- Connection connection = connectionFactory.newConnection();
-
- Channel channel = connection.createChannel();
-
- // 构造消息
- for (int i = 0; i <= 10; i++) {
- String msg = "消息:" + i;
- if (i % 2 == 0) {
- // 设置优先级的属性
- AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(i).build();
- channel.basicPublish("exchangeName","routingKey",properties,msg.getBytes(StandardCharsets.UTF_8));
- } else {
- channel.basicPublish("exchangeName","routingKey",null,msg.getBytes(StandardCharsets.UTF_8));
- }
- }
- System.out.println("消息发送完毕");
- }
- }
- package PriorityQueue;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DeliverCallback;
-
- import java.nio.charset.StandardCharsets;
- import java.util.HashMap;
- import java.util.Map;
-
- public class Consumer {
- public static void main(String[] args) throws Exception{
- // 建立连接
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 工厂的ip, 链接rabbit队列
- connectionFactory.setHost("106.14.165.91");
- connectionFactory.setUsername("admin");
- connectionFactory.setPassword("***");
-
- Connection connection = connectionFactory.newConnection();
-
- Channel channel = connection.createChannel();
-
- // 设置优先级属性
- Map<String,Object> params = new HashMap<>();
- params.put("x-max-priority",10);
- // 声明一个优先级队列
- channel.queueDeclare("queueName",true,false,false,params);
-
- System.out.println("启动消费者");
- // 确认回调函数
- DeliverCallback deliverCallback = (consumerTag,delivery) -> {
- String recieveMsg = new String(delivery.getBody(), StandardCharsets.UTF_8);
- System.out.println("接收到消息:" + recieveMsg);
- };
-
- channel.basicConsume("queueName",deliverCallback,(consumerTag)-> {
- System.out.println("消费者无法消费此消息时被调用");
- });
- }
- }
懒惰队列是一个classic类型的队列, 但是这个队列是以lazy模式运行的. 当你将某个队列设置为lazy模式, 那么在队列里面的消息就会被尽早的存如硬盘, 这些存入硬盘的消息会在他们被消费者请求消费的时候加载到内存中.
这样设计的原因就是希望队列能支持一个更多数量的消息的存储. 消费者如果由于各种原因下线,那么队列中的元素就会堆积在内存, 导致内存溢出, 此时需要处理这些消息, 懒惰队列, 也可以称之为惰性队列, 它的处理方法就是将其持久化到硬盘, 然后要使用或者消费的时候就拿出来.
对比消息的持久化, broker会将消息写入磁盘的时候, 也会给内存中进行一个备份, RabbitMQ释放内存的时候, 将消息持久化到硬盘是一个比较消耗资源的操作, 同时也会阻塞队列, 进而无法接受新的消息.
队列的两种模式:
如何创建一个lazy queue?
- Map<String,Object> params = new HashMap<>();
- params.put("x-queue-mode","lazy");
- channel.queueDeclare("queueName",true,false,false,params);
如果同时设置了这两种的话, 那么policy优先生效
内存对比
在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。