赞
踩
解耦:在项目启动之初来预测将来会碰到什么需求是极其困难的。消息中间件在处理过程
中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独
立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可
冗余〈存储) 有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直
到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把 个消息从消息中间件中删
除之前,需要你的处理系统明确地指出该消息己经被处理完成,从而确保你的数据被安全地保
存直到你使用完毕。
扩展性: 因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容
易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
削峰: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常
见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费,使用消息中间件能够使关
键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩惯
可恢复性: 当系统一部分组件失效时,不会影响到整个系统 消息中间件降低了进程间的
稿合度,所以即使 个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后
进行处理
顺序保证: 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程
度上的顺序性。
缓冲: 在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过 个缓
冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速 该缓冲层有助于控制
和优化数据流经过系统的速度。
异步通信: 在很多时候应用不想也不需要立即处理消息 消息中间件提供了异步处理机制,
允许应用把 些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理
消息一般可以包含两个部分:消息体(payload)和标签(Label)
消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道
RabbitMQ中的消息都只能存储在队列中,这一点和Katka相反,Katka 将消息存储在topic (主题)这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊 CRound-Robin ,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做。
交换器将消息路由到一个或者多个队列中,如果路由不到,或许会返回给生产者,或许直接丢弃。
生产者将消息发送给交换器时, 需要一个RoutingKey。交换器和队列绑定时需要一个BindingKey。当BindingKey和RoutingKey相匹时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。
BindingKey并不是在所有的情况下都能生效,它依赖于交换器类型。如fanout 类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。
RabbitMQ就是AMQP协议的 Erlang 的实现。 AMQP模型架构和RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。。当生产者发送消息时所携带的 RoutingKey 与绑定时 BindingKey 匹配时,消息即被存入相应的队列之中,消费者可以订阅相应的队列来获取消息。
AMQP 协议本身包括三层。
exchangeDeclare,交换器声明有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。
public Exchange.DeclareOk exchangeDeclare(String exchange , String type , boolean durable , boolean autoDelete , boolean internal, Map<String, Object> arguments) throws IOException ;
返回值Exchange.DeclareOk是用来标识成功声明了一个交换器
exchange:交换器的名称
type:交换器的类型,常见的如 fanout、direct、topic、headers
durable: 设置是否持久化,设置为 true 表示持久化, 反之是非持久 。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
autoDelete:设置是否自动删除, true表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定过,绑定之后所有与这个交换器绑定的队列或者交换器都与此解绑。全部解绑后该交换器自动删除。
解绑后,即使交换器是持久化的,也会自动删除
当有未解绑的队列或交换器时,重启rabbitmq服务、或者与此交换器连接的客户端都断开时,RabbitMQ都不会删除该交换器
internal:设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
argument 其他一些结构化参数。
public Exchange.DeleteOk exchangeDelete(String exchange, boo1ean ifUnused) throws IOException;
public Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<Str ng Object> arguments) throws IOException;
queue:队列的名称
durable: 设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息
exclusive:设置是否排他。为true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
autoDelete: 设置是否自动删除。为true则设置队列为自动删除(必须有至少一个消费者连接再断开后才能自动删除)
argurnents: 设置队列的其他一些参数
public Queue.PurgeOk queuePurge(String queue) throws IOException;
public Queue.De1eteOk queueDe1ete(String queue ,boo1ean ifUnused, boolean ifEmpty)
throws IOExcept on;
public Queue.Dec1areOk queueDec1arePassive(String queue) throws IOException;
public Queue.BindOk queueBind(String queue , String exchange , String routingKey,
Map<String, Object> arguments) throws IOExcept on;
queue: 队列名称
exchange: 交换器的名称
routingKey: 用来绑定队列和交换器的路由键
argument: 定义绑定的一些参数
public Queue.UnbindOk queueUnbind (String queue , String exchange , String routingKey,
Map<String , Object> arguments) throws IOException;
public Exchange.BindOk exchangeBind(String destination, String source , String
routingKey, Map<String , Object> arguments) throws IOException ;
destination:目标路由器名称(接收转发)
source:资源路由器(转发消息)
routingKey:用来绑定交换器和交换器的路由键
argument: 定义绑定的一些参数
// 声明资源交换器(类型可不同)
channel.exchangeDeclare( "source" , "direct" , false , true , null) ;
// 声明目标路由器(类型可不同)
channel.exchangeDeclare( "destination" , "fanout" , false , true , null);
// 交换器绑定交换器
channel.exchangeBind( "destination" , "source" , "exKey");
// 声明队列
channel.queueDeclare( "queue" , false , false , true , null);
// 队列绑定交换器
channel.queueBind("queue" , "destination");
// 发送消息
channel.basicPublish("source" , "exKey" , null , "exToExDemo".getBytes ());
RabbitMQ的消息存储在队列中,交换器并不真正耗费服务器的性能,而队列会
如果发送消息的交换器没有绑定任何队列,那么消息将会丢失
交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失
使用预先分配创建队列资源的静态方式还是代码中动态声明式的创建队列,需要从业务逻辑本身、公
司运维体系和公司硬件资源等方面考虑
public void basicPublish(String exchange , String routingKey, boolean mandatory,
boolean immediate , BasicProperties props, byte[] body) throws IOException;
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.contentType("text/plain")
// 持久化的消息
.deliveryMode(2)
// 消息的优先级
.priority(1)
.userId("hidden")
// header信息
.headers(new HashMap<>())
// 消息过期时间
.expiration("60000")
.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props, message.getBytes());
设置为 true 时,当交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMq就会调用 Basic.Return 命令将消息返回给生产者
设置为 false 时,出现上述情形,则消息直接被丢弃(发送消息时不设置该值,默认值为false)
// 发送消息时交换器根据路由键ROUTING_KEY_NO找不到对应的队列,mandatory = true
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NO, true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 监听被返回的消息
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return 返回的结果是: " + message);
}
});
发送消息时不设置该值,默认值为false
Map<String, Object> arguments = new HashMap<>(); // 设置备份交换器 arguments.put("alternate-exchange", "_1_backup_exchange"); // 声明主交换器(路由模式)持久化的(消息服务重启后,该交换机不会消失)durable = true、非自动删除的、有备份交换器的 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, arguments); // 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 通过路由绑定交换机和队列(此处的key就是BindingKey) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 声明备份交换器(广播模式) channel.exchangeDeclare("_1_backup_exchange", "fanout", true, false, null); channel.queueDeclare("_1_backup_queue", true, false, false, null); channel.queueDeclare("_2_backup_queue", true, false, false, null); channel.queueBind("_1_backup_queue", "_1_backup_exchange", ""); channel.queueBind("_2_backup_queue", "_1_backup_exchange", "");
当生产者将消息发送出去之后,默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器
RabbitMQ提供了两种生产者消息确认方式:
通过事务机制实现:
通过发送方确认 publisher confirm 机制实现
事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。所以采用事务机制实现会严重降低 RabbitMQ 的消息吞吐量
// 特别注意:该事务是每次提交一条消息,回滚也只是回滚一条
channel.txSelect();
for (int i = 0; i < 1000; i++) {
try {
String message = "生成的message - " + i;
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
if (i == 100) {
int m = 1 / 0;
}
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
}
生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 会发送一个确认(Basic.Ack) 给生产者(包含消息的唯一 ID) ,这就使得生产者知晓消息已经正确到达了目的地了
如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。 RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都己经得到了处理
发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack (Basic.Nack) 命令,生产者应用程序同样可以在回调方法中处理该 nack (否定确认)命令
生产者通过调用 channel.confirmSelect 方法(即 Confirm.Select 命令)将信道设置为 confirm模式,之后 RabbitMQ 会返回 Confirm Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消息既被 ack 又被 nack 情况,并且 RabbitMQ 并没有对消息被confirm 的快慢做任何保证
// 设置信道为 confirm 模式
channel.confirmSelect();
for (int i = 0; i < 1000; i++) {
String message = "生成的message - " + i;
// 发送持久化的消息 - deliveryMode = 2(persistent)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 带有超时时间的等待broke确认,发一条消息后等服务器确认,这是一种串行同步等待的方式
if (!channel.waitForConfirms(1000)) {
System.out.println( " send message failed" ) ;
// 做消息重发逻辑
}
}
如果信道没有开启 publisher confirm 模式,调用任何 waitForConfirms 方法都会报出java.lang.IllegalStateException
对于没有参数的 waitForConfirms 方法来说,其返回的条件是客户端收到了相应的Basic.Ack/.Nack 或者被中断。参数 timeout 表示超时时间,一旦等待 RabbitMQ 回应超时就会抛出 java.util.concurrent.TimeoutException 的异常
waitForConfirmsOrDie 方法在接收到 RabbitMQ 返回Basic.Nack 之后会抛出 java.io.IOException 。业务代码可以根据自身的特性灵活地运用这四种方法来保障消息的可靠发送
对于持久化的消息来说,该方式和事务机制方式都需要等待消息确认落盘之后才会返回(调Linux内核的 fsync 方法) 。在同步等待的方式下, publisher confirm 机制发送一条消息需要通信交互的命令是 :Basic.Publish和Basic .Ack;,而事务机制是 :Basic.Publish、Tx.Commmit及Tx.Commit-Ok (或者Tx .Rollback及Tx .Rollback-Ok) 。事务机制比confirm机制多了一个命令帧报文的交互,所以 QPS 会略微下降,相差很小。
List<String> list = new ArrayList<>(); final int BATCH_COUNT = 1000; channel.confirmSelect(); int MsgCount = 0; while (true) { String message = "生成的message - " + MsgCount; // 发送持久化的消息 - deliveryMode = 2(persistent) channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //将发送出去的消息存入缓存中 list.add(message); if (++MsgCount >= BATCH_COUNT) { MsgCount = 0; try { if (channel.waitForConfirms()) { //确认成功,将缓存中的消息清空 list.clear(); } // 确认失败,将缓存中的消息重新发送 } catch (InterruptedException e) { e.printStackTrace(); // 确认失败,将缓存中的消息重新发送 } } }
强烈建议使用异步 confirm 的方式
multiple参数如何指定,或是由broke服务器自主选择
// 消息序列号列表 SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet()); // 开启confirm模式 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { // 消息确认处理 @Override public void handleAck(long deliveryTag, boolean multiple) { System.out.println("消息序列号 : " + deliveryTag + ", 是否批量确认 : " + multiple); if (multiple) { // 批量确认时,将未确认消息列表中的当前序列号之前的消息批量确认删除 confirmSet.headSet(deliveryTag - 1).clear(); } else { // 单条确认时,只删除当前序列号的消息 confirmSet.remove(deliveryTag); } } // 消息未确认处理 @Override public void handleNack(long deliveryTag, boolean multiple) { if (multiple) { confirmSet.headSet(deliveryTag - 1).clear(); } else { confirmSet.remove(deliveryTag); } //添加处理消息重发的场景 } }); // 模拟循环发送消息 for (int i = 0; i < 1000; i++) { // 获取当前消息的序列号 long nextPublishSeqNo = channel.getNextPublishSeqNo(); String message = "生成的message - " + i; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 将当前发送的消息序列号添加到列表中 confirmSet.add(nextPublishSeqNo); }
channel.basicQos(int prefetchCount)方法允许限制信道上的消费者所能保持的最大未确认消息的数量
在订阅消费队列之前,消费端程序调用了 channel.basicQos(5) ,之后订阅了某个队列进行消费RabbitMq会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限
Basic.Qos的使用对于拉模式的消费方式无效
// prefetchSize表示消费者所能接收未确认消息的总体大小的上限,单位为B(字节),设置为0,则表示没有上限
// prefetchCount表示限制信道上的消费者所能保持的最大未确认消息的数量,设置为0,则表示没有上限
// global为false信道上新的消费者需要遵从prefetchCount的限定值(默认值)
// global为true信道上所有的消费者都需要遵从prefetchCount的限定值
void basicQos(int prefetchSize , int prefetchCount , boo1ean global);
消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。在不使用任何 RabbitMq高级特性 ,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。
下列情形会打破RabbitMQ 的消息顺序性(不限于)
- 发送消息之后遇到异常进行了事务回滚,如果补偿发送这条消息得是在另一个线程实现的
- 启用 publisher confirm 时,在发生超时、中断,或是收到 RabbitMQBasic.Nack命令时,补偿发送结果与事务机制一样会错序
- 消费设置了不同的过期时间的死信队列中的消息
- 消息设置了优先级
- 被拒绝的重入队列中的消息
如果要保证消息的顺序性,需要业务方使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识(类似 Sequence ID) 来实现
消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级。RabbitMQ 支持其中的"最多一次 “和"最少一次”。
最多一次:消息可能会丢失,但绝不会重复传输
最少一次:消息绝不会丢失,但可能会重复传输。
恰好一次:每条消息肯定会被传输一次且仅传输一次
"最多一次"的方式生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失
"最少一次"投递实现需要考虑以下这个几个方面的内容:
- 消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
- 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
- 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失
- 消费者在消费消息的同时需要将 autoAck 设置为 false ,然后通过手动确认的方式去确认己经正确消费的消息,以避免在消费端引起不必要的消息丢失
“恰好一次” RabbitMQ目前无法保障的。
- 消费者在消费完一条消息之后向RabbitMQ 发送确认 Basic.Ack 命令,此时由于网络断开或者其他原因造成 RabbitMQ并没有收到这个确认命令,那么 RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费
- 生产者在使用publisher confirm机制,发送完一条消息等待 RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样 RabbitMQ 中就有两条同样,在消费的时候,消费者就会重复消费
去重处理一般是在业务客户端实现,比如引入GUID (Globally Unique Identifier) 的概念。针对GUID ,如果从客户端的角度去重,那么要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小也难以界定。建议在实际生产环境中,业务方根据自身的业务特性进行去重,比如业务消息本身具备幂等性,或者借助 Redis 等其他产品进行去重处理
RabbitMQ的消费模式分两种:推(Push) 模式和拉(Pull) 模式
推模式采用 BasicConsume进行消费,而拉模式则是调用 BasicGet 进行消费
接收消息一般通过实现Consumer接口或者继承DefaultConsumer类来实现。当调用与Consumer 相关的 API 方法时,不同的订阅采用不同消费者标签 (consumerTag) 来区分彼
在同一个 Channe中的不同消费者也需要通过唯一的消费者标签以作区分
BasicConsume将信道 (Channel)置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者,使用 BasicConsume方法可实现高吞吐量
推送消息的个数还会受到basicQos的限制
// 1次能接收的未确认的最大消息数(接收的消息未确认数到达64时,不再被分发新消息)
channel.basicQos(64);
// 非自动确认
boolean autoAck = false;
// 消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message = " + new String(body));
// 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 消费者订阅消息,手动确认
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
public String basicConsume(String queue ,boolean autoAck ,String consumerTag,
boolean noLocal ,boolean exclusive ,Map<String,Object> arguments ,Consumer callback)
throws IOException
queue:队列的名称
autoAck:设置是否自动确认。建议设成false ,即不自动确认:
consumerTag:消费者标签,用来区分多个消费者
noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
exclusive:设置是否排他
arguments:设置消费者的其他参数
callback:设置消费者的回调函数。用来处理RabbitMq推送过来的消息,比如DefaultConsumer使用时需要客户端重写 (override) 其中的方法。
public GetResponse basicGet(String queue , boolean autoAck) throws IOException;
当 autoAck = false时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)
当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息
队列中的消息分成了两个部分:
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的依据是消费该消息的消费者连接是否己经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久
public void basicReject(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
public void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
deliveryTag、requeue的用法同上
multiple:设置为false,则表示拒绝编号为deliveryTag的这一条消息,此时basicNack和basicReject方法作用一样,都是拒绝一条消息
multiple设置为true,则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息
public Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
requeue:控制消息是否分配给同一个消费者
设置为 true ,则未被确认的消息会被重新加入到队列中,对于同一条消息,可能会被分配给与之前不同的消费者
设置为false,同一条消息会被分配给与之前相同的消费者
有两种方法可以设置消息的TTL(time to live),若两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准
通过队列属性设置消息TTL的方法是在声明队列时加入x-message-ttl参数来实现,单位是毫秒
Map<String, Object> arguments = Maps.newHashMap();
// 队列中的消息5s后过期
arguments.put("x-message-ttl", 5000);
// 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的、消息可过期
channel.queueDeclare(QUEUE_NAME, true, false, false, map);
// 通过路由绑定交换机和队列(此处的key就是BindingKey)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
通过消息本身设置设置消息TTL,单位是毫秒
String message = "生成的message - " + i;
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
// 消息的过期时间为5s
.expiration(String.valueOf(5000)).build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
为什么这两种方法处理的方式不一样?
因为第一种方法里,队列中己过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。
而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。
通过声明队列时,添加x-expires参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。
设置队列里的TTL可以应用于类似RPC方式的回复队列,在RPC中,许多队列会被创建出来,但是却是未被使用的。
RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时 。RabbitMQ重启后,持久化的队列的过期时间会被重新计算。
用于表示过期时间的 x-expires 参数以毫秒为单位,并且服从和x-message-ttl的约束条件,不过不能设置为0比如该参数设置为1000 ,则表示该队列如果在1秒钟之内未使用则会被删除
Map<String, Object> arguments = new HashMap<>();
// 队列未使用5s后删除
arguments.put("x-expires", 5000);
// 队列中的消息10s后过期(消息过期时间大于队列删除时间,不会影响队列的删除时间,删除后消息丢失)
arguments.put("x-message-ttl", 10000);
// 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
持久化可以提高 RabbitMQ 的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3wHLhfBE-1661678466107)(C:\Users\wl\Desktop\笔记\image\RabbitMQ.assets\image-20220428092233878.png)]
DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信 (dea message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定 DLX 的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
DLX是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的DLX 上去,进而被路由到另一个队列,即死信队列。
死信队列的特性与将消息的 TTL设置为0配合使用可以弥补imrnediate参数的功能
死信队列绑定死信交换器的路由键,可以和原队列和原交换器的路由键设置为不同
// 声明死信交换器(不设置参数,默认非持久化、非自动删除的),路由模式 channel.exchangeDeclare("dlx_exchange", "direct"); // 声明死信队列 channel.queueDeclare("dlx_queue", false, false, false, null); // 绑定死信队列到死信交换器(具有路由key) channel.queueBind("dlx_queue", "dlx_exchange", "dlx-routing-key"); Map<String, Object> map = new HashMap<>(); // 设置死信交换器 map.put("x-dead-letter-exchange", "dlx_exchange"); // 设置允许路由到死信交换器消息的路由key map.put("x-dead-letter-routing-key", "dlx-routing-key"); // 持久化的(消息服务重启后,该交换器不会消失)durable = true、非自动删除的 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的,设置死信交换器 channel.queueDeclare(QUEUE_NAME, true, false, false, map); // 绑定普通队列到普通交换器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
优先级队列,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
Map<String, Object> map = new HashMap<>(); // 设置队列的最大优先级 = 10 map.put("x-max-priority", 10); // 持久化的(消息服务重启后,该交换机不会消失)durable = true、非自动删除的 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的,有队列优先级 channel.queueDeclare(QUEUE_NAME, true, false, false, map); // 通过路由绑定交换机和队列(此处的key就是BindingKey) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); String message = "生成的message"; AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) // 设置消息的优先级为5 .priority(5) .build(); // 发送持久化的消息 - deliveryMode = 2(persistent) channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
RPC,是Remote Procedure Call 的简称,即远程过程调用
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UQqhkdob-1661678466108)(C:\Users\wl\Desktop\笔记\image\RabbitMQ.assets\image-20220428113631428.png)]
public class RabbitmqFactory { private static final String IP_ADDRESS = "172.16.57.5"; private static final int PORT = 5672;//RabbitMQ 服务端默认端口号为 5672 protected Connection connection; protected Channel channel; /** * @return com.rabbitmq.client.Connection * @Description 获取连接 * @Param */ public void initRabbitmqConnection() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(IP_ADDRESS); connectionFactory.setPort(PORT); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); connectionFactory.setVirtualHost("rootDB"); if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { System.out.println("获取连接前,关闭信道失败"); e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { System.out.println("获取连接前,关闭连接失败"); e.printStackTrace(); } } try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); } catch (Exception e) { System.out.println("获取连接失败"); e.printStackTrace(); } } /** * 重连Rabbitmq */ public void reConnect() { System.out.println("5s后重连MQ"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } initRabbitmqConnection(); } // 关闭连接 public void shutdownConnect() { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { System.out.println("关闭信道失败"); e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { System.out.println("关闭连接失败"); e.printStackTrace(); } } } public Connection getConnection() { return connection; } public Channel getChannel() { return channel; } }
public class RPCClient { // 封装的获取连接、信道及关闭资源的工厂 private RabbitmqFactory rabbitmqFactory; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws IOException, TimeoutException { rabbitmqFactory = new RabbitmqFactory(); rabbitmqFactory.initRabbitmqConnection(); Channel channel = rabbitmqFactory.getChannel(); // 声明回调队列,并订阅回调队列 replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws IOException, InterruptedException { String response; // 设置请求标识id String corrId = UUID.randomUUID().toString(); // 设置消息参数:回调队列、请求标识 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); rabbitmqFactory.getChannel().basicPublish("", requestQueueName, props, message.getBytes()); // 监听 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public static void main(String args[]) throws Exception { RPCClient fibRpc = new RPCClient(); System.out.println(" [x) Requesting fib(30)"); String response = fibRpc.call("30"); System.out.println(" [.) Got '" + response + "'"); fibRpc.rabbitmqFactory.shutdownConnect(); } }
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String args[]) throws Exception { RabbitmqFactory rabbitmqFactory = new RabbitmqFactory(); rabbitmqFactory.initRabbitmqConnection(); Channel channel = rabbitmqFactory.getChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests "); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String response = ""; try { String message = new String(body, "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [ . ] fib( " + message + " ) "); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() .correlationId(properties.getCorrelationId()) .build(); // 使用回调队列、请求标识 channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); } private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } }
AMQP协议中并没有指定权限在Vhost级别还是在服务器级别实现,由具体的应用自定义。在RabbitMQ中,权限控制则是以Vhost为单位的 。当创建一个用户时,用户通常会被指派给至少一个vhost ,并且只能访问被指派的vhost内的队列、交换器和绑定关系等。因此,RabbitMQ中的授予权限是指在vhost级别对用户而言的权限授予
可配置权限:指的是队列和交换器的创建及删除之类的操作
可写权限:指的是发布消息
可读权限:指与消息有关的操作,包括读取消息及清空整个队列等
RabbitMQ中,用户是访问控制(Access Control)的基本单元,且单个用户可以跨越vhost进行授权。针对一至多个vhost ,用户可以被赋予不同级别的访问权限,并使用标准的用户名和密码来认证用户。
用户的角色分为5种类型。
none:无任何角色。新创建的用户的角色默认为none
management:可以访问Web管理页面
policymaker:包含 management 的所有权限,并且可以管理策略(Policy)和参数( Parameter )
monitoring:包含 management 的所有权限,并且可以看到所有连接、信道及节点相关的信息
administartor:包含 monitoring的所有权限,井且可以管理用户、虚拟主机、权限、策略、参数等。 administartor代表了最高的权限
response.getBytes(“UTF-8”));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
}
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
}
## 5、用户管理 ### 5.1、多租户及权限 AMQP协议中并没有指定权限在Vhost级别还是在服务器级别实现,由具体的应用自定义。在RabbitMQ中,权限控制则是以Vhost为单位的 。当创建一个用户时,用户通常会被指派给至少一个vhost ,并且只能访问被指派的vhost内的队列、交换器和绑定关系等。因此,RabbitMQ中的授予权限是指在vhost级别对用户而言的权限授予 可配置权限:指的是队列和交换器的创建及删除之类的操作 可写权限:指的是发布消息 可读权限:指与消息有关的操作,包括读取消息及清空整个队列等 ### 5.2、用户管理 RabbitMQ中,用户是访问控制(Access Control)的基本单元,且单个用户可以跨越vhost进行授权。针对一至多个vhost ,用户可以被赋予不同级别的访问权限,并使用标准的用户名和密码来认证用户。 用户的角色分为5种类型。 * none:无任何角色。新创建的用户的角色默认为none * management:可以访问Web管理页面 * policymaker:包含 management 的所有权限,并且可以管理策略(Policy)和参数( Parameter ) * monitoring:包含 management 的所有权限,并且可以看到所有连接、信道及节点相关的信息 * administartor:包含 monitoring的所有权限,井且可以管理用户、虚拟主机、权限、策略、参数等。 administartor代表了最高的权限
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。