赞
踩
消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失:生产阶段
、存储阶段
、消费阶段
所以要从这三个阶段考虑:
在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。
OK
,表示消息成功发送到了Broker
,如果响应失败,或者发生其它异常,都应该重试。Broker
存储成功。存储阶段,可以通过配置可靠性优先的 Broker
参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步
消息只要持久化到CommitLog
(日志文件)中,即使Broker
宕机,未消费的消息也能重新恢复再消费。
Broker
的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache
中(内存中),但是同步刷盘更可靠,它是Producer
发送消息后等数据持久化到磁盘之后再返回响应给Producer
Broker
通过主从模式来保证高可用,Broker
支持Master
和Slave
同步复制、Master
和Slave
异步复制模式,生产者的消息都是发送给Master
,但是消费既可以从Master
消费,也可以从Slave
消费。同步复制模式可以保证即使Master
宕机,消息肯定在Slave
中有备份,保证了消息不会丢失。
从Consumer
角度分析,如何保证消息被成功消费?
Consumer
保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的有且仅有一次
。RocketMQ
择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等
和消息去重
业务幂等
:第一种是保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。消息去重
:第二种是业务端,对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个惟一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性消费记录表
,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:
消费者扩容
:如果当前Topic
的Message Queue
的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。消息迁移Queue扩容
:如果当前Topic的Message Queue
的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue
。可以新建一个临时的Topic
,临时的Topic
多设置一些Message Queue
,然后先用一些消费者把消费的数据丢到临时的Topic
,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic
里的数据,消费完了之后,恢复原状。顺序消息是指消息的消费顺序
和产生顺序
相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。
顺序消息分为全局顺序消息
和部分顺序消息
:
Topic
下的所有消息都要保证顺序;一个队列
上发送和消费,因此只适用于少量队列(通常是1个队列,否则就无法做到全局顺序)分片键
)进行分区,相同的消息会被发送到同一队列中,从而在每个分区内部实现顺序。部分顺序消息相对比较好实现,生产端需要做到把同 ID
的消息发送到同一个 Message Queue
;在消费过程中,要做到从同一个Message Queue
读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。
发送端使用 MessageQueueSelector
类来控制 把消息发往哪个 Message Queue
import lombok.Data; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String[] tags= new String[]{"TagA","TagC","TagD"}; //订单列表 List<OrderStep> orderList = new Producer().buildOrders(); Date date= new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for(int i=0;i<10;i++){ //加时间前缀 String body= dateStr+"Hello RocketMQ"+orderList.get(i); // 或者使用冒号的方式发送tag // Message msg = new Message(topic + ":" + tag, body.getBytes()); Message msg = new Message("TopicTest",tags[i % tags.length],"KEY"+i,body.getBytes()); SendResult sendResult = producer.send(msg,new MessageQueueSelector(){ @Override public MessageQueue select(List<MessageQueue> mqs,Message msg,Object arg){ Long id = (Long)arg; long index = id%mqs.size(); return mqs.get((int)index); } },orderList.get(i).getOrderId()); System.out.println(String.format("SendResult status:%s,queueId:%d,body:%s",sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } @Data private static class OrderStep{ private long orderId; private String desc; } private List<OrderStep> buildOrders(){ List<OrderStep> orderList = new ArrayList<>(); OrderStep order = new OrderStep(); order.setOrderId(1); order.setDesc("创建"); orderList.add(order); order = new OrderStep(); order.setOrderId(2); order.setDesc("创建"); orderList.add(order); order = new OrderStep(); order.setOrderId(3); order.setDesc("创建"); orderList.add(order); order = new OrderStep(); order.setOrderId(4); order.setDesc("创建"); orderList.add(order); return orderList; } }
消费端通过使用 MessageListenerOrderly
来解决单 Message Queue
的消息被并发处理的问题
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListener; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; public class ConsumerInOrder { public static void main(String[] args) throws Exception{ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("127.0.0.1:9876"); /* * 设置consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果不是第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest","TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) { context.setAutoCommit(true); for(MessageExt msg : list){ //可以看到每一个queue有一个consume线程来消费,订单对每个queue(分区)有序 System.out.println("consumeThread = "+ Thread.currentThread().getName()+"queueId="+ msg.getQueueId()+",content:"+new String(msg.getBody())); } try{ TimeUnit.SECONDS.sleep(random.nextInt(10)); }catch (Exception e){ e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
点击此处了解 SpringBoot 整合RocketMQ指定部分顺序讲解
RocketMQ
默认情况下不保证顺序,比如创建一个 Topic
,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer
,每个 Consumer
也可能启动多个线程并行处理,所以消息被哪个 Consumer
消费,被消费的顺序和写人的顺序是否一致是不确定的。
要保证全局顺序消息, 需要先把 Topic
的读写队列数设置为 一,然后Producer Consumer
的并发设置,也要是一。简单来说,为了保证整个 Topic
全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲RocketMQ
的高并发、高吞吐的特性了。
有两种方案:
Broker
端按照 Consumer
的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer
端,缺点是加重了 Broker
的负担,实现起来相对复杂。Consumer
端过滤,比如按照消息设置的 tag
去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer
端只能丢弃不处理。Cosumer
端过滤,如果希望提高吞吐量,可以采用Broker
过滤。对消息的过滤有三种方式:
Tag
过滤DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
SQL
表达式过滤SQL
表达式过滤更加灵活DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Filter Server
方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤消费端队列存储的是 tag
的 hashcode
,众所周知,不同的字符串得到的hashcode
值可能一样,故在服务端是无法精确对消息进行过滤的,所以在RocketMQ
中会进行两次消息过滤。
当客户端向服务端拉取消息时,服务端在返回消息之前,会先根据hashcode
进行过滤,然后客户端收到服务端的消息后,再根据消息的tag
字符串进行精确过滤。
那么为什么会消息丢失呢,其实就是消息对接的负载均衡机制。
在
RocketMQ
中使用集群模式消费时,同一个消费组中的多个消费者共同完成主题中的队列的消费,即一个消费者只会分配到其中某几个队列,并且同一时间,一个队列只会分配给一个消费者
问题的核心关键是,同一个tag
会分布在不同的队列中,但消费者C1分配到的队列为q0,q1,q0,q1中有taga和tagb的消息,但tagb的消息会被消费者C1过滤,但这部分消息却不会被C2消费,造成了消息丢失。
所以,rocketmq
中所有消费者的订阅关系必须保持一致。
解决上述问题新建consumer组
独立消费即可
电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h
后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
RocketMQ
是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
但是目前RocketMQ
支持的延时级别是有限的:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
那么RocketMQ
怎么实现延时消息的
简单,八个字:临时存储
+定时任务
Broker
收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX
)的相应时间段的Message Queue
中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标Topic
的队列中,然后消费者就可以正常消费这些消息。
半消息
:是指暂时还不能被Consumer
消费的消息,Producer
成功发送到 Broker
端的消息,但是此消息被标记为 暂不可投递
状态,只有等 Producer
端执行完本地事务后经过二次确认了之后,Consumer
才能消费此条消息。
依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:
RocketMQ
实现消息事务 :
Producer
向 broker
发送半消息Producer
端收到响应,消息发送成功,此时消息是半消息,标记为 不可投递
状态,Consumer
消费不了。Producer
端执行本地事务。Producer
向Broker
发送Commit/Rollback
,如果是Commit
,Broker
端将半消息标记为正常消息,Consumer
可以消费,如果是Rollback
,Broker
丢弃此消息。Broker
端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到Producer
端查询半消息的执行情况。Producer
端查询本地事务的状态commit/rollback
到 broker
端。(5,6,7 是消息回查)死信队列
用于处理无法被正常消费的消息,即死信消息
当一条消息初次消费失败,消息队列 RocketMQ
会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ
不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列
死信消息的特点:
死信队列的特点:
Group ID
, 而不是对应单个消费者实例。Group ID
未产生死信消息,消息队列 RocketMQ
不会为其创建相应的死信队列。Group ID
产生的所有死信消息,不论该消息属于哪个 Topic
RocketMQ
控制台提供对死信消息的查询、导出和重发的功能。
NameServer
因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。
RocketMQ
的高可用主要是在体现在Broker
的读和写的高可用,Broker
的高可用是通过集群和主从实现的。
Broker
可以配置两种角色:Master
和Slave
,Master
角色的Broker
支持读和写
,Slave
角色的Broker
只支持读
,Master
会向Slave
同步消息。
也就是说Producer
只能向Master
角色的Broker
写入消息,Cosumer
可以从Master
和Slave
角色的Broker
读取消息。
Consumer
的配置文件中,并不需要设置是从 Master
读还是从 Slave
读,当 Master
不可用或者繁忙的时候, Consumer
的读请求会被自动切换到从 Slave
。有了自动切换 Consumer
这种机制,当一个 Master
角色的机器出现故障后,Consumer
仍然可以从 Slave
读取消息,不影响 Consumer
读取消息,这就实现了读的高可用。
如何达到发送端写的高可用性
Topic
的时候,把Topic
的多个Message Queue
创建在多个Broker
组上(相同 Broker
名称,不同 brokerId
机器组成 Broker
组),这样当 Broker
组的 Master
不可用后,其他组Master
仍然可用, Producer
仍然可以发送消息注意
:RocketMQ
目前还不支持Broker
把Slave
自动转成Master
,如果机器资源不足,需要把 Slave
转成 Master
,则要手动停止 Slave
色的 Broker
,更改配置文件,用新的配置文件启动 Broker
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。