赞
踩
RabbitMQ 的消费模式分为两种:推模式和拉模式。
两种模式优缺点对比:
在推模式中,可以通过持续订阅的方式来消费消息,使用到的类有:
接收消息一般通过实现 Consumer 接口或继承 DefaultConsumer 类来实现。当调用与 Consumer 相关的 API 方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分。以下是关键的消费代码:
boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// todo…… 在这里处理消息
// (process the message components here ……)
/** 此处进行显示的 ack 操作(channel.basicAck),是防止消息不必要的丢失。*/
channel.basicAck(deliveryTag, false);
}
});
Channel 类中 basicConsume 方法有如下几种形式:
以上函数对应参数说明如下:
生产者和消费者客户端都需要考虑线程安全的问题。消费者客户端的 callback 会被分配到与 Channel 不同的线程池上,这就意味着消费者客户端可以安全的调用这些堵塞方法,比如 channel.queueDeclare、channel.basicCancel 等。
每个 Channel 都拥有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者,也就是说消费者之间彼此没有任何联系。当然也可以在一个 Channel 中维持多个消费者,但是需要注意一个问题,如果 Channel 中的一个消费者一直在运行,那么其他消费者的 callback 会被 “耽搁”。
补充说明
示例代码中重写了 handleDelivery 方法,对于更复杂的消费者客户端,会重写更多的方法,例如:
- viod handleConsumeOk (String consumerTag);
- void handleCancelOk (String consumerTag);
- void handleCancel (String consumerTag) throws IOException;
- void handleShutdownSignal (String consumerTag, ShutdownSignalException sig);
- void handleRecoverOk (String consumerTag);
以下是拉模式时序图:
拉模式的消费方式,是通过 channel.basicGet 方法单条的获取消息,其返回值是 GetResponse . Channel 类的 basicGet 方法没有其他重载方法,仅有:
GetResponse basicGet (String queue, boolean autoAck) throws IOException;
以上的 queue 代表队列名称,如果将 autoAck 设置为 false ,则需要调用channel.basicAck 来确认消息已被成功接收。
拉模式的关键实用代码如下:
GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
特别需要注意的部分:
Basic.Consume 将信道(Channel)置为接收模式,直到取消队列的订阅为止。在接收模式期间,Rabbit MQ 会不断的推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos的限制。如果只是想从队列获取单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费。但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 Rabbit MQ 的性能。如果要实现高吞吐量,消费者应该使用 Basic.Consume 方法。
以下是拉模式时序图:
引用链接:https://blog.csdn.net/weixin_41922349/article/details/102373950
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。