赞
踩
是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模型;而AMQP的消息模型更加丰富
高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。常见MQ产品
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com
RabbitMQ由Erlang语言开发,需要安装与RabbitMQ版本对应的Erlang语言环境,具体的就不解释了,自行搜索教程。RabbitMQ官网下载地址:http://www.rabbitmq.com/download.html
下图是RabbitMQ的基本结构:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
Connection:生产者和Borker之间需要通过Tcp连接。AMQP应用层协议使用的是能够提供可靠传输的Tcp连接,AMQP的连接是长连接。AMQP使用认证机制并且提供TLS(SSL)保护。当我们的生产者 或 消费者 不再需要连接到消息中间件的的时候,需要优雅的释放掉它们与消息中间件TCP连接,而不是直接将TCP连接关闭
Channel:通常情况下生产者 或 消费者 需要与 消息中间件之间建立多个连接。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源。AMQP协议提供了信道(channel)这个概念来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个信道准备的。
rabbitmq channel参数详解
1、Channel
1.1 channel.exchangeDeclare():
type:有direct、fanout、topic三种
durable:true、false true:服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。原文:true if we are declaring a durable exchange (the exchange will survive a server restart)
autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange。原文1:true if the server should delete the exchange when it is no longer in use。Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- 1
- 2
1.2 chanel.basicQos()
prefetchSize:0
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别备注:据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
- 1
1.3 channel.basicPublish()
routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- 1
- 2
1.6 channel.basicConsume(QUEUE_NAME, true, consumer);
autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- 1
1.7 chanel.exchangeBind()
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
用于通过绑定bindingKey将queue到Exchange,之后便可以进行消息接收Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
- 1
1.8 channel.queueDeclare()
durable:true、false true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列。this means that the queue will be deleted when there are no more processes consuming messages from it.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- 1
- 2
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复
虚拟主机(vHosts):虚拟主机概念,一个Virtual Host里面可以有若干个Exchange和Queue,我们可以控制用户在Virtual Host的权限。后面使用篇章再详细说明。
用户(User):最直接了当的认证方式,谁可以使用当前的消息中间件。
Durability (持久化):消息代理重启后,交换机是否还存在。交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息中间件(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在消息中间件再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。
Auto-delete (自动删除):当所有与之绑定的消息队列都完成了对此交换机的使用后,是否自动删掉它。
Durable(持久化):消息中间件重启后,队列是否依旧存在。持久化队列(Durable queues)会被存储在磁盘上,当消息中间件(broker)重启之后,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。这里需要注意队列的持久化和它存储的未被消费消息的持久化是2个概念,队列的持久化并不会使存储的消息持久化。假如消息中间件(broker)重启之后,持久化队列会被重新声明,但它里面存储的消息只有设置过持久化的消息才能被重新恢复。
Exclusive(专用队列):可以这样理解当创建这个队列的Connection关闭后队列即被删除,不存在其它的使用可能性
在上图的模型中,有以下概念:
这种模式如果不指定交换机就会使用rabbitmq的默认的交换机、指定路由键,就会根据路由键把不同的消息放入不同的队列。如果使用的是默认的交换机,那么路由键就使用队列的名称即可
// 向指定的队列中发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 参数明细:
* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props,消息的属性
* 4、body,消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//开启手动确认的方式 1、注解上添加 @RabbitListener(queues = "queue_simple",ackMode = "MANUAL") 2、如果想开启全局的ACK手动确认,那么在配置类里面有这段代码即可 @Bean @ConditionalOnClass public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
工作队列或者竞争消费者模式
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
这个消息模型在Web应用程序中特别有用,可以处理短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快
生产者
生产者循环发送50条消息
package com.robin.rabbitmqp.work;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RabbitmqServiceImpl {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendWork() {
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend("queue_work", "测试work模型: " + i);
}
}
}
消费者,两个消费者各消费25条
package com.robin.rabbitmqp.work;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 2个消费者
@Component
public class WorkReceiveListener {
@RabbitListener(queues = "queue_work")
public void receiveMessage(String msg, Channel channel, Message message) {
// 只包含发送的消息
System.out.println("1接收到消息:" + msg);
// channel 通道信息
// message 附加的参数信息
}
@RabbitListener(queues = "queue_work")
public void receiveMessage2(Object obj, Channel channel, Message message) {
// 包含所有的信息
System.out.println("2接收到消息:" + obj);
}
}
有时候会出现,一个消费者处理一个消息,花费的时间比较长,那么就会导致最后消费这50个消息的总时间变长,为了更快的处理50条消息,要设置prefetchCount = 1
消费者1比消费者2的效率要低,一次任务的耗时较长
然而两人最终消费的消息数量是一样的
消费者2大量时间处于空闲状态,消费者1一直忙碌
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。
值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。
说明下:
1、一个生产者多个消费者
2、每个消费者都有一个自己的队列
3、生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
4、每个队列都需要绑定到交换机上
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费
例子:注册->发邮件、发短信
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型有以下几种:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Header:header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
Header模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
也就是会把所有消息发送给交换机、交换机再把消息发送到绑定了这个交换机的所有队列上
Fanout和work模型
不同点:
- fanout需要定义交换机、work不需要定义交换机
- fanout是面向交换机发送消息的、work则是面向队列发送消息的(底层使用默认的交换机)
相同点:
- 所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
实际工作用 publish/subscribe还是work queues。
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
消费者会提前绑定队列、交换机、路由键的关系。发送消息的时候,指定路由键,会去找能匹配上的队列。
每个消费者监听自己的队列,并且设置带统配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
audit.#:能够匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
rabbitmq默认是自动信息消息确认的。
消息确认机制(ACK)
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
这需要看消息的重要性:
如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
MQ只要确认消息发送成功,无需等待应答就会丢弃消息。在这几种情况下消费者还未处理完时、出现异常、断电,就会导致消息丢失。
只要队列不是空的就会,一直把队列中的消息推送给消费者,而不管消费者是否消费完
如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答
//第一种
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
//第二种
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
//第三种
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
7.2.1、消息被消费者消费的时候,出现异常、不能得到ack的响应,消息的状态变为unacked。然后队列的Unacked消息数暴涨,导致MQ响应越来越慢,甚至崩溃的问题。
原因是如果MQ没得到ack响应,这些消息会堆积在Unacked消息里,不会抛弃,直至客户端断开重连时,才变回ready;
如果Consumer客户端不断开连接,这些Unacked消息,永远不会变回ready状态,Unacked消息多了,占用内存越来越大,就会异常了。
看下面的代码,在消费这消费的时候出现异常
package com.robin.rabbitmqp.simple;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Component
public class WorkReceiveSimpleListener {
@RabbitListener(queues = "queue_simple")
public void receiveMessage(String msg, Channel channel, Message message) throws IOException, TimeoutException {
// int i=1/0;
System.out.println("1、消息已拿到:" + msg);
System.out.println("2、消息已经使用完" );
System.out.println("3、消息已确认被消费,可以进行销毁" );
int i=1/0;
//接受完消息之后,手动信息ACK。成功ack之后会从队列中移除、失败了消息就会在队列中的额状态变为unacked,没有unacked的多了,就会出现MQ占用的内存变大,导致程序越来越慢
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("4、消息已销毁" );
channel.close();
}
//设置全局都要进行手动ACK
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
解决思路:
在catch{}代码中,也进行Ack,在之后根据日志,再恢复没有处理的消息
package com.robin.rabbitmqp.simple;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Component
public class WorkReceiveSimpleListener {
@RabbitListener(queues = "queue_simple")
public void receiveMessage(String msg, Channel channel, Message message) throws IOException, TimeoutException {
// int i=1/0;
System.out.println("1、消息已拿到:" + msg);
try {
System.out.println("2、消息已经使用完" );
System.out.println("3、消息已确认被消费,可以进行销毁" );
int i=1/0;
//接受完消息之后,手动信息ACK。成功ack之后会从队列中移除、失败了消息就会在队列中的额状态变为unacked,没有unacked的多了,就会出现MQ占用的内存变大,导致程序越来越慢
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("4、消息已销毁" );
}catch (Exception exception){
System.out.println("===================》消费出现异常"+message.getMessageProperties().getDeliveryTag());
//接受完消息之后,手动信息ACK。成功ack之后会从队列中移除、失败了消息就会在队列中的额状态变为unacked,没有unacked的多了,就会出现MQ占用的内存变大,导致程序越来越慢
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("这条消息没有消费" );
}
channel.close();
}
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
如果在消费者消费之前,MQ就宕机了,消息就没了?
2)可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("持久化消息".getBytes(), messageProperties);
编写一个监听器组件,通过注解配置消费者队列,以及队列与交换机之间绑定关系。(也可以像生产者那样通过配置类配置)
在SpringAmqp中,对消息的消费者进行了封装和抽象。一个JavaBean的方法,只要添加@RabbitListener注解,就可以成为了一个消费者。
@Component
public class ReceiveHandler {
//监听邮件队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_email", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.email.#","email.*"}))
public void rece_email(String msg){
System.out.println(" [邮件服务] received : " + msg + "!");
}
//监听短信队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_sms", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.sms.#"}))
public void rece_sms(String msg){
System.out.println(" [短信服务] received : " + msg + "!");
}
}
属性说明:
@Componet:类上的注解,注册到Spring容器
@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
value:这个消费者关联的队列。值是@Queue,代表一个队列
exchange:队列所绑定的交换机,值是@Exchange类型
key:队列和交换机绑定的RoutingKey,可指定多个
我们在使用rabbitmq来保障消息一致性的时候需要从以下几方面保障消息的可靠性:
1.客户端异常捕获,包括生产者与消费者
2.RabbitMQ/AMQP的事务机制
3.发送端的消息确认机制
4.消息持久化机制
5.Broker的高可用集群
6.消费端的消息确认机制
7.消费端限流
8.消息幂等性
消息发送过程通过 try catch 方式捕获异常, 在异常处理的代码块中执行回滚业务操作或者执行重发操作等。这是一种最大努力确保的方式, 并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。
可以通过原生的接口通过事务的方式来保障消息投递成功,但是这种方式在性能方面的开销比较大,一般不推荐使用
详情可参考:RabbitMQ中消息确认机制_李嘉图呀李嘉图的博客-CSDN博客
持久化是提高RabbitMQ 可靠性的基础,否则当 RabbitMQ 遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:
1. Exchange 的持久化。通过定义时设置 durable 参数为 ture 来保证 Exchange 相关的元数据不丢失。
2. 2.Queue 的持久化。也是通过定义时设置 durable 参数为 ture 来保证 Queue 相关的元数据不丢失。
3. 消息 的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性 ) 设置为 2 即可实现消息的持久化,保证消息自身不丢失。
4. 注:Exchange 和 Queue 的持久化只能保证 Exchange 跟 Queue 在RabbitMQ重启之后仍然存在,如果消息没有设置持久化的话,仅设置 Exchange 和 Queue 的持久化,消息仍然会丢失,想要保证消息不丢失, 交换机,队列,消息 三者的持久化缺一不可
详情可参考:https://blog.csdn.net/qq_42029989/article/details/121969133
https://blog.csdn.net/qq_42029989/article/details/122639491
在电商秒杀活动中,活动会有大量并发请求发到服务端,服务器肯定无法同时处理这么多请求,因此需要对消息进行削峰处理,但是如何削峰呢?
当消息投递速度远大于消费速度时,随着时间积累就会出现“消息积压”,消息中间件本身具备一定得缓冲能力,但是这个能力受限于服务器的设置的容量,长时间消息积压则会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,进而导致更大的崩溃…
因此我们需要从多个角度进行限流,防止以上问题的发生。
1.在RabbitMQ中对内存和磁盘使用量设置阈值,当到达一定得阈值之后,生产者被阻塞(block), 直到对应项指标恢复正常。在全局上防止超大流量,消息积压冲垮Broker。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停发布消息的已连接客户端的套接字读取数据,连接心跳监视也将会被禁用。所有网络连接将会在rabbitmqctl和管理插件中显示为“已阻止”。
# 设置磁盘可用空间大小,单位字节。当磁盘可用空间低于这个值的时候。
# 发出磁盘警告,引发限流。
# 如果设置了相对大小,则忽略此绝对大小。
disk_free_limit.absolute = 50000
# 使用计量单位, 从rabbitmq 3.6.0 开始有效。对 vm_memory_high_watermark 同样有效
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB
# 还可以使用相对于总可用内存的相对值来设置,注意:此相对值不要低于1.0!
# 当磁盘可用空间低于总可用内存的2.0倍的时候,出发限流
# disk_free_limit.relative = 2.0
# 内存限流阈值设置
# 0.4标识阈值总可用内存的比值,总可用内存标识操作系统给每隔进程分配的大小,或实际内存大小
# vm_memory_high_watemark.relative = 0.4
#
# 还可以直接通过绝对值限制可用内存大小,单位字节
# vm_memory_high_watermark.absolute = 107374184
#
# 从RabbitMQ 3.6.0 开始,绝对值支持计量单位,如果设置了相对值,则忽略此相对值。
# vm_memory_high_watermark.absolute = 2GB
2.RabbitMQ默认提供了一种基于 credit flow 的流控机制,针对每一个连接进行流控,当单个队列达到最大流速时,或者多个队列达到总流速时,都会出发流控,出发单个连接的流控可能是应为 connection、channel、queue 的某一个过程处于 flow 状态,这些状态都可以从监控平台看到。
3.RabbitMQ中有一种Qos保证机制,可以限制 Channel 上接收的未被 ACK 的消息数量,如果超过这个数量限制 RabbitMQ将不会再往消费端推送消息,这样可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费者)。需要注意的是,Qos机制仅对于消费端推模式有效,对拉模式无效。且不支持NONE Ack模式。在执行channel.basicConsume 方法之前通过 channel.basicQos方法可以设置该数量。 通过对Qos 的 prefetchCount 进行设置,保证消费者待处理消息永远小于 prefetchCount 个
消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级:
RabbitMQ 支持其中的“最多一次”和“最少一次“
ConfirmCallback是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。
我们需要在生产者的配置中添加下面配置,表示开启发布者确认
spring.rabbitmq.publisher-confirms=true
package com.robin.rabbitmqp.confirm;
import com.robin.rabbitmqp.work.User;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RabbitmqConfirmServiceImpl {
@Autowired
private RabbitTemplate rabbitTemplate;
// 配置 confirm 机制
private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 消息相关的数据,一般用于获取 唯一标识 id
* @param b true 消息确认成功,false 失败
* @param s 确认失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b) {
System.out.println("confirm 消息确认成功..." + correlationData.getId());
} else {
System.out.println("confirm 消息确认失败..." + correlationData.getId() + " cause: " + s);
}
}
};
// 测试 confirm机制
void sendConfirm() {
rabbitTemplate.convertAndSend("queue_confirm", new User(1, "km", "km123"), new CorrelationData("" + System.currentTimeMillis()));
rabbitTemplate.setConfirmCallback(confirmCallback);
}
}
通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调,该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到Broker后几乎不存在绑定队列失败,除非你代码写错了。
使用此接口需要在生产者配置中加入一下配置,表示发布者返回
spring.rabbitmq.publisher-returns=true
package com.robin.rabbitmqp.returnjz;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RabbitmqReturnjzServiceImpl {
@Autowired
private RabbitTemplate rabbitTemplate;
// 配置 return 消息机制
private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
/**
* return 的回调方法(找不到路由才会触发)
* @param message 消息的相关信息
* @param i 错误状态码
* @param s 错误状态码对应的文本信息
* @param s1 交换机的名字
* @param s2 路由的key
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println(message);
System.out.println(new String(message.getBody()));
System.out.println(i);
System.out.println(s);
System.out.println(s1);
System.out.println(s2);
}
};
// 测试return机制
public void sendReturn() {
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend("exchange_return", "return.km", "测试 return 机制");
}
}
消费者确认发生在监听队列的消费者处理业务失败,如,发生了异常,不符合要求的数据……,这些场景我们就需要手动处理,比如重新发送或者丢弃。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。