当前位置:   article > 正文

RocketMQ全量_对接rocketmq 的全量数据

对接rocketmq 的全量数据

RocketMQ

一.概述

1.MQ简介

MQ(Message Queue)提供消息队列服务的中间件,称为消息中间件,提供消息生产,储存,消费全称api的软件系统,消息即数据,消息的体量一般不会很大

2.MQ用途

削流限峰

MQ将系统的超量请求暂存其中,以便系统后期自行处理,避免请求的丢失或者系统被压跨

异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发量,系统的耦合度太高,异步调用解决此问题,常见的作法是添加一个MQ层

数据收集

分布式系统会产生海量级数据流,如:业务日志,监控数据,用户行为,针对此数据实时或批量采集汇总,对数据进行大数据分析

3.RocketMQ概念

消息(Message)

A message is the smallest unit of data transimission in RocketMQ

主题(Topic)

消息集合,每个消息只属于一个主题,消息订阅基本单位

生产者同时发送多种Topic的消息,消费者对某种特定的Topic可订阅和消费

队列(Queue)

存储Message的物理实体,一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的Message,一个Topic的Queue也被称为一个Topic消息的分区Partition

一个消费者可以消费同一Topic下的多个Queue的Message,一条Queue消息不能被多个消费者同时消费

分片Shard ing是指同一个Topic在不同的Broker的队列

消息标识(MessageId/Key)

每个消息拥有唯一i的MessageId,可以携带具有业务标识的Key,便于对业务的查询,Producer send() Message 时,自动生成一个MessageId(msgId),消息到达Broker后,自动生成MessageId(offsetId),msgId,offsetMsgId和Key都称为消息标识

  • msgId:由producer生成,规则为: produce IP + 进程id+MessageClientIDSetter类的class Loader的hashcode+ 当前时间 +AutomicInteger 自增计数器

  • offsetMsgId :broker生成 ,规则为:broker IP + 物理分区的offset

  • key:拥护指定的业务相关唯一标识

4.RocketMQ系统架构

Producer

消息生产者,Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递过程支持快速失败并且低延迟

消息生产者都是以生产者组(producer group) 的形式出现,是发送相同topic类型的生产者的集合

Consumer

消费者以消费者组(Consumer Group)形式出现,消费同一个Topic类型的Message

  • 一个Consumer group 中的consumer 必需订阅完全相同的topic

  • 一个Consumer Group只能消费一个Topic的消息,不能同时消费多个Topic

Name Server
功能介绍

Name Server 是一个Broker 与 Topic 路由的注册中心,支持Broker的动态注册中心与发现

包括两个功能:

  • Broker management:接受Broker集群的注册信息并且保存作为Routing信息的基本数据,提供心跳检测机制,检查broker是否存活

  • 路由信息管理:每个NameServer都保存者Broker集群的整个Routing 信息和 用于客户端查询的队列信息,Producer 和 Consumer 通过NameServer 获取整个Broker集群的路由信息

路由注册

Name Server通常也是以集群的方式部署,Name server无状态,各节点之间无差异,相互之间不进行信息通讯,Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求,在内部维护着Broker列表,用于动态存储Broker的信息

Broker为证明自身存活,为维护与NameServer的长连接,会将最新的信息以心跳包的方式上报NameServer,每30s一次 发送心跳.心跳包含BrokerID,Broker Address(IP+Port),Broker name,Broker Cluster name .etc ,接受到心跳包后,更新心跳时间戳,记录此Broker 的最新存活时间,NameServer不能随意扩容,Broker不重新配置的话,新增的Name Server是不可见的

路由剔除

由于Broker 关机宕机或者网络抖动.etc原因,NameServer未收到心跳,NameServer可能会将此Broker从列表中剔除

NameServer 中存在一个定时任务,每隔10s就会扫描一次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120s,超过就会判定Broker失效,从列表剔除

路由发现

采用pull模型,Topic路由信息出现变换时,NameServer不会主动推送给客户端,而是客户端定时拉取主题最新的路由,默认30s一次

  • Pull模型:实时性差

  • Push模型:维护长链接

  • long Polling 模型:对Push和Pull模型整合

客户端NameServer选择策略

客户端值 Producer和Consumer ,配置时填入NameServer集群地址,首先随机策略,失败后轮询

4.Broker
功能介绍

Broker 充当消息中转角色,负责存储消息,转发消息.Broker在RocketMQ系统中负责接受并存储从Produce发来的消息,同时为Consumer的拉取请求做准备.Broker 同时也存储着消息相关的元数据,包括Consumer Group 消费进度偏移offset ,主题,队列.etc

模块构成
Broker Server
Remoting module
Client Management
Store Service
HA Service
Index Service
.etc
  • Remoting module : 整个Broker的实体,负责接受来自Clients端的请求

  • Client management:客户端管理器,负责接收,解析客户端(Producer/Consumer)请求,管理客户端

  • Store Service: 存储服务,提供方便简单的API接口,处理消息储存到物理硬盘和消息查询功能

  • HA Service:高可用服务,提供Master Broker 和 Slave Broker 之间的数据同步功能

  • Index Servcie: 索引服务.根据特定的Message Key,对投递到Broker的消息进行索引服务,也提供根据Message Key 的快速查询功能

集群部署

Broker以集群方式出现,各集群节点可能存放相同Topic的不同Queue

为主备集群,具有Master和Slave,Master负责读写操作请求,Slave负责对Master中的数据进行备份,一个Slave只能隶属于一个Master,二者对应关系是通过相通的BrokerName,不同的BrokerId来确定,BrokerId为0表示Master,反之表示Slave,每个Broker于NameServer集群中的所有结点建立长连接,定时注册Topic信息到所有的NameServer

二.部署

1.单机部署

主要组件NameServer,Broker,console使用docker-compose部署(最好采用服务器)

docker-compose
  • sudo curl -L "https://get.daocloud.io/docker/compose/releases/download/v2.15.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
    
    • 1
  • chmod +x /usr/local/bin/docker-compose
    
    • 1

如果docker-compose 未正确安装,自身下载文件linux .x86-64,load至服务器

文件结构
rocketmq
broker
conf
bin
logs
data
runbroker.sh
nameserver
conf
bin
runserver.sh

其中.sh启动脚本内中有一个自动计算最大堆内存和新生代内存的函数会导致在不同硬件环境下设置最大堆内存和新生代内存环境变量不被应用,注释掉即可

docker-compose

注:阿里云服务器需要开放端口

version: '3.8'
services:
  rmqnamesrv:
    image: apache/rocketmq:5.1.0
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    restart: always
    privileged: true
    volumes:
      - /storage/rocketmq/nameserver/logs:/home/rocketmq/logs
      - /storage/rocketmq/nameserver/bin/runserver.sh:/home/rocketmq/rocketmq-5.1.0/bin/runserver.sh
    environment:
      - MAX_HEAP_SIZE=256M
      - HEAP_NEWSIZE=128M
    command: ["sh","mqnamesrv"]
  broker:
    image: apache/rocketmq:5.1.0
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    restart: always
    privileged: true
    volumes:
      - /storage/rocketmq/broker/logs:/home/rocketmq/logs
      - /storage/rocketmq/broker/store:/home/rocketmq/logs
      - /storage/rocketmq/broker/conf/broker.conf:/home/rocketmq/broker.conf
      - /storage/rocketmq/broker/bin/runbroker.sh:/home/rocketmq/rocketmq-5.1.0/bin/runbroker.sh
    depends_on:
      - 'rmqnamesrv'
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
      - MAX_HEAP_SIZE=512M
      - HEAP_NEWSIZE=256M
    command: ["sh","mqbroker","-c","/home/rocketmq/broker.conf"]
  rmqdashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    container_name: rocketmq-dashboard
    ports:
      - 8080:8080
    restart: always
    privileged: true
    depends_on:
      - 'rmqnamesrv'
    environment:
      - JAVA_OPTS= -Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

三.RocketMQ工作原理

1.消息产生

Producer可以将消息写入至某Broker的Queue中,过程如下:

  • Producer发送消息之前,会先向NameServer发出获取消息Topic的Routing info的request

  • NameServer返回该Topic路由表Broker列表

    • 路由表: 实际为一个Map, k 为 Topic name,v 为 QueueData实例列表.一个Broker中该Topic的所有Queue对应一个Queue Data,一个Broker对应一个QueueData,包含brokerName

    • Broker列表:实际为一个Map, k 为 BrokerName ,v 为BrokerData,一套broker Name相通的Master-slave集群对应一个Brokerdata,其中包含brokerName和一个map,该map的 k 为brokerId,v 为该broker 对应地址,brokerID = 0 表示master,非0表示Slave

  • Producer根据代码中指定的Queue选择策略,从Queue列表中选择一个,用于后续存储信息

  • Producer对消息中做一些特殊处理,ex: 消息本身超过4m,对其压缩

  • Producer向选择的Queue所在的Broker发出PRC request,将消息发送到选择出的queue

2.Queue选择算法

对于无序消息,消息投递算法,存在如下:

轮询算法

default algorithm,保证每个Queue中均匀获取msg

算法的问题是:如果某些Broker的RT较高,会导致Producer的缓存队列出现较大的消息挤压,影响消息投递的性能

最小投递延迟算法

统计每次消息投递的时间延迟,根据统计出的结构将消息投递到时间延迟最小的Queue,延迟相同,采用轮询算法

msg在Queue的分配不均匀,投递延迟小的Queue可能存在大量的msg,该queue的消费压力骤增,导致mq消息堆积

3.消息储存

用户主目录store下文件

filedescription
abortbroker启动后自动创建,正常关闭此文件自动消失,若未启动Broker的情况下,文件存在,表明Broker关闭不正常 // todo 什么情况不正常关闭
checkpoint存储commitlog,consumeque,index文件的最后刷盘时间戳
commitlog存放mappedFile文件,消息写在mappedlFile文件
config存放Broker运行时的配置数据
consumequeue存放consumequeue文件,队列存放在这个目录
index存放消息索引文件indexFile
lock运行期间使用到的全局资源锁
commitlog

此目录下文件简称commitlog文件,源码中称为mappedFile

当前Broker中的所有消息落盘到mappedFile中,文件最大size为1G,文件名由20位十进制数构成,表示当前文件的第一条消息的起始偏移量

第一个mappedFile的文件名是00000000000000000000,第一个文件的commitlog offset = 0

当第一个mappedFile写满时,自动生成第2个mappedFile存放消息,假设第一个文件szie为1073741824byte(1G=1073741824byte),则第二个文件名为00000000001073741824

一个Broker下的mappedFile的commitlog offset 是连续的

一个Broker中仅含一个commitlog目录,消息存放时并未按照Topic进行分类存放,仅按照顺寻写入mappedFile

消息单元

RocketMQ之消息存储和查询原理_rocketmq的borntime-CSDN博客

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

mappedFile由消息单元构成,其包括消息总长MsgLen,消息物理位置physicalOffset,消息体内容Body,消息体Body,消息体长度BodyLength,消息源(Producer)BornHost,消息发送时间戳BornTimestamp,消息主题Topic,消息所在队列QueueId,队列存储的偏移量QueueOffset.etc

一个mappedFile中第M+1个消息单元的commitLog offset 为

L(M+1) = L(M) +MsgLen(M) (M>=0)

consumequeue

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
每个Topic会在consumequeue中创建{$Topic}目录,并且为该Topic每个Queue创建一个目录,目录名为queueId,每个目录存放若干的consumequeue文件,其为commitLog的索引文件,可根据consumequeue中的索引条目定位到具体消息

consumequeue文件名由20位数字构成,表示当前文件的第一个索引条目的起始位移偏移量,其后续文件名固定,consumequeue文件大小不变

索引条目

设计(Design) · Apache RocketMQ开发者指南

每个consumequeue文件包含30w个索引条目,包含:消息在mappedFile文件的偏移量commitLog offset ,消息长度,消息Tag的hashCode,共占20Byte,每个文件的大小是固定的5.7220458984375mb

一个consumequeue中的Msg,Topic一定相同,但是tag可能不同

文件读写
消息写入

持久化之前过程:

  • Broker根据queueId(Producer),获取到该Msg的QueueOffset(Broker计算得出),即此消息对应的索引条目要在consumequeue目录中的写入偏移量

  • Msg与queueId,queueOffset.etc数据,一起封装为消息单元

  • 将消息单元写入commitLog

  • 形成消息索引条目

  • 将消息索引条目分发到相应的consumequeue

消息拉取

Consumer拉取步骤;

  • Consumer获取到其要消费Msg所在的Queue的Message offset,计算Consumer offset

    1. 消息偏移量(Message Offset):

      • 消息偏移量是每条消息在队列中的唯一标识,它代表了消息在队列中的位置。
      • 消息偏移量通常用于在分布式系统中确保消息的顺序性。例如,如果一个系统有多个消费者,那么每条消息只会被一个消费者消费,而消息偏移量可以确保每个消费者按照消息在队列中的顺序进行消费。
      • 消息偏移量通常由消息队列系统管理,不需要消费者自行维护。
    2. 消费偏移量(Consumer Offset):

      • 消费偏移量是消费者当前已消费的消息的位置。它用于跟踪消费者在队列中的进度。
      • 当消费者从队列中消费一条消息后,该消费者的消费偏移量就会更新为该消息的偏移量。
      • 消费者可以定期将消费偏移量提交(commit)到消息队列系统,以便在系统崩溃或重启后能够从上次的消费位置继续消费。
      • 消费偏移量通常由消费者自行维护,并在需要时提交给消息队列系统。
    3. Consumer offset = Message offset +1

  • Consumer从Broker发送拉取消息,其中包含拉取消息的Queue,Msg OffsetMsg Tag

  • Broker 计算该consumequeue中的queueOffset(queueOffset = Message offset * 20 Byte)

  • 从该queueOffset处向后查找第一条指定Tag的索引条目

  • 解释该索引条目的前8个Byte(commitLog offset),即定位

  • 读取后发送给Consumer

Performance
mmap (memory map)
pageCache

4.indexFile

rocketMq提供根据key进行进行消息查询的功能.此查询通过store目录中的index子目录indexFile进行索引快查.indexFile中的索引数据是 包含key的Msg发送到Broker时写入的,若Msg不含key,不写入

索引条目结构
indexFile

每个Broker会包含一组indexFile,每个indexFile以时间戳命名,其由三部分构成 : indexHeader,slots槽位,indexes索引数据.每个indexFile文件包含500w个slot槽,每个slot槽可能挂载很多index索引单元

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

indexHeader

固定40字节

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

constructuredescription
beginTimeStamp此indexFile第1条消息的存储时间
endTimeStamp此indexFile最后1条消息的存储时间
beginPhyoffset此index File中第1条Msg在commitLog中的偏移量commitLog offset
endPhyoffset此index File中最后1条Msg在commitLog中的偏移量commitLog offset
hashSlotCount已经填充有index的slot数量
indexCount此indexFile包含的索引个数
slot&&index unit

此结构图展示,slot与index索引单元的关系

此处非实际存储结构,意在展示slot与index索引的关系,非实际数据排布方式,实际slot数目500W,index索引单元存储在slot之后

1634476565600

key的hashCode%500w对应一个slot槽位(存储index索引的indexNo),根据此indexNo可以计算该index单元在indexFile(index索引单元存储时按顺序,有唯一且递增的indexNo对应)

此种处理会有hash碰撞,解决方案是对index索引单元加入preIndexNo属性,存储碰撞的前一个index索引单元

indexNo为indexFile的递增且对于每个index索引单元唯一的number,从0开始

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

attributedescription
keyHash消息中指定的业务key的hash值
phyoffset此Msg对应在commitLog中的偏移量commitLog offset
timeDiff此key对应消息的存储时间与当前indexFile创建时间的时间差
preIndexNo此slot下当前index索引单元的前一个index索引单元的indexNo
indexFile的创建

创建时机:

  • 当第一条携带key的Msg进入后,系统发现无indexFile,此时创建第一个indexFile

  • 当一个indexFile中挂载的index索引单元数量超过2000W时,会创建新的indexFile(原理是:接受Msg后,读取IndexHeader最后四个Byte,即indexCount,若其>=2000w,创建新的indexFile)

根据业务key查询时,查询条件除key之外,还需指定查询时间戳,表示要查询不大于该时间戳的最新消息,时间戳文件名可简化查询

文件size为(40+500w * 4+2000w * 20)Byte ,约为400mb

5.消息消费

Consumer从Broker中获取Msg的方式:pull 拉取 / push 推动 ,Consumer group 对于Msg消费的模式分为两种:集群消费Clustering ,广播消费Broadcasting

推拉消费类型
拉取式消费

Consumer主动从Broker中拉取消息,主动权在Consumer,一旦获取批量消息,即启动消费过程

推送式消费

Broker收到数据后主动推送给Consumer,典型发布-订阅模型,即Consumer向其关联的Queue注册监听器,一旦发现有新的Msg就触发回调的执行,回调方法是Consumer去Queue中拉取消息,基于Consumer与Broker的长连接

对比
  • pull:需要应用实现关联的Queue的遍历(do it yourself)

    拉取时间间隔自行指定,注意适当选取

  • push:封装对关联的Queue的遍历,实时性强,但会占用较多系统资源

消费模式
广播消费

集群消费和广播消费

相同Consumer Group 中的每个Consumer 实例都接受同一个Topic的全量Msg,即每个Msg都会发送到Consumer Group的每个Consumer

集群消费

集群消费和广播消费

相同Consumer Group的每个Consumer实例平均分配同一个Topic的消息,即每条消息只会被发送到Consumer Group的某个Consumer

消息进度保存
  • 广播模式:Consumer Group中的每一个Consumer都会消费所有Msg,但是不同的Consumer消费进度不同,因此消费进度保存在Consumer

  • 集群模式:Consumer Group中的consumer共同消费同一个Topic中的Msg,同一Msg只会消费一次,消费进度参与到消费的负载均衡,因此消费进度保存于Broker

Rebalance

集群

概念

再均衡:将一个Topic下的多个Queue在同一个Consumer Group 中的多个Consumer间再分配的过程,提升Msg的并行消费能力

限制

一个Queue可分配给一个Consumer,当消费者实例数量大于Queue数量,多余的Consumer不会分配任何队列

危害

消费暂停:指在只有一个Consumer时,它负责消费所有队列,在新增了一个Consumer后,会触发Rebalance的发生。此时原Consumer就需要暂停部分队列的消费,等到这些队列分配给新的Consumer后,这些暂停消费的队列才能继续被消费。

消费暂停:Consumer在消费新分配给自己的队列时,必须接着之前Consumer提交的消费进度Offset继续消费,默认情况,offset异步提交,导致提交到Broker中的offset与consumer实际消费的msg不一致,不一致的差值为重复消费的msg

同步提交:consumer提交其消费完毕的一批消息的offset给brokerg后,需等待broker的成功ack,收到ack后,consumer才会继续获取下一批msg,等待ack期间,consumer阻塞

异步提交:consumer提交其消费完毕的一批消息的offset给brokerg后,无需等待broker的成功ack,consumer可直接获取并消费下一批msg

消费突刺:由于reblance可能导致重复消费,如果需要重复消费的msg过多,或者由于reblance暂停时间过长积压msg

原因
  • 消费者订阅的queue数量变化

    • Broker扩容或缩容

    • Broker升级运维

    • Broker与NameServer网络异常(伪变化)

  • 消费者组中的消费者数量发生变化

    • Consumer Group 扩容或缩容

    • Consumer升级运维

    • Consuemr与NameServer网络通信异常(伪变化)

过程

在broker中维护多个map集合,动态存放着当前Topic中的queue的信息和Consumer Group中Consumer实例的信息,一旦发现消费者所订阅的Queue数量发生变化,或者消费者组中的数量变化,立即向Consumer Group中的每个实例发送reblance通知

TopicManager

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/712372
推荐阅读
相关标签