赞
踩
OffserStore和信息存储位置
实际运行系统,难免会遇到重新消费某条消息,跳过一段时间内的消息等情况。这些异常情况的处理都和offset有关。本节主要分析存储位置以及如何根据需要调整offset的值。
首先先来明确一下offset的含义,rocketmq中一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个message queue,offset是某个Topic下的一条消息在某个message queue里的位置,通过offset 的值可以定位到这条消息或者指示Consume从这条消息开始向后继续处理。
如下是offset的类结构,主要分为本地文件类型和broker代存类型两种。对于DefaultMQPushConsume 来说,默认是集群模式,也就是同一个消费者组里的多个消费者每个人消费一部分,各自收到的消息内容就不一样,这种情况下,由broker端存储和控制offset的值,使用RomoteBrokerOffsetStore 结构。
备注:(Diagrams可以展示父类,实现类可以用实现implemention或者光标移到该类上右击Browse Type Hierarchy)
在广播模式下,每个消费者都收到这个topic的全部消息,各个消费者之间互不干扰,rocketmq使用LocalFileOffsetStore,把offset存到本地,offsetStroe 使用json格式存储,简单明了。
在使用DefaultMQPushConsume 的时候,我们不用关心OffsetStore的事,但是如果是PullConsume,我们就要自己处理OffsetStore了。在上一篇博文中,pullConsume的示例中,代码把offset存到了内存,没有持久化存储,这样就有可能因为程序的异常或者重启而丢失offset,实际应用中并不推荐这么做。为了能让我们看清楚OfsetStore究竟是何物?
1.首先通过广播模式去消费某条topic 中的消息(LocalFileOffsetStore文件类型是以本地存储的,必须要是广播模式下,广播模式下接受该topic所有的消息,包括历史消息)
- public class ProducerQuickStart {
-
- public static void main(String[] args) throws MQClientException,InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("unique_producer_group__name");
- producer.setInstanceName("instance1");
- producer.setRetryTimesWhenSendFailed(3);//重试次数 192.168.138.47 192.168.142.133 192.168.0.102
- producer.setNamesrvAddr("192.168.139.151:9876");//多个用;分割 192.168.138.47
- producer.start();
- for (int i = 0; i < 1; i++) {
- Date date = new Date();
- SimpleDateFormat sdf = new SimpleDateFormat();
- String format = sdf.format(date);
- System.out.println("准备发送:" + format);
- Message message = new Message("topicName", String.valueOf(i),format.getBytes());
- SendResult sendResult= new SendResult();
- try {
- sendResult = producer.send(message);
- } catch (RemotingException |MQBrokerException | InterruptedException e) {
- System.out.println("消息发送失败:" + sendResult.getMsgId());
- e.printStackTrace();
- }
- System.out.println("key:"+i + "消息的发送结果为:" + sendResult.toString() + "消息ID为:" + sendResult.getMsgId());
- }
- producer.shutdown();
-
- }
-
-
- }
- public class ConsumeQuickStart {
-
- public static void main(String[] args) throws MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_consume_group_name");
- consumer.setNamesrvAddr("192.168.139.151:9876");//
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.setMessageModel(MessageModel.BROADCASTING);//默认是集群模式
- consumer.subscribe("topicName",null);
- //如果要检查配置信息
- consumer.registerMessageListener((MessageListenerConcurrently) (listMsg, consumeConcurrentlyContext) -> {
- byte[] body = listMsg.get(0).getBody();
- try {
- String ms = new String(body,"utf-8");
- System.out.println(Thread.currentThread().getName()+"收到消息:" + ms);
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
-
- }
-
- }
2. 查看LocalFileOffsetStore 本次文件存储的位置(LocalFileOffsetStore类的源码)
- public final static String LocalOffsetStoreDir = System.getProperty(
- "rocketmq.client.localOffsetStoreDir",
- System.getProperty("user.home") + File.separator + ".rocketmq_offsets");///Users/humingming/.rocketmq_offsets
-
- this.storePath = LocalOffsetStoreDir + File.separator + //
- this.mQClientFactory.getClientId() + File.separator + //
- this.groupName + File.separator + //
- "offsets.json";//+ 192.168.0.102@DEFAULT/unique_consume_group_name/offsets.json
我这里地址就是:/Users/humingming/.rocketmq_offsets/192.168.0.102@DEFAULT/unique_consume_group_name/offset.json
所以可以看出其大概组成就是:用户根目录 + .rocketmq_offsets + mq服务端IP地址@? + 消费者组名 + offset.json,因为我消费端起了两个服务(之前向两个服务端发送过两次消息,所以会有两个json文件)
- {
- "offsetTable":{{
- "brokerName":"humingmingdeMacBook-Pro.local",
- "queueId":3,
- "topic":"topicName"
- }:8,{
- "brokerName":"humingmingdeMacBook-Pro.local",
- "queueId":1,
- "topic":"topicName"
- }:12,{
- "brokerName":"humingmingdeMacBook-Pro.local",
- "queueId":2,
- "topic":"topicName"
- }:8,{
- "brokerName":"humingmingdeMacBook-Pro.local",
- "queueId":0,
- "topic":"topicName"
- }:31
- }
- }
-
- // 可以看出当前消费端已经消费 各个队列最大offset {key:messagqqueue value:bigOffset}
值得注意的一点就是:因为广播模式下是offset存储在本地,当有消息发送过来时,此时服务端只要没收到消息或者消息接受失败,自然是不会去更改本地存储的OffsetStore 的json文件的,所以你下一次消息消费时候,会先去看该消息队列的最大offset,然后在看本地json文件的该消息队列的value值,如果相同则认为没有新的消息,如果队列的offset > json文件该队列的value 则认为有新的消息过来,则会去更新或者新增此文件,所以我们可以主动去更改json文件的value值来让消费端收不到消息或者重复消费之前消费过的消息。目前据我所知道这仅仅限定于广播模式。
那么问题来了,因为我们之前有过利用pull模式去拉取消息,但是当时仅仅是将每个队列的offset存于内存中,比如(这里是存于map)
- package rocketmq.day04;
-
- import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
- import com.alibaba.rocketmq.client.consumer.PullResult;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.common.message.MessageExt;
- import com.alibaba.rocketmq.common.message.MessageQueue;
-
- import java.io.UnsupportedEncodingException;
- import java.util.List;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.atomic.AtomicLong;
-
- /**
- * @author heian
- * @create 2020-01-10-11:14 上午
- * @description
- */
- public class OffsetPersistence {
-
- private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<>();
-
-
- private long getMessageQueueOffset(MessageQueue mq) {
- AtomicLong atomicLong = offsetTable.get(mq);
- if (atomicLong != null)
- return atomicLong.get();
- return 0;
- }
-
- private void putMessageQueueOffset(MessageQueue mq, long offset) {
- offsetTable.put(mq, new AtomicLong(offset));
- }
-
- public void pullMsg() throws MQClientException {
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull_consume_group");
- consumer.setNamesrvAddr("192.168.139.151:9876");// 192.168.0.102
- consumer.start();
- Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicName");
- for (int j=0;j<=0;j++){//不断轮询 演示的话就调一次算了
- for (MessageQueue mq : mqs) {
- SINGLE_MQ:
- while (true) {
- try {
- //第一次拉去将获得队列的所有消息写入到数据库,保存当时的 nextBeginOffset,
- //下一次轮询的时候,在与该队列比较其nextBeginOffset 如果变化了,则说明有消息进来了,则返回FOUND状态
- //我这里是100 一拉取 假设某条队列的nextBeginOffset=12 则会去拉去两次 一次拉取8 状态为FOUND,第二次拉取4 状态为FOUND 第三次拉去0 状态为NO_NEW_MSG
- PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 100);
- System.out.println("拉取结果:"+pullResult + "队列为:" +mq );
- putMessageQueueOffset(mq, pullResult.getNextBeginOffset());//可以存在数据库或者redis中 持久化
- switch (pullResult.getPullStatus()) {
- case FOUND:
- List<MessageExt> listMsg = pullResult.getMsgFoundList();
- for (int i = 0; i < listMsg.size(); i++) {
- byte[] body = listMsg.get(i).getBody();
- try {
- String ms = new String(body,"utf-8");
- //System.out.println("收到消息:" + ms);
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- break;
- case NO_MATCHED_MSG:
- break;
- case NO_NEW_MSG:
- break SINGLE_MQ;
- case OFFSET_ILLEGAL:
- break;
- default:
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
-
- public static void main(String[] args) throws MQClientException {
- OffsetPersistence offsetPersistence = new OffsetPersistence();
- offsetPersistence.pullMsg();
- }
-
- }
那么我们怎么将内存中的map持久化到硬盘中呢,其实参考源码LocalFileOffsetStore类也不难,无非就是将map转为字符串,将字符串写入到file.json中,所以改造下如下:
- package rocketmq.day04;
-
- import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
- import com.alibaba.rocketmq.client.consumer.PullResult;
- import com.alibaba.rocketmq.client.consumer.store.OffsetSerializeWrapper;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.common.MixAll;
- import com.alibaba.rocketmq.common.message.MessageExt;
- import com.alibaba.rocketmq.common.message.MessageQueue;
-
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.atomic.AtomicLong;
-
- /**
- * @author heian
- * @create 2020-01-10-11:14 上午
- * @description
- */
- public class OffsetPersistence2 {
-
- private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<>();
- //offsetStore 持久化地址 这里我就写死了
- private static final String groupName="pull_consume_group";
- private static final String storePath = "/Users/humingming/.rocketmq_offsets/"+groupName+"/offset.json";
-
-
- private long getMessageQueueOffset(MessageQueue mq) {
- AtomicLong atomicLong = offsetTable.get(mq);
- if (atomicLong != null)
- return atomicLong.get();
- return 0;
- }
-
- /**
- * 通过message和offset 得到map (存在此条消息则put增加,不存在则去set值更新)
- */
- public void putMessageQueueOffset(final MessageQueue mq,long offset){
- if (mq != null){
- AtomicLong offsetOld = this.offsetTable.get(mq);
- if (offsetOld == null){
- this.offsetTable.put(mq,new AtomicLong(offset));
- }else {
- offsetOld.set(offset);
- }
- }
- }
-
- //将set集合中的消息队列与公共变量的消息队列求并集,并存到对应的磁盘中
- public void persistAll(Set<MessageQueue> mqs){
- if (null == mqs || mqs.isEmpty()){
- return;
- }
- OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
- //map中的示例逐个与set集合中比较,有则将map中的消息存入 offsetSerializeWrapper(实际上就是一个可以序列化的map)
- for (Map.Entry<MessageQueue,AtomicLong> entry:this.offsetTable.entrySet()){
- if (mqs.contains(entry.getKey())){
- AtomicLong offset = entry.getValue();
- offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
- }
- }
- String jsonStr = offsetSerializeWrapper.toJson();
- if (jsonStr != null){
- try {
- MixAll.string2File(jsonStr,this.storePath);
- }catch (IOException e){
- e.printStackTrace();
- }
- }
- }
-
-
- public void pullMsg() throws MQClientException {
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull_consume_group");
- consumer.setNamesrvAddr("192.168.139.151:9876");// 192.168.0.102
- consumer.start();
- Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicName");
- for (int j=0;j<=0;j++){
- for (MessageQueue mq : mqs) {
- SINGLE_MQ:
- while (true) {
- try {
- //第一次拉去将获得队列的所有消息写入到数据库,保存当时的 nextBeginOffset,
- //下一次轮询的时候,在与该队列比较其nextBeginOffset 如果变化了,则说明有消息进来了,则返回FOUND状态
- //我这里是100 一拉取 假设某条队列的nextBeginOffset=12 则会去拉去两次 一次拉取8 状态为FOUND,第二次拉取4 状态为FOUND 第三次拉去0 状态为NO_NEW_MSG
- PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 100);
- System.out.println("拉取结果:"+pullResult + "队列为:" +mq );
- putMessageQueueOffset(mq, pullResult.getNextBeginOffset());//将队列信息存于map
- switch (pullResult.getPullStatus()) {
- case FOUND:
- List<MessageExt> listMsg = pullResult.getMsgFoundList();
- for (int i = 0; i < listMsg.size(); i++) {
- byte[] body = listMsg.get(i).getBody();
- try {
- String ms = new String(body,"utf-8");
- //System.out.println("收到消息:" + ms);
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- break;
- case NO_MATCHED_MSG:
- break;
- case NO_NEW_MSG:
- break SINGLE_MQ;
- case OFFSET_ILLEGAL:
- break;
- default:
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- //持久化
- persistAll(mqs);
- }
- }
-
-
-
- public static void main(String[] args) throws MQClientException {
- OffsetPersistence2 offsetPersistence = new OffsetPersistence2();
- offsetPersistence.pullMsg();
- }
-
- }
所以会在/Users/humingming/.rocketmq_offsets/pull_consume_group 生成一个json文件 如下:
控制台打印信息如下:
- 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=8, minOffset=0, maxOffset=8, msgFoundList=8]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=2]
- 拉取结果:PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=8, minOffset=0, maxOffset=8, msgFoundList=0]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=2]
- 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=12, minOffset=0, maxOffset=12, msgFoundList=12]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=1]
- 拉取结果:PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=12, minOffset=0, maxOffset=12, msgFoundList=0]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=1]
- 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=32, minOffset=0, maxOffset=38, msgFoundList=32]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=0]
- 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=38, minOffset=0, maxOffset=38, msgFoundList=6]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=0]
- 拉取结果:PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=38, minOffset=0, maxOffset=38, msgFoundList=0]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=0]
- 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=8, minOffset=0, maxOffset=8, msgFoundList=8]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=3]
- 拉取结果:PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=8, minOffset=0, maxOffset=8, msgFoundList=0]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=3]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。