赞
踩
马士兵教育:李瑾 https://www.bilibili.com/video/BV15L4y1F7kh?p=5&spm_id_from=pageDriver
目前,企业中应用广泛的是rocketmq-4.x版本
Kafka(由Scala语言编写,属于java生态圈,可运行在jvm上) scala -> 字节码
rocketMQ的前身metaQ,就是Kafka的java版本
下载rocketmq,去官网:https://rocketmq.apache.org/dowloading/releases/
配置环境变量
修改 runbroker.cmd文件, %CLASSPATH% 添加双引号
启动成功
GitHub上下载 rocketmq-dashboard-master:https://github.com/apache/rocketmq-dashboard
参照readme.md文件中的描述,打包,并运行
消息发送的步骤
同步发送:producer.send()后,需要等待发送返回结果,才能进行下一条消息的发送。会阻塞发送消息的线程
一般适用于需要确保消息发送成功的场景(重要的消息通知、短信通知、物流信息通知等)
可靠
/** * 同步发送 */ public class SyncProducer { public static void main(String[] args) throws MQClientException, InterruptedException { // 创建一个Producer实例 DefaultMQProducer producer = new DefaultMQProducer("group_test"); // 设置NameServer地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 10; i++) { try { // 创建消息:指定topic,tag,消息体 Message msg = new Message("TopicTest", // Topic (衣服) "TagA", // Tag 相当于二级目录 (男装/女装) ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息体 发送的消息都是字节数组 ); // 发送消息(同步方式) SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } // 如果不再发送消息,关闭Producer实例 producer.shutdown(); } }
追溯 producer.send(msg)方法
异步发送:producer.send(msg, new SendCallback(){ … }) ,SendCallback接收异步返回结果的回调。不会阻塞发送消息的线程
一般适用于消息量大,对响应时间比较敏感的场景。不能容忍长时间阻塞等待broker的响应
可靠
for (int i = 0; i < 10; i++) { // 创建消息:指定topic,tag,消息体 final int index = i; Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息(异步方式 SendCallback接收异步返回结果的回调) producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%s%n", sendResult); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); } }); }
只发送消息,不需要得到mq的确认,不关心是否发送成功,不需要获取发送后的响应。这种发送方式是不可靠的,但是速度是最快的。
适合一些耗时短,对可靠性要求不是很高的场景(日志消息的记录)
不可靠
// 发送消息(单向发送)
producer.sendOneway(msg);
消费组中的consumer均摊消费消息,每条消息只会被消费组中一个实例消费
集群消费也是一般场景下默认的消费模式,消息只会被消费一次
消息的消费进度,是在mq服务端维护的,可靠性比较高
public class BalanceConsumer { public static void main(String[] args) throws MQClientException { // 实例化消息消费者,指定组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer"); // 指定NameServer地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 订阅topic consumer.subscribe("TopicTest", "*"); // 设置消费模式 => 集群消费 负载均衡模式 consumer.setMessageModel(MessageModel.CLUSTERING); // 注册回调函数,处理消息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
即消费组中每个实例都会拿到每一条消息,进行消费。消息会重复消费
消息消费进度的维护不在mq服务端,在consumer消费组端。不能处理消息的顺序消费
// 设置消费模式 => 广播消费
consumer.setMessageModel(MessageModel.BROADCASTING);
一个生产者,一个消费组,rocketmq的topic中只定义一个messageQueue
topic中有多个messageQueue,将顺序发送的消息进行标记,将标记同种颜色的消息顺序放入到对应的队列中,然后指定的消费者去订阅对应的队列,那么获取到的消息也是顺序的
生产消息时:
public class ProducerInOrder { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("OrderProducer"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); // 订单列表 List<Order> orderList = new ProducerInOrder().buildOrders(); for (int i = 0; i < orderList.size(); i++) { String body = orderList.get(i).toString(); Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int id = (int) arg; // 根据订单id选择发送的queue long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId()); // 订单id System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } class Order{ private int orderId; private String desc; public int getOrderId() { return orderId; } public void setOrderId(int orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "Order{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } } // 模拟生成订单数据 3个订单,每个订单4个状态 // 每个订单 创建->付款->推送->完成 private List<Order> buildOrders(){ List<Order> orderList = new ArrayList<>(); Order orderDemo = new Order(); orderDemo.setOrderId(001); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(002); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(001); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(003); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(002); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(003); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(002); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(003); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(002); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(001); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(001); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(003); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }
消费消息时:
public class ConsumerInOrder { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("PartOrder", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for(MessageExt msg: msgs){ // 可以看到每个queue有唯一的consume线程来消费,订单对每个queue(分区)有序 System.out.println("consumeThread="+Thread.currentThread().getName()+" , queueId="+msg.getQueueId() + " , content="+new String(msg.getBody())); } try{ // 模拟业务逻辑处理中... TimeUnit.MILLISECONDS.sleep(random.nextInt(300)); }catch (Exception e){ e.printStackTrace(); // 这里要注意:意思是先等一会儿,一会儿再处理这批消息,而不是放到重试队列中。 // 直接放入重试队列,会导致消息的顺序性被破坏 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
生产端打印:
消费端打印:
延时消息:指生产者将消息投递给rocketmq,并不期望消息立马投递给消费者。而是希望延时一段时间,再投递给消费者
延时消息有很多应用场景(比如:购买电影票,选好座位后,需要发送一个延时通知。避免过长时间,用户选了座位,但是未支付。就需要通知用户进行支付处理。如果用户已经支付了,就可以清除消息;电商交易系统的订单超时未支付,自动取消订单)
生产端:
生产端
public class ScheduledMessageProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { // 初始化producer实例 DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer"); // 设置namserver地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动producer实例 producer.start(); int totalMessageToSend = 10; for (int i = 0; i < totalMessageToSend ; i++) { // 包装消息 Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes()); // 设置延时等级为4,这个消息将在30s之后投递给消费者 // delayTimeLevel: (1-18个等级) “ 1s 5s 10s 30s 1min 2min 3min 4min 5min 6min 7min 8min 9min 10min 20min 30min 1h 2h ” message.setDelayTimeLevel(4); // 发送消息 producer.send(message); } // 关闭producer实例 producer.shutdown(); } }
消费端
public class ScheduledMessageConsumer { public static void main(String[] args) throws MQClientException { // 初始化消费者实例,指定消费组名称 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer"); // 设置NameServer地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 订阅topic consumer.subscribe("ScheduledTopic", "*"); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for(MessageExt msg: msgs){ // 打印每条消息 接收时间-发送时间 = 延时的时间 System.out.println("Receive message[msgId=" + msg.getMsgId() + "]" + (msg.getStoreTimestamp()-msg.getBornTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); } }
批量消息的出现是因为,原来发送消息都是一条一条的发送,那么在大量消息发送的场景下,就容易出现性能瓶颈。所以,可以将一批消息打成一个包,做批量发送,可以显著提升发送消息的性能
批量消息的生产:
public class BatchProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("BatchProducer"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String topic = "BatchTest"; // 一般,单批次的消息数据不要超过4MB,如果超过了4MB,rocketmq会出现性能瓶颈 List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 1".getBytes())); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 2".getBytes())); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 3".getBytes())); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 4".getBytes())); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 5".getBytes())); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 6".getBytes())); try{ // 发送批量消息 producer.send(messages); }catch (Exception e){ producer.shutdown(); e.printStackTrace(); } producer.shutdown(); } }
public class BatchConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchConsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("BatchTest", "*"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Message: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
producer创建消息的时候,里面有一个tag的参数
consumer订阅topic消息的时候,第二个参数传入正则,可以根据tag的名称过滤消息
producer通过 msg.putUserProperty(“a”, String.valueOf(i)); 给消息设置用于sql过滤的属性
public class SqlFilterProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC"}; for (int i = 0; i < 10; i++) { Message msg = new Message("SqlFilterTest", tags[i % tags.length], ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 设置SQL过滤的属性 msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
consumer通过 MessageSelector.bySql(“(TAGS is not null and TAGS in (‘TagA’, ‘TagB’)) and (a is not null and a between 0 and 3)”) ,以sql的方式进行消息的过滤筛选
public class SqlFilterConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); // 设置消息的sql过滤条件 consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)")); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for(MessageExt msg: msgs){ String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8); String msgPro = msg.getProperty("a"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic: " + topic + " , tags: " + tags + " ,a: " + msgPro + " ,msgBody: " + msgBody); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
消息消费失败,就放入到重试队列中。可能会导致消费顺序性乱掉
为了确保消息的顺序消费,可以使用MessageListenerOrderly(顺序事件监听器),消费失败的时候,设置mq等一会儿,再处理。并不直接放入重试队列。
mq服务器端为保证流入的消息不丢失,会将消息进行持久化。为防止broker节点宕机,保证高可用,会采用集群的方式部署
一般场景会采用 多master多slave模式 同步复制 异步刷盘 的模式。【综合可靠性和性能】
RocketMQ因为有高可用的要求(宕机不丢失数据),所以要进行持久化存储,RocketMQ采用文件的方式进行消息数据的存储
设计思路:
当有一条消息从producer端发送到commitLog中,会有一个异步线程监听到,然后生成一个消息对应的索引,存入comsumequeue目录指定文件中。由于消息中包含(topic,tag,消息体字节数组),由此构造出每条消息对应的索引数据(每条消息的索引为20个字节,包括:8字节的commitLog offset偏移量,可以看作顺序放入到commitLog中的消息的位置下标;4字节的消息长度;8字节的tag的hashcode,这个值用于对消息进行二级过滤)
当consumer消费一条消息的时候,例如:
消费:TopicA Q1的消息(消费第2条消息)
查找消息的逻辑:
因此:整体去查询一条需要消费的消息,时间复杂度为O(1),查找效率非常快,所以消费速度也很快。
IndexFile文件中存每一条消息的hash值(消息key的hash值),方便进行消息的查找。
如果超过重试次数(默认为2,即总共三次机会)还是发送失败,就进行默认的规避策略(即认为之前选择的brokerA节点不可用,下次选择队列会去选择brokerB上的)
故障延迟机制策略更适合网络状况不是很好,网络波动比较大的场景
这两种情况都会出现问题。所以rocketMQ进行了优化
两阶段提交(2pc)
result = result && this.commitLog.load(); // ==> 追入load()方法。会调用一个this.mappedFileQueue.load()方法 public boolean load() { boolean result = this.mappedFileQueue.load(); log.info("load commit log " + (result ? "OK" : "Failed")); return result; } // ==> 追入load()方法。MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); public boolean load() { File dir = new File(this.storePath); File[] files = dir.listFiles(); if (files != null) { // ascending order Arrays.sort(files); for (File file : files) { if (file.length() != this.mappedFileSize) { log.warn(file + "\t" + file.length() + " length not matched message store config value, please check it manually"); return false; } try { MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); this.mappedFiles.add(mappedFile); log.info("load " + file.getPath() + " OK"); } catch (IOException e) { log.error("load file " + file + " error", e); return false; } } } return true; } // 追入new MappedFile(file.getPath(), mappedFileSize)。里面会调用一个init(fileName, fileSize)方法 public MappedFile(final String fileName, final int fileSize) throws IOException { init(fileName, fileSize); } // 追入init(fileName, fileSize)中 private void init(final String fileName, final int fileSize) throws IOException { this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; ensureDirOK(this.file.getParent()); try { // 文件通道 fileChannel this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); // FileChannel配合着ByteBuffer,将读写的数据缓存到内存中(操作大文件时可以显著提升效率) // MappedByteBuffer(零拷贝之内存映射:mmap) // FileChannel定义了一个map()方法,它可以把一个文件从position位置开始,size大小的区域映射为内存 this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); // 原子操作类 -- CAS的原子操作类 -- 多线程效率(加锁) TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("Failed to create file " + this.fileName, e); throw e; } catch (IOException e) { log.error("Failed to map file " + this.fileName, e); throw e; } finally { if (!ok && this.fileChannel != null) { this.fileChannel.close(); } } }
MMAP零拷贝核心代码
map()方法 – 内存映射
mappedByteBuffer其实是磁盘上的一块区域,rocketmq把它当作内存使用。类似虚拟内存的概念。
数据的拷贝过程(接收网络数据,并落盘的过程):
一般CPU拷贝比较慢,DMA拷贝比较快
mmap只能减少第三次CPU拷贝,提升写入效率
sendFile
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
...
}
这个CompletableFuture是在rocketmq-4.7.0之后才有的。
同步的方式:
producer -->生产消息,发送到 --> Broker --> Broker会启动一个线程来处理消息(可能有多个业务处理步骤 1,2,3,4,5)。那么中间就需要阻塞,等待处理完的结果 --> 回复给 producer
rocketmq-4.7.0之后,采用CompletableFuture异步的方式:
启动一个线程来处理消息(主线程),不会阻塞。会启动一个子线程,拿到处理返回的结果后,就会响应(通过CompletableFuture.completedFuture()方法)–> 回复给 producer
在rocketmq集群架构下,有一种保证消息数据不丢失的机制 – 同步双写(2主2从)
从节点进行数据备份,同步复制主节点的消息数据。
返回之前还需要将Memory中的数据同步刷入磁盘中。
这个过程中,用到了很多CompletableFuture,来提升同步双写的性能。
异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁
由于rocketmq消息写入CommitLog中,是单文件多队列的存储设计。那么同时有多个生产者往多个topic的队列中写入消息,对应的都是写入到同一个commitLog文件中。就会存在线程安全的问题。
因此commitLog采取锁的机制,来保证多线程并发写入的线程安全。
// CommitLog.java public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { protected final PutMessageLock putMessageLock; // 构造函数 public CommitLog(final DefaultMessageStore defaultMessageStore) { ... // 默认使用new PutMessageSpinLock(),自旋锁 (乐观锁) // 也可设置成 可重入锁(悲观锁) // UseReentrantLockWhenPutMessage参数默认值是false,使用自旋锁。异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁 this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); } ... // 会有多个线程并行处理,需要上锁 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try{ ... }finally{ putMessageLock.unlock(); // 解锁。标准的lock锁的方式 } } // PutMessageSpinLock.java public class PutMessageSpinLock implements PutMessageLock { //true: Can lock, false : in lock. private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); @Override public void lock() { boolean flag; do { flag = this.putMessageSpinLock.compareAndSet(true, false); } while (!flag); } @Override public void unlock() { this.putMessageSpinLock.compareAndSet(false, true); } }
核心代码片段:
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
rocketmq中:CommitLog初始化的时候,默认使用的是PutMessageSpinLock(自旋锁)。当然,也可配置成使用PutMessageReentrantLock(可重入锁)
这个需要结合场景。
自旋锁:不会有上下文的切换,获取不到锁资源时,会采用消耗cpu空转的方式等待。
可重入锁:有可能阻塞线程。发生上下文的切换。
同步刷盘建议使用重入锁。
因为同步刷盘下,多线程对锁资源的竞争很激烈,如果使用自旋,那么CAS失败的机率很高。CAS失败会自旋,导致对CPU的消耗过大。
异步刷盘建议使用自旋锁。
因为异步刷盘下,锁资源竞争小,使用自旋锁,可以减少上下文的切换,提高刷盘的效率。
// CommitLog.java public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { ... result = mappedFile.appendMessage(msg, this.appendMessageCallback); } // 追入,MappedFile.java public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { return appendMessagesInner(msg, cb); } // 追入 public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; // 当前这个MappedFile的写入位置 int currentPos = this.wrotePosition.get(); if (currentPos < this.fileSize) { // 异步刷盘时,还有两种刷盘模式可以选择 // 如果writeBuffer != null 即开启了堆外内存缓冲,使用writeBuffer,否则使用mappedByteBuffer(也是继承的ByteBuffer) // slice() 方法,创建一个新的字节缓冲区 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; if (messageExt instanceof MessageExtBrokerInner) { // 写入具体的数据 commitLog中的数据格式 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }
因为读写数据本身就是一个频次很高的操作。
堆内存:读写过程中,会产生大量数据 -> new 对象 -> GC -> 垃圾回收 -> 停顿 -> 效率低
堆外内存:使用的是本地内存 -> 手动的GC -> 没有停顿 -> 效率高
堆外内存也有一些缺点:
如果业务场景对消息写入的时效性要求很高,那么最好选择默认的写入模式
在rocketmq集群环境下,可以将一个topic对应的多个messageQueue分别放在不同的机器上,从而实现提高消费并发度的效果。
如果想确保消息的顺序消费,那么生产者、队列、消费者最好都是一对一的关系。这样设计,能够保证消息的顺序消费,但同时带来了性能瓶颈。(并发度不够)
rocketmq不解决消息顺序消费的问题,理由:
造成消息重复消费的根本原因是:网络波动。
rocketmq不保证消息的不重复,如果想在业务中严格确保消息不重复,需要在业务端进行去重:
三个核心特点:解耦、异步、削峰
结合公司的具体业务场景,描述业务场景,这个业务场景有什么挑战,如果不用MQ可能会很麻烦,用了MQ之后带来了哪些好处。
解耦:
优点就是,在特殊的场景下面有其对应的好处,如解耦、异步、削峰。
缺点:
所以消息队列实际是一种非常复杂的架构,引入它有很多好处,但是也得针对它带来的各种坏处,需要做各种额外的技术方案和架构来规避掉。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。