赞
踩
rocketMQ是做什么的我就不用解释了吧,以及他的背景。本文主要是为了让大家明白RocketMQ的工作原理。
上图,双箭头代表是双向通信,ProducerGroup和ConsumerGroup以及Broker集群,NameServer集群在互相通信的时候,是每个实例之间的通信。举个例子:上图中ProducerGroup和NameSevrer通信来说,是三台Producer实例分别与三台NameServer实例都会进行通信(当然前提是Producer默认注册了三台Producer实例配置了三台NameServer的地址),但是三台NameServer之间不会进行通信,他们是多活的模式,不是主备的模式。
由一组Producer组成,如果只是单纯的发送普通消息,本身没有什么特别含义,发送分布式事务消息时,
如果 Producer 中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同
Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。
消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,
即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。
注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,
即必须要听一样的topic(并且tag也一样)。
RocketMQ里面有Client这个概念,Consumer和Producer都是Client,可以这么理解:
生产者和消费者都是客户端,且都具备一个Client应该有的属性,因为RocketMQ对Client有一些限制和规定,
所以在使用Consumer和Producer的时候也要注意这些规定和限制。对应的有个ClientConfig类。
参数 | 默认值 | 参数说明 |
---|---|---|
NameServer | 空 | 通过配置这个Client可以与NameServer通信获得需要的Topic和Broker的对应关系,默认值:-D系统参数rocketmq.namesrv.addr或环境变量NAMESRV_ADDR,Springboot配置:rocketmq.nameServer:IP:端口,多个NameServer用分号分割 |
clientIP | 本机IP | 客户端所在的服务器的ip地址,某些机器会发生无法识别客户端IP地址情况,可以在代码中强制指定 |
instanceName | “DEFAULT” | 如果是DEFAULT得话,该字段又会被转换成该client所在的进程id,在RocketMQ中区分客户端是根据ClientID,ClientID=ClientIP@instanceName,也就是说如果同一个IP下的不同生产者如果instanceName相同的话,那就会被识别为同一个MQClientInstance(负责与MQ进行通信,如保持心跳,拉取Topick信息等),如果两个生产者配置的集群不同(不同NameServer),那么就会导致不同生产者的消息发往同一个集群(NameServer)中去,如果是消费者的话,就会导致多个消费者消费相同Queue里面的信息,导致信息混乱 |
clientCallbackExecutorThreads | 4 | 通信层回调线程数量 |
pollNameServerInteval | 30000 | 轮训NameServer的时间周期,单位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker发送心跳的周期,单位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化消费者消费进度的周期,单位毫秒,RocketMQ采取的是定期批量ack的机制以持久化消费进度。也就是说每次消费消息结束后,并不会立刻ack,而是定期的集中的更新进度。 由于持久化不是立刻持久化的,所以如果消费实例突然退出(如断电)或者触发了负载均衡分consue queue重排,有可能会有已经消费过的消费进度没有及时更新而导致重新投递。故本配置值越小,重复的概率越低,但同时也会增加网络通信的负担。 |
vipChannelEnabled | -D com.rocketmq.sendMessageWithVIPChannel参数的值,若无则是true | 是否启用VIP通道发送信息,broker的netty server会起两个通信服务。两个服务除了服务的端口号不一样,其他都一样。其中一个的端口(配置端口-2)作为vip通道,客户端可以启用本设置项把发送消息此vip通道。 |
Producer是一个Client,用来生产消息,并发送到指定Topic,甚至Broker和Queue。
参数 | 默认值 | 参数说明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | 对于非事务型的Producer,producer group仅起到标识作用并没有实际作用 |
createTopicKey | TBW102 | Producer第一次发送消息的时候,如果topic不存在,若想自动创建该topic,需要一个topickey,这个值即是topickey的值。自动创建该topic支持的前提是broker 的配置打开autoCreateTopicEnable=true,然后broker会创建一个TBW102的topic,这个就是我们讲的默认的topickey.自动构建topic的过程:Producer发送的时候如果发现该Topic不存在,就会向配置有Producer配置的topickey的那个broker发送消息broker校验客户端的topic key是否在broker存在,且校验其权限最后一位是否是1(topic权限总共有3位,按位存储,分别是读、写、支持自动创建)若权限校验通过,先在该broker把T创建,并且权限就是topickey除去最后一位的权限 |
defaultTopicQueueNums | 4 | 自动创建新的Topic时,创建的对应的Queue的数量 |
sendMsgTimeout | 10000 | 发送消息的超时时间 单位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 发送消息过大时,进行消息压缩的标准 单位Byte |
retryAnotherBrokerWhenNotStoreOK | false | 如果发送消息返回sendResult,发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发,此配置项只对同步发送有效,异步、oneway无效 |
maxMessageSize | 4M | 客户端验证,允许发送的最大消息体大小,超过会报错 |
还有一些事务相关的属性这里就不罗列了 | 需要的可以自己去了解一下 |
推送模式的消费者,即消息由MQ主动推送过来
参数 | 默认值 | 参数说明 |
---|---|---|
consumerGroup | 无 | 消费者组名称,用来标识一组消费者 |
messageModel | MessageModel.CLUSTERING | 消费的模式:有两种BROADCASTING和CLUSTERING。 |
consumeFromWhere | ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET | 启动后的首次消费的起始点,可选值有三个:CONSUME_FROM_LAST_OFFSET //队列尾消费 CONSUME_FROM_FIRST_OFFSET //队列头消费CONSUME_FROM_TIMESTAMP //按照日期选择某个位置消费,这个配置只生效于新在线测consumer group,如果是老的已存在的consumer group,都降按照已经持久化的consume offset进行消费 |
consumeTimestamp | 半个小时前 | 配合上面的配置使用,CONSUME_FROM_LAST_OFFSET的时候使用,从哪个时间点开始消费,格式为yyyyMMddhhmmss 如 20191123171201 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely(取模平均分配) | 负载均衡策略算法这个算法可以自行扩展以使用自定义的算法,目前内置的有以下算法可以使用AllocateMessageQueueAveragely //取模平均AllocateMessageQueueAveragelyByCircle //环形平均AllocateMessageQueueByConfig // 按照配置,传入听死的messageQueueListAllocateMessageQueueByMachineRoom //按机房,从源码上看,必须和阿里的某些broker命名一致才行,也可以自己实现相应的接口,实现自己的策略 |
consumeThreadMin | 20 | PushConsumer内部拥有一个线程池进行消费消息,这里是核心线程数 |
consumeThreadMax | 64 | PushConsumer内部拥有一个线程池进行消费消息,这里是最大线程数 |
consumeConcurrentlyMaxSpan | 2000 | 并发消费下,单条consume queue队列允许的最大offset跨度,达到则触发流控,只对并发消费(ConsumeMessageConcurrentlyService)生效 |
pullThresholdForQueue | 1000 | consume queue流控的阈值,每条consume queue的消息拉取下来后会缓存到本地,消费结束会删除。当累积达到一个阈值后,会触发该consume queue的流控 |
pullInterval | 0 | 拉取消息的时间间隔,单位毫秒。由于RocketMQ采取的pull的方式进行消息投递,每此会发起一个异步pull请求,得到请求后会再发起下次请求,这个间隔默认是0,表示立刻再发起。在间隔为0的场景下,消息投递的及时性几乎等同用Push实现的机制 |
pullBatchSize | 32 | 一次拉取的最大消息条数 |
consumeMessageBatchMaxSize | 1 | 单次消费最大消息条数,由于拉取到的一批消息会立刻拆分成N(取决于consumeMessageBatchMaxSize)批消费任务,所以集合中msgs的最大大小是consumeMessageBatchMaxSize和pullBatchSize的较小值 |
maxReconsumeTimes | -1 | 一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列,注:这个值默认值虽然是-1,但是实际使用的时候默认并不是-1。按照消费是并行还是串行消费有所不同的默认值。并行:默认16次串行:默认无限大(Interge.MAX_VALUE)。由于顺序消费的特性必须等待前面的消息成功消费才能消费后面的,默认无限大即一直不断消费直到消费完成。 |
suspendCurrentQueueTimeMillis | 1000 | 串行消费使用,如果返回ROLLBACK或者SUSPEND_CURRENT_QUEUE_A_MOMENT,再次消费的时间间隔,单位毫秒 |
consumeTimeout | 15 | 消费的最长超时时间 单位分钟,如果消费超时,则按照消费失败 |
拉取模式的消费者,自己控制消息的消费,包括消费量和进度等
参数 | 默认值 | 参数说明 |
---|---|---|
consumerGroup | 无 | 消费者组名称,用来标识一组消费者 |
messageModel | MessageModel.CLUSTERING | 消费的模式:有两种BROADCASTING和CLUSTERING。 |
registerTopics | 空集合 | 消费者监听的Topic |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely(取模平均分配) | 负载均衡策略算法这个算法可以自行扩展以使用自定义的算法,目前内置的有以下算法可以使用AllocateMessageQueueAveragely //取模平均AllocateMessageQueueAveragelyByCircle //环形平均AllocateMessageQueueByConfig // 按照配置,传入听死的messageQueueListAllocateMessageQueueByMachineRoom //按机房,从源码上看,必须和阿里的某些broker命名一致才行,也可以自己实现相应的接口,实现自己的策略 |
offsetStore | null | 消息消费进度存储器,offsetStore 有两个策略:LocalFileOffsetStore 和 RemoteBrokerOffsetStore。若没有显示设置的情况下,广播模式将使用LocalFileOffsetStore,集群模式将使用RemoteBrokerOffsetStore |
maxReconsumeTimes | 16 | 调用sendMessageBack的时候,如果发现重新消费超过这个配置的值,则投递到死信队列。由于PullConsumer没有管理消费的线程池和管理器,需要用户自己处理各种消费结果和拉取结果,故需要投递到重试队列或死信队列的时候需要显示调用sendMessageBack。回传消息的时候会带上maxReconsumeTimes的值,broker发现此消息已经消费超过此值,则投递到死信队列,否则投递到重试队列。此逻辑和DefaultPushConsumer是一致的,只是PushConsumer无需用户显示调用。 |
messageQueueListener | 无 | 由于pull操作需要用户自己去触发,故如果负载均衡发生变化,要有方法告知用户现在分到的新consume queue是什么。使用方可以实现接口MessageQueueListener 以达到此目的 |
主题,是一个虚拟的概念,是一类消息的抽象,消息的具体存放是放在Broker上面的,
一个Topic可以存放在多个Broker的多个Queue上面。
节点,就是服务器,发送的消息真正存放的地方,也是真正集群部署的基本单位,
参数 | 默认值 | 参数说明 |
---|---|---|
consumerGroup | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | |
listenPort | 10911 | Broker对外监听的端口 |
namesrvAddr | 无 | NameServer的地址 |
brokerIP1 | 本机ip地址 | 有些网卡会识别失败或者识别错误,这里就需要手动填写 |
brokerName | 本机主机名 | 可以设置主机名,和主从有关,主从的brokerName必须一致 |
brokerClusterName | DefaultCluster | Broker所属哪个集群 |
brokerId | 0 | BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对 |
storePathCommitLog | $HOME/store/commitlog | commitLog的存储路径,该broker接收到的所有的消息(任何topic)都会被实例化到该文件里面,为了保证写入效率,这里写的方式是顺序写的 |
storePathConsumeQueue | $HOME/store/consumequeue | 消费队列的存储路径 |
storePathIndex | $HOME/store/index | 消息索引存储路径 |
deleteWhen | 4 | 删除时间点,24小时制 |
fileReservedTime | 48 | 文件保留时间,单位小时 |
maxTransferBytesOnMessageInMemory | 262144 | 单次拉取消息(内存)传输的最大字节数,单位Byte |
maxTransferCountOnMessageInMemory | 32 | 单次拉取消息(内存)的最大条数 |
maxTransferBytesOnMessageInDisk | 65535 | 单次拉取消息(硬盘)传输的最大字节数,单位Byte |
maxTransferCountOnMessageInDisk | 8 | 单次拉取消息(硬盘)的最大条数 |
messageIndexEnable | True | 是否开启索引功能 |
brokerRole | ASYNC_MASTER | Broker的角色 ASYNC_MASTER:异步复制Master。SYNC_MASTER:同步双写Master。 SLAVE:从 |
flushDiskType | ASYNC_FLUSH | 刷盘方式。ASYNC_FLUSH:异步刷盘;SYNC_FLUSH:同步刷盘 |
cleanFileForciblyEnable | TRUE | 磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用,FALSE标记服务不可用,文件不删除 |
Tag
tag是消息的标志,发送消息的时候可以指定,接收消息的时候也可以按照这个Tag来过滤消息。
Queue
是消息存在也是生产者投放消息和消费者消费信息的目的地。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。