赞
踩
RabbitMQ是消息代理:接受并转发消息。可以将其视为邮局
将要发布的邮件放在邮箱中时,可以确保邮递员最终将邮件传递给收件人。以此类推,RabbitMQ是一个邮箱、一个邮局和一个邮递员
RabbitMQ与邮局之间的主要区别在于,它不处理纸张。只是做接收、存储和转发数据消息的二进制码
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)
应用层协议的一个开放标准,为面向消息的中间件设计。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全
MQ主要是在两个应用程序之间进行通信的中间件。而消息中间件可以用生产者-消费者模式来理解
RabbitMQ在基本的消息中间件的基础上,加了一个exchange(交换机)
核心组件 | 描述 |
---|---|
Exchange | 消息交换机。指定消息按什么规则,路由到哪个队列 |
Queue | 消息队列载体。每个消息都会被投入到一个或多个队列 |
Binding | 绑定。它的作用就是把exchange和queue按照路由规则绑定起来 |
Routing Key | 路由关键字。exchange根据这个关键字进行消息投递 |
(1)客户端连接到消息队列服务器,打开一个channel
(2)客户端声明一个exchange,并设置相关属性;相关属性包括exchange的名字,exchange是否可以被持久化(持久化到磁盘),exchange是否自动删除
(3)客户端声明一个queue,并设置相关属性,相关属性包括queue的名字,queue是否可以被持久化(持久化到磁盘),queue是否自动删除
(4)声明exchange和queue之间的binding关系,用唯一的key来进行绑定
(5)客户端投递消息到exchange。不同类型的exchange根据路由规则将消息分发到不同的queue中
(6)客户端监听到queue中的内容,将消息取出消费,发送ack到rabbitmq中,删除该条消息
1.生产仅意味着发送。发送消息的程序是生产者
2.队列是RabbitMQ内部的邮箱的名称。尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中
甲队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以发送进入一个队列的消息,许多消费者可以尝试从一个队列接收数据。这就是表示队列的方式
3.消费与接收具有相似的含义。一个消费者是一个程序,主要是等待接收信息
注意:生产者、消费者和经纪人不必位于同一主机上。实际上,在大多数应用程序中却没有。一个应用程序既可以是生产者,也可以是消费者
组件 | 描述 |
---|---|
Broker | 接收和分发消息的应用,RabbitMQ Server就是Message Broker |
Virtual host | 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等 |
Connection | publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题 |
Channel | 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销 |
Exchange | message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。 |
Queue | 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中 |
Binding | exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据 |
类和接口 | 描述 |
---|---|
ConnectionFactory | 获取连接工厂 |
Connection | 一个连接 |
Channel | 数据通信通道。发送和接收消息 |
Queue | 具体的消息存储队列 |
Producer & Consumer | 生产和消费者 |
假定RabbitMQ已在标准端口(5672)的本地主机上安装并运行。如果使用其他主机,端口或凭据,则连接设置需要进行调整
RabbitMQ使用多种协议。本教程使用AMQP 0-9-1,这是一种开放的通用消息传递协议。RabbitMQ有许多不同语言的客户。我们将使用RabbitMQ提供的Java客户端。
下载客户端库及其依赖项(SLF4J API和SLF4J Simple)。将这些文件和教程Java文件一起复制到您的工作目录中。
请注意,对于教程而言,SLF4J Simple足够了,但是您应该在生产中使用成熟的日志记录库,例如Logback(RabbitMQ Java客户端也位于中央Maven存储库中,带有groupId com.rabbitmq和artifactId amqp-client。)
<!-- rabbitmq客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
消息发布者(Send,发送者)和消息消费者(Recv,接收者)。发布者将连接到RabbitMQ,发送一条消息,然后退出
点对点的队列
功能:一个生产者P发送消息到队列Q,一个消费者C接收
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//该连接抽象了套接字连接,并为处理协议版本协商和身份验证等
ConnectionFactory factory = new ConnectionFactory();//连接工厂
factory.setHost("localhost"); //设置服务地址。本机
// 协议的端口
factory.setPort(5672);
// 设置账号信息,用户名、密码、Virtual host
factory.setVirtualHost("/onetest");//虚拟主机
factory.setUsername("onetest");
factory.setPassword("onetest");
//try-with-resources语句,因为Connection和Channel都实现java.io.Closeable。这样,无需在代码中显式关闭它们
try (Connection connection = factory.newConnection();//获得连接
Channel channel = connection.createChannel()) {//获得通道
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//queueDeclare参数
//①queue String类型。队列的名称
//②durable Boolean类型。是否持久化 (该队列将在服务器重新启动后继续存在)
//③exclusive Boolean类型。当前声明的queue是否专注。true仅限于此连接
//④autoDelete Boolean类型。在最后连接使用完成后,是否删除队列
//⑤arguments 队列的其他属性(构造参数)
String message = "Hello World!";
//发送队列
//声明队列是幂等的,仅当队列不存在时才创建。消息内容是一个字节数组,因此可以在此处编码任何内容
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
发送不起作用?
如果这是第一次使用RabbitMQ,但没有看到“已发送”消息,那么可能会不知所措,想知道可能是什么问题。代理可能是在没有足够可用磁盘空间的情况下启动的(默认情况下,它至少需要200 MB的可用空间),因此拒绝接受消息。检查代理日志文件以确认并减少限制(如有必要)
消费者侦听RabbitMQ发出的消息,因此与发布单个消息的发布者不同,将使其继续运行以侦听消息并将其打印出来
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
// 设置账号信息,用户名、密码、Virtual host
factory.setVirtualHost("/onetest");//虚拟主机
factory.setUsername("onetest");
factory.setPassword("onetest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//将告诉服务器将队列中的消息传递回来。由于它将异步地发送消息,因此以对象的形式提供了一个回调,该回调将缓冲消息,直到准备使用它们为止。那就是DeliverCallback子类所做的
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
注意:在这里声明队列。因为可能在发布者之前启动使用者,所以想确保队列存在,然后再尝试从中使用消息
为什么不使用try-with-resource语句自动关闭通道和连接?这样,只需使程序继续运行,关闭所有内容并退出!这将很尴尬,因为希望在消费者异步侦听消息到达时,该过程保持有效
管理工具中查看连接,消息等
在工人之间分配任务(竞争的消费者模式)
功能描述:一个生产者发送消息到队列中,有多个消费者共享一个队列,每个消费者获取的消息是唯一的
一个生产者,2个消费者,一个消息只能被一个消费者获取;只能有一个获取,线程的特性:抢
为了保证服务器同一时刻只发送一条消息给消费者,保证资源的合理利用。channal.basicQos(1);这样是为了保证多个消费者接收的消息数量不一样,能者多劳,如果不设置,那么消费者是平均分配消息(例如10条消息,每个消费者接收5条)
用于在多个工作人员之间分配耗时的任务
工作队列(任务队列):主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反,安排任务在以后完成。将任务封装 为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当运行许多工作人员时,任务将在他们之间共享
这个概念在Web应用程序中特别有用,因为在Web应用程序中,不可能在较短的HTTP请求窗口内处理复杂的任务
假装自己很忙-使用Thread.sleep()函数来伪造它。我们将字符串中的点数作为它的复杂度。每个点将占“工作”的一秒。例如,Hello …描述的虚假任务 将花费三秒钟
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();//连接工厂
factory.setHost("localhost");//设置服务地址。本机
factory.setPort(5672);// 协议的端口
factory.setVirtualHost("/onetest");
factory.setUsername("onetest");
factory.setPassword("onetest");
// try-with-resources语句,无需在代码中显式关闭它们
try (Connection connection = factory.newConnection();//获得连接
Channel channel = connection.createChannel()) {//获得通道
// 声明队列。持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发送50条
int i = 1;
while (i <= 50) {
String message = "QUEUE_work !";
if (i % 2 == 0) {
message += "...";
}
i++;
// 发送队列。存
// 声明队列是幂等的,仅当队列不存在时才创建。消息内容是一个字节数
channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
为消息正文中的每个点伪造一秒钟的工作。它将处理传递的消息并执行任务,因此将其称为Worker.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();//连接工厂
factory.setHost("localhost");//设置服务地址。本机
factory.setPort(5672);// 协议的端口
factory.setVirtualHost("/onetest");
factory.setUsername("onetest");
factory.setPassword("onetest");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("开始!");
//平均分配(1)
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//获得消息信息
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息:" + message);
try {
doWork(message);
} finally {
System.out.println("完成!");
//手动模式确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//手动模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();//连接工厂
factory.setHost("localhost");//设置服务地址。本机
factory.setPort(5672);// 协议的端口
factory.setVirtualHost("/onetest");
factory.setUsername("onetest");
factory.setPassword("onetest");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("开始!");
//平均分配(1)
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//获得消息信息
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息:" + message);
try {
doWork(message);
} finally {
System.out.println("完成!");
}
};
//自动模式
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
默认情况下,RabbitMQ将每个消息依次发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。与三个或更多的工人一起尝试
如果RabbitMQ服务器停止,队列任务仍然会丢失
RabbitMQ退出或崩溃时,它将忘记队列和消息,除非告知不要这样做。要确保消息不会丢失,需要做两件事:需要将队列和消息都标记为持久
channel.queueDeclare("hello", true, false, false, null);
尽管此命令本身是正确的,但在当前的设置中将无法使用。这是因为我们已经定义了一个名为hello的队列 ,该队列并不持久。RabbitMQ不允许您使用不同的参数重新定义现有队列,并且将向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法-让我们声明一个名称不同的队列
channel.queueDeclare("work_queue", true, false, false, null);
确保即使RabbitMQ重新启动,task_queue队列也不会丢失
将消息标记为持久性
通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("" ,QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
消息持久性的说明
将消息标记为持久性并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息并且尚未保存消息时,还有很短的时间。而且,RabbitMQ不会对每条消息都执行fsync(2)。它可能只是保存到缓存中,而没有真正写入磁盘。持久性保证并不强,但是对于简单任务队列而言,这已经绰绰有余。如果需要更强有力的保证,则可以使用 发布者确认
调度仍然无法完全按照我要求进行
例如:在有两名工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一位工人将一直忙碌而另一位工人将几乎不做任何工作。RabbitMQ对此一无所知,发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。它不会查看使用者的未确认消息数。它只是盲目地将每第n条消息发送给第n个使用者
为了克服这一点,可以将basicQos方法与prefetchCount = 1设置一起使用。告诉RabbitMQ一次不要给工人一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给不忙的下一个工作程序
int prefetchCount = 1;
channel.basicQos(prefetchCount);
队列大小的注意事项
如果所有工作人员都忙,队列就满了。需要留意这一点,也许会增加更多的工作人员,或者有其他一些策略
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(告知),告知RabbitMQ特定的消息已被接收,处理,并且RabbitMQ可以自由删除它。
默认情况下,手动消息确认处于打开状态。通过autoAck = true 标志显式关闭,将该标志设置为false,并在完成任务后从工作人员发送适当的确认。
// 消息确认自动模式
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
// 消息确认手动模式(默认)
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都认为是消息已经成功消费
// 将告诉服务器将队列中的消息传递回来
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息:" + message + "'");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("完成");
}
};
//监听队列。手动确认模式
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态
// 将告诉服务器将队列中的消息传递回来
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息信息" + message);
try {
doWork(message);
} finally {
System.out.println("手动确认:");
//channel.basicAck(long deliveryTag, boolean multiple);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 监听队列。消息确认手动模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
void basicAck(long deliveryTag, boolean multiple)
方法 | 描述 |
---|---|
deliveryTag | (唯一标识 ID)当一个消费者向RabbitMQ注册后,会建立起一个Channel ,RabbitMQ 会用basic.deliver方法向消费者推送消息,这个方法携带了一个delivery tag, 它代表了RabbitMQ向该Channel投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag的范围仅限于Channel |
multiple | 为了减少网络流量,手动确认可以被批处理,当该参数为true时,则可以一次性确认delivery_tag小于等于传入值的所有消息。false确认当前一条 |
void basicReject(long deliveryTag, boolean requeue)
方法 | 描述 |
---|---|
deliveryTag | (唯一标识 ID)当一个消费者向RabbitMQ注册后,会建立起一个Channel ,RabbitMQ 会用basic.deliver方法向消费者推送消息,这个方法携带了一个delivery tag, 它代表了RabbitMQ向该Channel投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag的范围仅限于Channel |
requeue | true,表示将消息重新放入到队列中,false:表示直接从队列中删除,此时和basicAck(long deliveryTag, false)的效果一样 |
一次向许多消费者发送消息
这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。
功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者
(1)一个生产者,多个消费者
(2)每一个消费者都有自己的一个队列
(3)生产者没有直接发消息到队列中,而是发送到交换机
(4)每个消费者的队列都绑定到交换机上
(5)消息通过交换机到达每个消费者的队列
注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失
RabbitMQ消息传递模型中的核心思想:生产者从不将任何消息直接发送到队列
实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。生产者只能将消息发送到交换机。交流是一件非常简单的事情
根据具体的路由策略,接收来自生产者的消息,然后将它们推入队列
规则由交换类型定义。有几种交换类型可用:fanout、direct、topic、headers
定义:广播类型的交换机与一个队列绑定时,不需要指定binding key(路由键)
路由规则:当消息发送到广播类型的交换机时,不需要指定routing key,所有与之绑定的队列都能收到消息(将消息路由给绑定到此交换机上的所有队列)
fanout交换机是最基本的交换机类型,它做的事情很简单–广播信息。Fanout交换机会把接收到的消息全部转发到绑定的队列上。因为广播不需要“思考”,所以Fanout交换机是四种交换机中速度最快的
适用场景:需要随时增加减少业务处理的队列,例如注册、下单等功能需要增加送积分功能,只需要增加一个绑定到交换机的队列去处理新业务,无需修改旧的业务逻辑,从而达到业务解耦,非常容易扩展
定义:直连交换机与一个队列绑定时,需要指定一个明确的binding key(绑定键)
路由规则:发现消息到直连交换机时,只有routing key(路由键)和binding key(绑定键)完全匹配时,绑定的队列才能收到消息(根据消息携带的路由键(routing key)将消息推入给对应队列)
直连交换机是一种带路由功能的交换机,一个队列通过routing_key与一个交换机绑定,当消息被发送的时候,需要指定一个routing_key,这个消息被送达交换机的时候,就会被交换机送到指定的队列里面去。同样一个routing_key也是支持应用到多个队列中的,当一个交换机绑定多个队列时,消息就会被送到对应的队列去处理
适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列
基于direct交换机,增加了*、#通配符。发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上
定义:主题类型的交换机与一个队列绑定时,可以指定按模式匹配的routing key
路由规则:发送消息到主题交换机时,routing key(路由键)符合binding key(绑定键)的模式时,将消息路由给一个或多个绑定队列
通配符规则
主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*。…的格式,单词与单词之间用“.”隔开
通配符 | 描述 |
---|---|
* | 表示一个单词 |
# | 表示任意数量(零个或多个)单词 |
当一个队列的绑定键为#的时候,这个队列将会无视消息的路由键,接收所有的消息
适用场景:消息需要基于多重条件进行路由到达对应队列
例如:日志系统,不仅可以根据日志的级别而且能根据日志的来源进行订阅
类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。一般不使用,了解即可
头交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。
绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)
HeadersExchange会忽略RoutingKey而根据消息中的Headers和创建绑定关系时指定的Arguments来匹配决定路由到哪些Queue HeadersExchange的性能比较差,而且DirectExchange完全可以代替它,所以不建议使用
交换机必须准确知道如何处理收到的消息
①是否应将其附加到特定队列?
②是否应该将其附加到许多队列中?
③是否丢弃它
列出服务器上的交换器,运行有用rabbitmqctl list_exchanges
在此列表中,将有一些amq.*交换和默认(未命名)交换。这些是默认创建的
前面部分中,对交换一无所知,但仍然能够将消息发送到队列。这是可能的,因为使用的是默认交换,通过空字符串(“”)进行标识
默认交换机(default exchange)实际上是一个由RabbitMQ预先声明好的名字为空字符串的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同
DefaultExchange是一种特殊的DirectExchange,当手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空的DirectExchange上,绑定RoutingKey与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用
如:当声明了一个名为”hello”的队列,RabbitMQ会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为”hello”。因此,当携带着名为”hello”的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为”hello”的队列中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
第一个参数是交换的名称。空字符串表示默认或无名称交换:消息将以routingKey指定的名称路由到队列(如果队列存在)
在默认情况,如果消息在投递到交换机时,交换机发现此消息没有匹配的队列,则这个消息将被悄悄丢弃。为了解决这个问题,RabbitMQ中有一种交换机叫死信交换机。当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预。这个过程中的exchange和queue就是所谓的”Dead Letter Exchange 和 Queue”
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
属性 | 描述 |
---|---|
name | 交换机名称 |
durable | 是否持久化。如果持久性,则RabbitMQ重启后,交换机还存在 |
auto_delete | 当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它 |
arguments | 扩展参数 |
创建这种类型的交换,并将其称为log。
//定义一个名为logs的fanout类型交换机
channel.exchangeDeclare("logs", "fanout");
fanout交换非常简单。只是将接收到的所有消息广播到它知道的所有队列中。而这正是记录器所需要的
现在改为消息发布到命名的交换机中
//消息提交到交换机中:P --> X
channel.basicPublish( "logs", "", null, message.getBytes());
之前使用的是具有特定名称的队列(还记得hello和task_queue吗?)。能够命名队列至关重要。需要将工人指向同一队列。当想在生产者和消费者之间共享队列时,给队列命名很重要
但这不是记录器的情况。希望听到所有日志消息,而不仅仅是它们的一部分。也只对当前正在发送的消息感兴趣,而对旧消息不感兴趣。为了解决这个问题,需要两件事
首先,无论何时连接到Rabbit,都需要一个全新的空队列。为此,可以创建一个具有随机名称的队列,或者甚至更好。让服务器选择一个随机队列名称
其次,一旦断开了使用者的连接,队列将被自动删除
在Java客户端中,当不向queueDeclare()提供任何参数时,将使用生成的名称创建一个非持久的、独有的、自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String queueName = channel.queueDeclare().getQueue(); //临时队列
此时,queueName包含一个随机队列名称
例如:可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg
已经创建了一个fanout交换机和一个队列。现在需要告诉交换机将消息发送到队列。交换和队列之间的关系称为绑定
channel.queueBind(QUEUE_NAME, "logs", "");
logs交换机将消息添加到队列中
查看列表绑定
rabbitmqctl list_bindings
最重要的变化是,现在希望将消息发布到logs交换器,而不是无名的消息交换器。发送时需要提供一个routingKey,但是对于fanout交换,它的值将被忽略。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
//消息提交到交换机:P --> X
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
注意: 消息发送到没有队列绑定的交换机时,消息将丢失。因为,交换机没有存储消息的能力,消息只能存在在队列中
建立连接后,声明了交换机。由于禁止发布到不存在的交易所,因此此步骤是必需的
如果没有队列绑定到交换,消息将丢失,但这是可以的。如果没有消费者在监听,可以安全地丢弃该消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定至临时队列:X --> Q
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待消息:");
//回调消息。lambda语法
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消息:" + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
有选择地接收消息
功能:生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
例如:把路由key设置为insert,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作
绑定
在前面已经创建绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
绑定:交换和队列之间的关系。绑定可以采用额外的routingKey参数。为了避免与basic_publish参数混淆,将其称为绑定键(binding key)
简单地理解为:队列对来自此交换机的消息感兴趣
以创建带有键的绑定的方法
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "black");
绑定密钥的含义取决于交换类型。之前使用的fanout交换机只是忽略了它的价值
需求:希望将日志消息写入磁盘的程序仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间
使用fanout交换机,这种交换并没有给带来太大的灵活性,它只能进行无意识的广播
使用直接(direct)交换机。direct交换机背后的路由算法很简单:消息进入其绑定密钥(binding key)与消息的路由密钥(routing key)完全匹配的队列
在此设置中,可以看到绑定了两个队列的direct交换机X。第一个队列由绑定键orange绑定,第二个队列有两个绑定,一个绑定键为black,另一个绑定为green
①路由键orange发布到交换机的消息,将被路由到队列Q1
②路由键为black或green的消息将转到Q2。所有其他消息将被丢弃
多重绑定
用相同的绑定密钥绑定多个队列是完全合法的
示例中,可以使用绑定键black在X和Q1之间添加绑定。在这种情况下,direct交换机的行为将类似于fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1和Q2
在记录系统中使用此模型。将发送消息到direct交换机,而不是fanout。将提供日志严重性作为路由键。这样,接收程序将能够选择它想要接收的严重性
与往常一样,需要首先创建一个交换机。
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
发送一条消息
channel.basicPublish(EXCHANGE_NAME, "black", null, message.getBytes());
为简化起见,将假定“严重性”可以是“信息”,“警告”,“错误”之一
接收消息的工作方式与上一教程一样,但有一个例外,将为感兴趣的每种严重性创建一个新的绑定
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = "black";//路由秘钥routing key
String message = "使用路由Routing";
//发送消息
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println("路由秘钥:" + severity + "消息:" + message);
}
}
}
import com.rabbitmq.client.*;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
根据Topics模式(主题)接收消息。使用通配符,多条件接收消息。尽管使用direct交换机改进了系统,但它仍然存在局限性。不能基于多个条件进行路由
此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者(P)发送消息到交换机(X),交换机type=topic,交换机根据绑定队列的routing key的值进行通配符匹配。
绑定密钥也必须采用相同的形式。topic交换机背后的逻辑类似于driect交换机的逻辑,使用特定路由密钥发送的消息将被传递到所有使用匹配绑定密钥绑定的队列。但是,绑定键有两个重要的特殊情况(通配符)
通配符 | 描述 |
---|---|
*(星号) | 代替一个单词 |
#(哈希) | 替代零个或多个单词 |
当队列用“#”(哈希)绑定键绑定时,将接收所有消息,而与路由键无关,就像在fanout交换中一样
当在绑定中不使用特殊字符“*”(星号)和“#”(哈希)时,主题交换的行为direct一样
交换机和队列的binding_key需要采用“.#.”。…的格式,每个部分用“.”分开
发送到topic交换机的消息不能具有任意的routing_key。必须是单词列表,以点分隔。这些词可以是任何东西,但是通常它们指定与消息相关的某些功能。
路由关键示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由密钥中可以包含任意多个单词,最多255个字节。
发送所有描述动物的消息:将使用包含三个词(两个点)的路由密钥发送消息
路由键:
①第一个单词将描述速度
②第二个将描述颜色
③第三个将描述物种
创建了三个绑定:Q1与绑定键“* .orange.*”绑定,Q2与“*.*.rabbit”和“lazy.#”绑定
绑定总结
①Q1对所有橙色动物都感兴趣
②Q2对动物速度,以及有关懒惰动物的一切感兴趣
消息发送
路由键设置为“quick.orange.rabbit”的消息将传递到两个队列
消息“lazy.orange.elephant”也将发送给他们两个
路由键设置为“quick.orange.fox”只会进入第一个队列Q1
“lazy.brown.fox”只会进入第二个队列Q2
“lazy.pink.rabbit”将被传递到第二队只有一次,即使两个绑定匹配(也就两个匹配的两个路由键都指向一个队列,则只绑定一次)
“quick.brown.fox”与任何绑定都不匹配,因此将被丢弃
如果违反合同并发送一个或四个单词的消息,例如“orange”或“quick.orange.male.rabbit”,消息将不匹配任何绑定,并且将会丢失。另一方面,“lazy.orange.male.rabbit”即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "topic";
String message = "消息交换机topic";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println("路由键:" + routingKey + "信息:" + message);
}
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println("等待接收消息!");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
请求/回复模式示例
RabbitMQ构建RPC系统:客户端和可伸缩RPC服务器。由于我们没有值得分配的耗时任务,因此将创建一个虚拟RPC服务,该服务返回Fibonacci数
有关RPC说明
尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。
与发布者进行可靠的发布确认
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。