当前位置:   article > 正文

RocketMQ笔记_rocketmq延迟队列

rocketmq延迟队列

马士兵教育:李瑾 https://www.bilibili.com/video/BV15L4y1F7kh?p=5&spm_id_from=pageDriver

一、基础概念

1. RocketMQ的发展背景

image.png目前,企业中应用广泛的是rocketmq-4.x版本

image.png

Kafka(由Scala语言编写,属于java生态圈,可运行在jvm上) scala -> 字节码
rocketMQ的前身metaQ,就是Kafka的java版本

那么为什么Kafka性能那么好,还要引出rocketMQ呢?
  1. 在一些特殊场景下,Kafka的多队列多文件存储设计方式会存在瓶颈。如果topic量过多,会导致出现大量的partition,影响吞吐量。而rocketMQ,采用单文件多队列的存储设计。能够保证不管有多少个topic,每个topic有多少个队列,消息的写入性能是相当稳定的。
  2. Kafka在broker宕机后,会出现消息乱序。而rocketmq支持严格的消息顺序,在顺序消息的场景下,就算broker宕机,发送的消息会失败,但不会乱序。
  3. rocketmq扩展了很多新的功能,包括顺序消息,延时消息,批量消息的生产消费。
  4. rocketmq消费端消费失败支持重试。
  5. 总而言之,rocketmq和Kafka适用的领域不同,rocketmq更偏向复杂的业务场景

2. RocketMQ基本架构

基础架构

image.png

3. RocketMQ的基本概念

  • 分组(Group):生产者/消费者的分组。生产者分组一般没有特殊之处,消费组一般指定为订阅一类消息的消费者的集合。消费者group与生产者group之间没有任何关系。
  • 主题(Topic):标识一类消息(如淘宝中:电器、数码、衣服)
  • tag:producer发送的消息中有tag信息,用于对topic下的消息进行二级过滤(类似于 topic=>衣服 tag=> T恤)
  • 消息队列(Message Queue):一个topic一般会有一个或n个messageQueue。一般producer生产投递消息的速度特别快,但是consumer由于要进行业务处理,所以消费速率会慢一些。因此topic会将消息采用一些均衡的策略投递到不同的messageQueue中,consumer与messageQueue一一对应,consumer只消费监听的messageQueue上的消息,这样可以提高消息消费的并发度。

image.png

  • 偏移量(Offset):一般默认是指消费偏移量(consumerOffset),即当前topic中的消息,已经消费到哪一条了
    image.png

二、RocketMQ的下载,安装,使用

window下安装

  1. 下载rocketmq,去官网:https://rocketmq.apache.org/dowloading/releases/
    image.png

  2. 配置环境变量

image.png

  1. 启动nameServer:start mqnamesrv.cmd

image.png

  1. 如果启动broker报错:

image.png

修改 runbroker.cmd文件, %CLASSPATH% 添加双引号

image.png

  1. 启动broker:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

image.png

启动成功

image.png

Linux下安装

  1. 下载rocketmq,官网:https://rocketmq.apache.org/dowloading/releases/

image.png

  1. 启动nameServer(后台启动的方式):nohup sh mqnamesrv &

image.png

  1. 修改 broker.config文件,添加 : brokerIP1=192.168.1.49 (外网可访问的IP)

image.png

  1. 启动borker:nohup sh mqbroker -c …/conf/broker.conf -n 192.168.1.49:9876 autoCreateTopicEnable=true &

image.png
image.png

源码安装

  1. 官网,下载rocketmq源码:

image.png

  1. 通过Idea打开,下载maven引用:mvn install -Dmaven.test.skip=true
  2. 启动nameserver,配置完环境变量后,启动

image.png
image.png

  1. 启动broker,配置环境变量,启动参数,再启动

image.png

控制台安装

GitHub上下载 rocketmq-dashboard-master:https://github.com/apache/rocketmq-dashboard
参照readme.md文件中的描述,打包,并运行
image.png

三、RocketMQ普通消息的发送

普通消息发送的三种方式

消息发送的步骤

  1. 创建消息生产者producer,并指定生产者组名
  2. 指定NameServer地址
  3. 启动producer
  4. 创建消息对象,指定topic,tag和消息体
  5. 发送消息
  6. 关闭生产者producer

同步发送

同步发送:producer.send()后,需要等待发送返回结果,才能进行下一条消息的发送。会阻塞发送消息的线程
一般适用于需要确保消息发送成功的场景(重要的消息通知、短信通知、物流信息通知等)
可靠

/**
 * 同步发送
 */
public class SyncProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 创建一个Producer实例
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 设置NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动Producer实例
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                // 创建消息:指定topic,tag,消息体
                Message msg = new Message("TopicTest", // Topic (衣服)
                    "TagA", // Tag 相当于二级目录 (男装/女装)
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息体 发送的消息都是字节数组
                );
                // 发送消息(同步方式)
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // 如果不再发送消息,关闭Producer实例
        producer.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

追溯 producer.send(msg)方法

image.png

异步发送

异步发送:producer.send(msg, new SendCallback(){ … }) ,SendCallback接收异步返回结果的回调。不会阻塞发送消息的线程
一般适用于消息量大,对响应时间比较敏感的场景。不能容忍长时间阻塞等待broker的响应
可靠

for (int i = 0; i < 10; i++) {
    // 创建消息:指定topic,tag,消息体
    final int index = i;
    Message msg = new Message("TopicTest", "TagA",  ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 发送消息(异步方式 SendCallback接收异步返回结果的回调)
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("%s%n", sendResult);
        }

        @Override
        public void onException(Throwable e) {
            System.out.printf("%-10d Exception %s %n", index, e);
        }
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

单向发送

只发送消息,不需要得到mq的确认,不关心是否发送成功,不需要获取发送后的响应。这种发送方式是不可靠的,但是速度是最快的。
适合一些耗时短,对可靠性要求不是很高的场景(日志消息的记录)
不可靠

// 发送消息(单向发送)
producer.sendOneway(msg);
  • 1
  • 2

四、RocketMQ普通消息的消费

集群消费

消费组中的consumer均摊消费消息,每条消息只会被消费组中一个实例消费
集群消费也是一般场景下默认的消费模式,消息只会被消费一次
消息的消费进度,是在mq服务端维护的,可靠性比较高
image.png

public class BalanceConsumer {
    public static void main(String[] args) throws MQClientException {
        // 实例化消息消费者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
        // 指定NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅topic
        consumer.subscribe("TopicTest", "*");
        // 设置消费模式 => 集群消费 负载均衡模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

广播消费

即消费组中每个实例都会拿到每一条消息,进行消费。消息会重复消费
消息消费进度的维护不在mq服务端,在consumer消费组端。不能处理消息的顺序消费

image.png

// 设置消费模式 => 广播消费
consumer.setMessageModel(MessageModel.BROADCASTING);
  • 1
  • 2

五、特殊消息的发送和消费

顺序消息的生产和消费

  1. 全局顺序消息

一个生产者,一个消费组,rocketmq的topic中只定义一个messageQueue
image.png

  1. 部分顺序消息

topic中有多个messageQueue,将顺序发送的消息进行标记,将标记同种颜色的消息顺序放入到对应的队列中,然后指定的消费者去订阅对应的队列,那么获取到的消息也是顺序的
image.png

生产消息时:

  • 根据不同的消息id对消息队列数目进行取余运算。实现根据消息id选择投送消息的queue
  • producer.send(msg, new MessageQueueSelector{ … }),用到了消息队列选择器
public class ProducerInOrder {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        // 订单列表
        List<Order> orderList = new ProducerInOrder().buildOrders();
        for (int i = 0; i < orderList.size(); i++) {
            String body = orderList.get(i).toString();
            Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    int id = (int) arg; // 根据订单id选择发送的queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId()); // 订单id
            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }
        producer.shutdown();
    }

    class Order{
        private int orderId;
        private String desc;

        public int getOrderId() {
            return orderId;
        }

        public void setOrderId(int orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "Order{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    // 模拟生成订单数据  3个订单,每个订单4个状态
    // 每个订单 创建->付款->推送->完成
    private List<Order> buildOrders(){
        List<Order> orderList = new ArrayList<>();
        Order orderDemo = new Order();
        orderDemo.setOrderId(001);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(002);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(001);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(003);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(002);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(003);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(002);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(003);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(002);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(001);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(001);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(003);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123

消费消息时:

  • consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 设置从最后的偏移位置消费
  • consumer.registerMessageListener(new MessageListenerOrderly() { … } ) ,使用到了顺序消息监听器,实现顺序接收消息,一个queue对应一个线程进行操作
  • A MessageListenerOrderly object is used to receive messages orderly. One queue by one thread
public class ConsumerInOrder {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("PartOrder", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for(MessageExt msg: msgs){
                    // 可以看到每个queue有唯一的consume线程来消费,订单对每个queue(分区)有序
                    System.out.println("consumeThread="+Thread.currentThread().getName()+" , queueId="+msg.getQueueId()
                    + " , content="+new String(msg.getBody()));
                }
                try{
                    // 模拟业务逻辑处理中...
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
                }catch (Exception e){
                    e.printStackTrace();
                    // 这里要注意:意思是先等一会儿,一会儿再处理这批消息,而不是放到重试队列中。
                    // 直接放入重试队列,会导致消息的顺序性被破坏
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

生产端打印:

image.png

消费端打印:

image.png

延时消息的生产与消费

延时消息:指生产者将消息投递给rocketmq,并不期望消息立马投递给消费者。而是希望延时一段时间,再投递给消费者
image.png
延时消息有很多应用场景(比如:购买电影票,选好座位后,需要发送一个延时通知。避免过长时间,用户选了座位,但是未支付。就需要通知用户进行支付处理。如果用户已经支付了,就可以清除消息;电商交易系统的订单超时未支付,自动取消订单)
生产端:

  • 包装好消息后,通过message.setDelayTimeLevel(4); 给消息设置延时等级
  • delayTimeLevel: (1-18个等级) “ 1s 5s 10s 30s 1min 2min 3min 4min 5min 6min 7min 8min 9min 10min 20min 30min 1h 2h ”
  • level有以下三种情况:
    • level == 0,消息为非延迟消息
    • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
    • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

生产端

public class ScheduledMessageProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // 初始化producer实例
        DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
        // 设置namserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动producer实例
        producer.start();
        int totalMessageToSend = 10;
        for (int i = 0; i < totalMessageToSend ; i++) {
            // 包装消息
            Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
            // 设置延时等级为4,这个消息将在30s之后投递给消费者
            // delayTimeLevel: (1-18个等级) “ 1s 5s 10s 30s 1min 2min 3min 4min 5min 6min 7min 8min 9min 10min 20min 30min 1h 2h ”
            message.setDelayTimeLevel(4);
            // 发送消息
            producer.send(message);
        }
        // 关闭producer实例
        producer.shutdown();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

消费端

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws MQClientException {
        // 初始化消费者实例,指定消费组名称
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
        // 设置NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅topic
        consumer.subscribe("ScheduledTopic", "*");
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for(MessageExt msg: msgs){
                    // 打印每条消息 接收时间-发送时间 = 延时的时间
                    System.out.println("Receive message[msgId=" + msg.getMsgId() + "]" + (msg.getStoreTimestamp()-msg.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

image.png

批量消息的生产与消费

批量消息的出现是因为,原来发送消息都是一条一条的发送,那么在大量消息发送的场景下,就容易出现性能瓶颈。所以,可以将一批消息打成一个包,做批量发送,可以显著提升发送消息的性能
批量消息的生产:

  • 单批次的消息,不能超过4MB。如果超过了4MB,rocketmq就会出现性能瓶颈
  • 那么当出现超过4MB的大消息时,需要进行切分。切割成不超过4MB的块,再进行批量发送
public class BatchProducer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String topic = "BatchTest";
        // 一般,单批次的消息数据不要超过4MB,如果超过了4MB,rocketmq会出现性能瓶颈
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 2".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 3".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 4".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 5".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 6".getBytes()));
        try{
            // 发送批量消息
            producer.send(messages);
        }catch (Exception e){
            producer.shutdown();
            e.printStackTrace();
        }
        producer.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
public class BatchConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("BatchTest", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Message: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

过滤消息的生产与消费

1. Tag过滤

producer创建消息的时候,里面有一个tag的参数

image.png

consumer订阅topic消息的时候,第二个参数传入正则,可以根据tag的名称过滤消息

image.png

2. SQL过滤

producer通过 msg.putUserProperty(“a”, String.valueOf(i)); 给消息设置用于sql过滤的属性

public class SqlFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("SqlFilterTest", tags[i % tags.length], ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 设置SQL过滤的属性
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

consumer通过 MessageSelector.bySql(“(TAGS is not null and TAGS in (‘TagA’, ‘TagB’)) and (a is not null and a between 0 and 3)”) ,以sql的方式进行消息的过滤筛选

public class SqlFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 设置消息的sql过滤条件
        consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for(MessageExt msg: msgs){
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                    String msgPro = msg.getProperty("a");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic: " + topic + " , tags: " + tags + " ,a: " + msgPro + " ,msgBody: " + msgBody);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

六、消息发送时的重要方法属性

image.png

image.png


image.png


image.png


image.png

七、消息消费时的重要方法和属性

image.png


image.png

消息消费失败,就放入到重试队列中。可能会导致消费顺序性乱掉

image.png

为了确保消息的顺序消费,可以使用MessageListenerOrderly(顺序事件监听器),消费失败的时候,设置mq等一会儿,再处理。并不直接放入重试队列。

image.png

八、RocketMQ的高可用机制

mq服务器端为保证流入的消息不丢失,会将消息进行持久化。为防止broker节点宕机,保证高可用,会采用集群的方式部署
一般场景会采用 多master多slave模式 同步复制 异步刷盘 的模式。【综合可靠性和性能】

集群部署模式

  • 单master模式
  • 多master模式
  • 多master多slave模式(同步)
  • 多master多slave模式(异步)

image.png

刷盘与主从同步

  • 同步刷盘与异步刷盘
  • 同步复制与异步复制

image.png

消息存储设计

RocketMQ因为有高可用的要求(宕机不丢失数据),所以要进行持久化存储,RocketMQ采用文件的方式进行消息数据的存储

image.png

  • commitlog:消息存储目录(单文件大小默认1GB,如果存放的消息超过了1GB,就会再创建一个1GB的新文件)
  • consumequeue:消息消费队列存储目录。consumequeue的一级目录为Topic,二级目录为Topic的消息队列。主要是针对每一个Topic建立的索引,方便consumer消费某个topic下的消息
  • index:消息索引文件存储目录。存消息的hash值
  • config:运行期间一些配置信息,通过json文件存储
  • abort:如果存在该文件,则broker非正常关闭
  • checkpoint:文件检查点,存储commitlog文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间戳,index索引文件最后一次刷盘时间戳

image.png
设计思路:
当有一条消息从producer端发送到commitLog中,会有一个异步线程监听到,然后生成一个消息对应的索引,存入comsumequeue目录指定文件中。由于消息中包含(topic,tag,消息体字节数组),由此构造出每条消息对应的索引数据(每条消息的索引为20个字节,包括:8字节的commitLog offset偏移量,可以看作顺序放入到commitLog中的消息的位置下标;4字节的消息长度;8字节的tag的hashcode,这个值用于对消息进行二级过滤)

commitLog

image.png

consumeQueue

image.png

indexFile

image.png

config

image.png

rocketmq的存储文件设计,保证消息查找的时间复杂度为O(1),消息的消费速度很快。

当consumer消费一条消息的时候,例如:
消费:TopicA Q1的消息(消费第2条消息)

查找消息的逻辑:

  1. 首先找到comsumequeue文件目录,找到对应的topic子文件目录,找到Q1子文件目录,取出里面的索引文件。
  2. 直接从第20个字节开始找,找到第20-40字节的数据。因为每条消息的索引都是20个字节,第2条消息就是从20开始。 查找时间复杂度O(1)
  3. 取出第20-40字节的数据,然后取出 commitLog offset 和 size。
  4. 按照commitLog offset 和 size ,去commitLog文件中查找对应位置的消息本体。查找时间复杂度O(1)

因此:整体去查询一条需要消费的消息,时间复杂度为O(1),查找效率非常快,所以消费速度也很快。
IndexFile文件中存每一条消息的hash值(消息key的hash值),方便进行消息的查找。

过期文件删除机制

image.png
image.png
image.png

消息生产的高可用机制

image.png
如果超过重试次数(默认为2,即总共三次机会)还是发送失败,就进行默认的规避策略(即认为之前选择的brokerA节点不可用,下次选择队列会去选择brokerB上的)

消息生产的故障延迟机制策略(非默认的规避策略)

image.png
故障延迟机制策略更适合网络状况不是很好,网络波动比较大的场景

RocketMQ中的负载均衡策略

image.png

九、分布式事务

image.png

rocketMQ中的解决方案

image.png
这两种情况都会出现问题。所以rocketMQ进行了优化
image.png

两阶段提交(2pc)

  • 在生产者侧,需要处理 半事务,本地事务,以及事务回查(执行本地事务比较耗时,返回unknow,然后事务回查会间隔一定时间定时回查消息发送是否成功)
  • 在消费者侧,需要确保消息幂等性,处理重试消费,消息重复的问题

十、源码亮点分析

1. 提升文件读写性能的MMAP零拷贝技术

Broker启动流程分析

image.png

result = result && this.commitLog.load();

// ==> 追入load()方法。会调用一个this.mappedFileQueue.load()方法

public boolean load() {
    boolean result = this.mappedFileQueue.load();
    log.info("load commit log " + (result ? "OK" : "Failed"));
    return result;
}

// ==> 追入load()方法。MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
public boolean load() {
    File dir = new File(this.storePath);
    File[] files = dir.listFiles();
    if (files != null) {
        // ascending order
        Arrays.sort(files);
        for (File file : files) {
            
            if (file.length() != this.mappedFileSize) {
                log.warn(file + "\t" + file.length()
                         + " length not matched message store config value, please check it manually");
                return false;
            }
            
            try {
                MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                
                mappedFile.setWrotePosition(this.mappedFileSize);
                mappedFile.setFlushedPosition(this.mappedFileSize);
                mappedFile.setCommittedPosition(this.mappedFileSize);
                this.mappedFiles.add(mappedFile);
                log.info("load " + file.getPath() + " OK");
            } catch (IOException e) {
                log.error("load file " + file + " error", e);
                return false;
            }
        }
    }
    
    return true;
}

// 追入new MappedFile(file.getPath(), mappedFileSize)。里面会调用一个init(fileName, fileSize)方法
public MappedFile(final String fileName, final int fileSize) throws IOException {
    init(fileName, fileSize);
}

// 追入init(fileName, fileSize)中
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    
    ensureDirOK(this.file.getParent());
    
    try {
        // 文件通道 fileChannel
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        // FileChannel配合着ByteBuffer,将读写的数据缓存到内存中(操作大文件时可以显著提升效率)
        // MappedByteBuffer(零拷贝之内存映射:mmap)
        // FileChannel定义了一个map()方法,它可以把一个文件从position位置开始,size大小的区域映射为内存
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        // 原子操作类 -- CAS的原子操作类 -- 多线程效率(加锁)
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

MMAP零拷贝核心代码

image.pngimage.png

map()方法 – 内存映射
mappedByteBuffer其实是磁盘上的一块区域,rocketmq把它当作内存使用。类似虚拟内存的概念。

mmap零拷贝技术

数据的拷贝过程(接收网络数据,并落盘的过程):

  • 消息数据通过网络socket传输到mq服务器;
  • 首先到达网卡(网络设备缓冲区)
  • 然后经过两次拷贝(DMA拷贝,CPU拷贝)才能到达rocketmq。
    • 因为能够与网络设备打交道的只能是操作系统或内核,所以数据首先通过一次DMA拷贝,到达内核缓冲区。【操作系统与应用层是隔离的,不能共享数据。所以内存是单独隔离的】
    • 然后在应用层经过一次CPU拷贝,到达rocketmq进行业务逻辑处理。处理后的数据需要存到文件系统或磁盘中
  • 如果不使用零拷贝技术,那么rocketmq中的数据需要经过两次拷贝(一次CPU拷贝,一次DMA拷贝)才能到达磁盘。先通过CPU拷贝将数据拷贝到内存缓冲区中,然后通过DMA拷贝将内存缓冲区中的数据拷贝到磁盘中
  • 使用mmap方式的零拷贝技术,可以减少第三次CPU拷贝。从而提升数据读写的效率

一般CPU拷贝比较慢,DMA拷贝比较快

image.png

sendFile零拷贝技术(mmap与sendFile的区别?)

mmap只能减少第三次CPU拷贝,提升写入效率

sendFile

  • 优势:可以减少前后两次的CPU拷贝,在拷贝性能上是优于mmap的
  • 劣势:减少了两次CPU拷贝。数据的传输在操作系统层完成。应用层只能读取到文件描述符,也就是说拿不到完整的数据。如果在应用层需要获取完整数据,并对数据进行业务处理,这种场景下,sendFile的方式就不太适用。

2. 提升同步双写性能的CompletableFuture

消息写入流程

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    ...
}
  • 1
  • 2
  • 3
  • 4

这个CompletableFuture是在rocketmq-4.7.0之后才有的。

同步的方式:
producer -->生产消息,发送到 --> Broker --> Broker会启动一个线程来处理消息(可能有多个业务处理步骤 1,2,3,4,5)。那么中间就需要阻塞,等待处理完的结果 --> 回复给 producer

rocketmq-4.7.0之后,采用CompletableFuture异步的方式:
启动一个线程来处理消息(主线程),不会阻塞。会启动一个子线程,拿到处理返回的结果后,就会响应(通过CompletableFuture.completedFuture()方法)–> 回复给 producer

使用CompletableFuture的好处?

在rocketmq集群架构下,有一种保证消息数据不丢失的机制 – 同步双写(2主2从)
image.png

从节点进行数据备份,同步复制主节点的消息数据。
返回之前还需要将Memory中的数据同步刷入磁盘中。
这个过程中,用到了很多CompletableFuture,来提升同步双写的性能。

3. Commitlog写入时使用可重入锁还是自旋锁?

异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁

由于rocketmq消息写入CommitLog中,是单文件多队列的存储设计。那么同时有多个生产者往多个topic的队列中写入消息,对应的都是写入到同一个commitLog文件中。就会存在线程安全的问题。
因此commitLog采取锁的机制,来保证多线程并发写入的线程安全。

// CommitLog.java
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    
    protected final PutMessageLock putMessageLock;
    
    // 构造函数
    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        ...
        // 默认使用new PutMessageSpinLock(),自旋锁 (乐观锁)
        // 也可设置成 可重入锁(悲观锁)
        // UseReentrantLockWhenPutMessage参数默认值是false,使用自旋锁。异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁
        this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    }
    
    ...
    // 会有多个线程并行处理,需要上锁
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try{
        ...
    }finally{
        putMessageLock.unlock(); // 解锁。标准的lock锁的方式
    }
}

// PutMessageSpinLock.java
public class PutMessageSpinLock implements PutMessageLock {
    //true: Can lock, false : in lock.
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

    @Override
    public void lock() {
        boolean flag;
        do {
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        }
        while (!flag);
    }

    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

核心代码片段:
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();

rocketmq中:CommitLog初始化的时候,默认使用的是PutMessageSpinLock(自旋锁)。当然,也可配置成使用PutMessageReentrantLock(可重入锁)

那么 自旋锁 和 可重入锁 的差别在哪里?

这个需要结合场景。
自旋锁:不会有上下文的切换,获取不到锁资源时,会采用消耗cpu空转的方式等待。
可重入锁:有可能阻塞线程。发生上下文的切换。

同步刷盘建议使用重入锁。
因为同步刷盘下,多线程对锁资源的竞争很激烈,如果使用自旋,那么CAS失败的机率很高。CAS失败会自旋,导致对CPU的消耗过大。

异步刷盘建议使用自旋锁。
因为异步刷盘下,锁资源竞争小,使用自旋锁,可以减少上下文的切换,提高刷盘的效率。

4. 数据读写分离之堆外内存机制

image.png

// CommitLog.java
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    ...
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
}

// 追入,MappedFile.java
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
    return appendMessagesInner(msg, cb);
}

// 追入
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    // 当前这个MappedFile的写入位置
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        // 异步刷盘时,还有两种刷盘模式可以选择
        // 如果writeBuffer != null 即开启了堆外内存缓冲,使用writeBuffer,否则使用mappedByteBuffer(也是继承的ByteBuffer)
        // slice() 方法,创建一个新的字节缓冲区
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            // 写入具体的数据 commitLog中的数据格式
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

为什么要用堆外内存?

因为读写数据本身就是一个频次很高的操作。
堆内存:读写过程中,会产生大量数据 -> new 对象 -> GC -> 垃圾回收 -> 停顿 -> 效率低
堆外内存:使用的是本地内存 -> 手动的GC -> 没有停顿 -> 效率高

堆外内存也有一些缺点:

  • 内存需求比较大
  • 数据写入需要两步,写入不够及时

如果业务场景对消息写入的时效性要求很高,那么最好选择默认的写入模式

面试题

1. 消费并发度问题?

在rocketmq集群环境下,可以将一个topic对应的多个messageQueue分别放在不同的机器上,从而实现提高消费并发度的效果。

2. 消息的顺序消费问题?

如果想确保消息的顺序消费,那么生产者、队列、消费者最好都是一对一的关系。这样设计,能够保证消息的顺序消费,但同时带来了性能瓶颈。(并发度不够)

rocketmq不解决消息顺序消费的问题,理由:

  1. 乱序的应用实际大量存在
  2. 队列无序并不意味着消息无序

3. 消息的重复消费问题?

造成消息重复消费的根本原因是:网络波动。

rocketmq不保证消息的不重复,如果想在业务中严格确保消息不重复,需要在业务端进行去重:

  1. 消费端处理消息的业务逻辑保持幂等性
  2. 确保每一条消息都有唯一的编号,且保证消息处理成功与去重的日志同时出现

4. 为什么要使用消息队列?

三个核心特点:解耦、异步、削峰
结合公司的具体业务场景,描述业务场景,这个业务场景有什么挑战,如果不用MQ可能会很麻烦,用了MQ之后带来了哪些好处。

解耦:

  • 如果用RPC远程过程调用,业务之间调用的话,耦合度还是比较高的
  • RPC调用是同步的方式,性能有所限制

5. 消息队列有什么优缺点?

优点就是,在特殊的场景下面有其对应的好处,如解耦、异步、削峰。

缺点:

  • 系统的可用性降低
    • 因为系统引入的外部依赖越多,越容易挂掉。本来你就是A系统调用BCD三个系统的接口就好了,ABCD四个系统好好的,没啥问题。
    • 加进来一个MQ,万一MQ挂掉了怎么办?MQ挂了,整套系统崩溃了,业务也就停顿了。
  • 系统复杂度提高
    • 硬生生加进来一个MQ,怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
  • 一致性问题
    • A系统处理完了直接返回成功了,人家就以为你这个请求成功了。但是问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,引入它有很多好处,但是也得针对它带来的各种坏处,需要做各种额外的技术方案和架构来规避掉。

6. 为什么选择RocketMQ?

  • 性能:阿里支撑,经受住淘宝,天猫双11重重考验;性能高,可靠性好;可用性高;易扩展
  • 功能:功能完善,我们需要的功能,基本能够满足,如:事务消息、消息重试、死信队列、定时消息等
  • 易用,跨平台:跨语言,多协议接入(支持http,mqtt,tcp协议,支持Restful风格http收发消息)
  • 钱能解决的问题,一般都不是问题。所以免费服务不能满足的,适当的花钱购买所需的服务是值得的

为什么要学习读源码?

  • 编写优雅,高效的代码经验
  • 提升微观的架构设计能力,重点在于思维和理念
  • 解决工作中,学习中的各种疑难杂症
  • 在面试中展现优秀的自己
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/715363
推荐阅读
相关标签
  

闽ICP备14008679号