赞
踩
对比RocketMQ和RabbitMQ消息可靠性投递
消息丢失可能出现的环节1234;
RabbitMQ消息零丢失方案:
1》生产者保证消息正确发送到RibbitMQ
对于单个数据,可以使用生产者确认机制。通过多次确认的方式,保证生产者的消息能够正确的发送到RabbitMQ中。
RabbitMQ的生产者确认机制分为同步确认和异步确认。同步确认主要是通过在生产者端使用Channel.waitForConfirmsOrDie()指定一个等待确认的完成时间。异步确认机制则是通过channel.addConfirmListener(ConfirmCallback var1,ConfirmCallback var2)在生产者端注入两个回调确认函数。第一个函数是在生产者发送消息时调用,第二个函数则是生产者收到Broker的消息确认请求时调用。两个函数需要通过sequenceNumber自行完成消息的前后对应。sequenceNumber的生成方式需要通过channel的序列获取。int sequenceNumber =
channel.getNextPublishSeqNo();
在RabbitMQ中,另外还有一种手动事务的方式,可以保证消息正确发送。手动事务机制主要有几个关键的方法: channel.txSelect() 开启事务;channel.txCommit() 提交事务; channel.txRollback() 回滚事务; 用这几个方法来进行事务管理。但是这种方式需要手动控制事务逻辑,并且手动事务会对channel产生阻塞,造成吞吐量下降
2》RabbitMQ消息存盘不丢消息
这个在RabbitMQ中比较好处理,对于Classic经典队列,直接将队列声明成为持久化队列即可。而新增的Quorum队列和Stream队列,都是明显的持久化队列,能更好的保证服务端消息不会丢失。
3》 RabbitMQ 主从消息同步时不丢消息
这涉及到RabbitMQ的集群架构。首先他的普通集群模式,消息是分散存储的,不会主动进行消息同步了,是有可能丢失消息的。而镜像模式集群,数据会主动在集群各个节点当中同步,这时丢失消息的概率不会太高。另外,启用Federation联邦机制,给包含重要消息的队列建立一个远端备份,也是一个不错的选择。
4》 RabbitMQ消费者不丢失消息
RabbitMQ在消费消息时可以指定是自动应答,还是手动应答。如果是自动应答模式,消费者会在完成业务处理后自动进行应答,而如果消费者的业务逻辑抛出异常,RabbitMQ会将消息进行重试,这样是不会丢失消息的,但是有可能会造成消息一直重复消费。将RabbitMQ的应答模式设定为手动应答可以提高消息消费的可靠性。
1.对比见下表
– | -RocketMQ- | -RabbitMQ- |
---|---|---|
生产者确认机制 | 1.同步发送时只要send()方法没有抛出异常,就可以认为消息发送成功,即消息队列Broker成功接受到了消息。2.使用异步发送方式时记得重写SendCallback类的两个方法,在onSuccess()方法中更新消息的发送状态为发送成功,只要不发生异常且回调了onSuccess()方法也可以认为成功发送到了Broker,自动重试机制 | 1.事务方式 生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物 2.confirm模式 ,生产者设置开启了 confirm 模式之后,每次写的消息都会分配一个唯一的 ID,然后如何写入了 RabbitMQ 之中,RabbitMQ 会给你回传一个 ack 消息,告诉你这个消息发送 OK 了;如果RabbitMQ 没能处理这个消息,会回调你一个 nack 接口,告诉你这个消息失败了,你可以进行重试– |
-broker确认机制- | 1.同步刷盘在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态;2.异步刷盘在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入 | -1.持久化 将 queue 的持久化标识 durable 设置为 true,代表是一个持久的队列;发送消息的时候将 deliveryMode = 2,这样消息就会被设为持久化方式,此时 RabbitMQ 就会将消息持久化到磁盘上;这个持久化配置可以和 confirm 机制配合使用,在消息持久化磁盘后,再给生产者发送一个 ack 信号。这样,如果消息持久化磁盘之前,RabbitMQ 阵亡了,那么生产者收不到 ack 信号,生产者会自动重发- |
– | – | – |
消费者确认机制 | 1.消费者拉取消息进行本地业务处理,业务处理完成才能提交ACK ConsumeConcurrentlyStatus.CONSUME_SUCCESS,切不可先提交ACK再进行业务处理。如果业务处理出现异常情况,可以先返回ConsumeConcurrentlyStatus.RECONSUME_LATER等待消息队列的下次重试,消息队列 RocketMQ 默认允许每条消息最多重试 16 次,如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。死信队列里的消息有效期与正常消息相同,均为3天。3天后会被自动删除。针对这种情况,为了不丢失消息我们需要处理死信队列里的消息,有消息进入死信队列,意味着某些问题导致消费者无法正常消费消息,因此,通常需要人工介入对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息让消费者重新消费一次,或者直接让专门的消费者订阅死信队列进行消费 | 关闭 autoAck 功能,启用手动确认模式;每次在确保处理完这个消息之后,在代码里手动调用 ack,以确保消息一定被消费;如果打开的的是 autoAck 模式,当消费者处理了数据以后,消费者会自动通知 RabbitMQ 数据已经被消费。如果此时不巧,消费者还没有处理完服务器就宕机了,而 RabbitMQ 和 消费者都以为消息已经被消费,不会触发重发机制,这就会造成消息丢失。介绍下,消费者端移动有3种模式可供选择:自动确认模式:消费者挂掉,待 ack 的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。手动确认模式:如果消费者来不及处理就死掉时,没有响应 ack 时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在 finally 里ack,也会一直重复发送消息(重试机制)。不确认模式:acknowledge=“none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。