赞
踩
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取。过期之后消息将自动被删除,RabbitMQ可以对消息队列设置TTL.目前有两种方法可以设置.
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息队列的生存时间一旦超过了设置的TTL值,就称为dead message被投递到死信队列,消费者将无法再接收该消息。
@Configuration public class TTLRabbitMqConfiguration { // 1: 声明注册direct模式的交换机 @Bean public DirectExchange ttlExchange(){ return new DirectExchange("direct_ttl_exchange",true,false); } //队列的过期时间 @Bean public Queue smsTTLQueue(){ //设置过期时间 Map<String,Object> args = new HashMap<>(); args.put("x-message-ttl",5000); //这里的时间是Int类型 return new Queue("ttl.direct.queue",true,false,false,args); } @Bean public Binding smsTTLBinding(){ return BindingBuilder.bind(smsTTLQueue()).to(ttlExchange()).with("ttl"); } }
参数x-message-ttl的值 必须是非负数 32位整数(0 <= n <= 2^32-1) , 以毫秒单位表示TTL的值。这样,值6000表示存在于队列中的当前消息将最多只存活6秒钟。
设置了过期时间的队列,在Rabbit Mq的图形化界面中会出现TTL的标识,过期时间可以在队列详细信息中,查看。如下图所示:
给一个单独的消息设置TTL可以使用普通的队列(不设置过期时间),将消息存入到该队列中,一段时间后,这条消息将会被移除。相较于直接给队列设置过期时间而言,给消息设置过期时间变得更加的灵活。
配置交换机和队列:
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("direct_ttl_exchange",true,false);
}
@Bean
public Queue directTTLMessageQueue(){
return new Queue("ttl.message.queue",true,false,false);
}
@Bean
public Binding smsTTLMessageBinding(){
return BindingBuilder.bind(directTTLMessageQueue()).to(ttlExchange()).with("ttlmessage");
}
设置发送消息:
public void makeTTLMessageOrder(String userid , String productId , int num ,String routingKey){ // 1: 根据商品Id查询库存是否充足 // 2: 保存订单 String orderId = UUID.randomUUID().toString(); System.out.println("订单生产成功: " + orderId); // 3: 通过消息队列完成消息的分发 // 参数1: 交换机 参数2: 路由key/queue队列名称 参数3: 消息内容 String exchangeName = "direct_ttl_exchange"; //给消息设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置过期时间 message.getMessageProperties().setExpiration("5000"); //设置编码 message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor); }
DLX,全称为Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱,当消息在队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列。消息变成死信,可能是由于以下的原因:
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,RabbitMq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候被设置队列参数x-dead-letter-exchange
指定交换机即可。
@Configuration public class DeadRabbitMqConfiguration { @Bean public DirectExchange deadDirectExchange(){ return new DirectExchange("direct_dead_exchange",true,false); } @Bean public Queue deadQueue(){ return new Queue("dead.direct.queue",true); } @Bean public Binding deadBinding(){ return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead"); } }
//队列的过期时间
@Bean
public Queue smsTTLQueue(){
//设置过期时间
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl",5000); //这里的时间是Int类型
//绑定死信队列
args.put("x-dead-letter-exchange","direct_dead_exchange");
args.put("x-dead-letter-routing-key","dead"); //fanoutm模式不需要配置
return new Queue("ttl.direct.queue",true,false,false,args);
}
过期前:
过期后:
当内存超过配置的阈值或者磁盘空间剩余空间对于配置的阈值时,RabbitMQ会暂时阻塞客户端的连接,并且停止接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。
如下图:
参考帮助文档: https://www.rabbitmq.com/configure.html
当出现警告的时候,可以通过配置去修改和调整
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值,默认情况下是: 0.4/2GB ,代表的含义是:当RabbitMQ的内存超过40%时,就会产生警告并且阻塞所有生产者的连接,通过此命令修改阈值在Broker重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启broker才会失效。
分析:
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
当前配置文件:/etc/rabbitmq/rabbitmq.conf
# 默认
# vm_memory_high_watermark.relative = 0.4
# 使用relative相对值进行设置fraction,建议取值在0.4~0.7之间,不建议超过0.7
vm_memory_high_watermark.relative = 0.6
# 使用absolute的绝对值的方式,但是是KB,MB,GB对应的命令如下
vm_memory_high_watermark.absolute = 2GB
在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
默认情况下,内存到达的阈值是50%时就会换页处理。
也就是说,在默认情况下该内存的阈值是0.4的情况下当内存超过0.4 * 0.5 = 0.2 时,会进行换页操作。
比如有1000MB内存,当内存的使用率达到了400MB,已经达到了极限,但是因为配置的换页内存为0.5,这个时候会在达到400mb之前,将内存中的200MB进行转移到磁盘中,从而达到稳健的运行。
可以通过设置vm_memory_high_watermark_paging_ratio
来进行调整。
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7 (设置小于1的值)
为什么设置小于1,因为你如果设置为1的阈值,内存都已经达到了极限了。你再去换页的意义已经不是很大了。
当磁盘的剩余空间低于确定的阈值时,Rabbit MQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。
默认情况下:磁盘预警为50MB的时候会发出预警。表示当前磁盘空间到50MB的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。
这个阈值可以减小,但并不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两刺磁盘空间的检查空隙内,第一次检查是:60MB,第二次检查可能就是1MB,就会出现警告。
通过命令方式修改如下:
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit: 固定单位 KB MB GB
fraction: 是相对阈值,建议范围在:1.0~ 2.0 之间(相对于内存)
通过配置文件配置如下:
disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang的语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering,这使得Rabbit MQ本身不需要像ActiveMQ,Kafka那样通过Zookeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
在实际使用过程中多采取多机多实例部署方式,为了便于练习搭建,有时候不得不在一台机器上搭建一个rabbitmq集群。4
主要参考官方文档:https://www.rabbitmq.com/clustering.html
配置的前提是你的rabbtmq可以运行起来,比如“ps aux|grep rabbitmq” 你能看到相关进程,又比如运行 “rabbitmqctl status” 你可以看到类似如下信息,而不报错
执行下面命令进行查看
ps aux|grep rabbitmq
场景: 假设两个rabbitmq结点,分别为rabbit-1,rabbit-2,rabbit-1作为主节点,rabbit-2作为从节点。
启动命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached
结束命令:rabbitmqctl -n rabbit-1 stop
> sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
````````省略```````````
Starting broker....
completed with 7 plugins
至此节点rabbit-1启动完成。
注意:web管理插件端口占用,所以还要指定其web插件占用的端口号
RABBITMQ_SERVER_START_ARGS=“-rabbitmq_management listener [{port,15673}]”
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
```````省略`````````
Starting broker....
completed with 7 plugins.
至此节点rabbit-2启动完成。
# 停止应用
sudo rabbitmqctl -n rabbit-1 stop_app
# 目的是清楚节点上的历史数据(如果不清除,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-1 reset
# 启动应用
sudo rabbitmqctl -n rabbit-1 start_app
# 停止应用
sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清除节点上的历史数据(如果不清楚,无法将节点加入到集群)
sudo rabbitmqctl -n rabbit-2 reset
# 将rabbit-2节点加入到rabbit-1(主节点)集群当中[Server-node服务器的主机名]
sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@'Server-node'
# 启动应用
sudo rabbitmqctl -n rabbit-2 start_app
sudo rabbitmqctl cluster_status -n rabbit-1
//集群有两个节点:rabbit-1@Server-node、rabbit-2@Server-node
Disk Nodes
rabbit-1@VM-4-4-centos
rabbit-2@VM-4-4-centos
注意在访问的时候:web界面的管理需要给15762 node-1 和 15673的node-2 设置用户名和密码。如下:
# 设置主节点的登录用户名和密码
rabbitmqctl -n rabbit-1 add_user root root
# 设置主节点root用户的角色
rabbitmqctl -n rabbit-1 set_user_tags root administrator
# 设置主节点root用户的权限
rabbitmqctl -n rabbit-1 set_permissions -p / root ".*" ".*" ".*"
# 设置从节点的登录用户名和密码
rabbitmqctl -n rabbit-1 add_user root root
# 设置从节点root用户的角色
rabbitmqctl -n rabbit-1 set_user_tags root administrator
# 设置从节点root用户的权限
rabbitmqctl -n rabbit-1 set_permissions -p / root ".*" ".*" ".*"
Tips:
如果采用多机部署方式,需读取其中一个节点的cookie,并复制到其他节点(节点之间通过cookie确定相互是否可通信),cookie存放在/var/lib/rabbitmq/.erlang.cookie
例如:主机名分别为rabbit-1、rabbit-2
逐个启动各个节点
配置各节点的host文件(vim /etc/hosts)
ip1: rabbit-1
ip2: rabbit-2
其他步骤雷同单机部署方式
简述:
分布式事务是指事务的操作位于不同的节点上,需要保证事务的AICD特性。
例如在下单场景下,库存和订单如果不在同一节点上,就涉及分布式事务。
在分布式系统中,要实现分布式事务,无外乎那几种解决方案。
两阶段提交(Two-phase-Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。
准备阶段
协调者询问参与者事务是否执行成功,参与者发回事务执行结果。
提交阶段
存在的问题
TCC其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个其对应的确认和补偿(撤销)操作。它分为三个阶段:
举个例子,假如Bob要Smith转账,思路大概是:我们有一个本地方法,里面依次调用
- 首先在Try阶段,要先调用远程接口把Smith和Bob的钱给冻结起来。
- 在Confirm阶段,执行远程调用的转账的操作,转账成功进行解冻
- 如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法(Cancel)
优点:跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些
缺点:缺点还是比较明显的,在2,3步中都有可能失效。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。
本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。
优点:一种非常经典的实现,避免了分布式事务,是实现了最终一致性。
缺点:消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。
有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如Kafka不支持。
以阿里的Rabbit MQ中间件为例,其思路大致为:
我们总结并对比了几种分布式分解方案的优缺点,分布式事务本身是一个技术难题,是没有一种完美的方案应对所有场景的,具体还是要根据业务场景去抉择把。阿里Rocket MQ去实现分布式事务,现在也有除了很多分布式事务的协调器,比如LCN等。
美团外卖架构
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。