当前位置:   article > 正文

RocketMQ-名词和架构_rocketmq 消费者 设置instancename clientid。

rocketmq 消费者 设置instancename clientid。

RocketMQ

rocketMQ是做什么的我就不用解释了吧,以及他的背景。本文主要是为了让大家明白RocketMQ的工作原理。

架构图

RocketMQ架构图
上图,双箭头代表是双向通信,ProducerGroup和ConsumerGroup以及Broker集群,NameServer集群在互相通信的时候,是每个实例之间的通信。举个例子:上图中ProducerGroup和NameSevrer通信来说,是三台Producer实例分别与三台NameServer实例都会进行通信(当然前提是Producer默认注册了三台Producer实例配置了三台NameServer的地址),但是三台NameServer之间不会进行通信,他们是多活的模式,不是主备的模式。

主要名词解释
  • ProducerGroup
    由一组Producer组成,如果只是单纯的发送普通消息,本身没有什么特别含义,发送分布式事务消息时,
    如果 Producer 中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
    
    • 1
    • 2
  • ConsumerGroup
    标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同
    Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。
    消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,
    即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。
    注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,
    即必须要听一样的topic(并且tag也一样)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • Client
    RocketMQ里面有Client这个概念,Consumer和Producer都是Client,可以这么理解:
    生产者和消费者都是客户端,且都具备一个Client应该有的属性,因为RocketMQ对Client有一些限制和规定,
    所以在使用Consumer和Producer的时候也要注意这些规定和限制。对应的有个ClientConfig类。
    
    • 1
    • 2
    • 3
参数默认值参数说明
NameServer通过配置这个Client可以与NameServer通信获得需要的Topic和Broker的对应关系,默认值:-D系统参数rocketmq.namesrv.addr或环境变量NAMESRV_ADDR,Springboot配置:rocketmq.nameServer:IP:端口,多个NameServer用分号分割
clientIP本机IP客户端所在的服务器的ip地址,某些机器会发生无法识别客户端IP地址情况,可以在代码中强制指定
instanceName“DEFAULT”如果是DEFAULT得话,该字段又会被转换成该client所在的进程id,在RocketMQ中区分客户端是根据ClientID,ClientID=ClientIP@instanceName,也就是说如果同一个IP下的不同生产者如果instanceName相同的话,那就会被识别为同一个MQClientInstance(负责与MQ进行通信,如保持心跳,拉取Topick信息等),如果两个生产者配置的集群不同(不同NameServer),那么就会导致不同生产者的消息发往同一个集群(NameServer)中去,如果是消费者的话,就会导致多个消费者消费相同Queue里面的信息,导致信息混乱
clientCallbackExecutorThreads4通信层回调线程数量
pollNameServerInteval30000轮训NameServer的时间周期,单位毫秒
heartbeatBrokerInterval30000向Broker发送心跳的周期,单位毫秒
persistConsumerOffsetInterval5000持久化消费者消费进度的周期,单位毫秒,RocketMQ采取的是定期批量ack的机制以持久化消费进度。也就是说每次消费消息结束后,并不会立刻ack,而是定期的集中的更新进度。 由于持久化不是立刻持久化的,所以如果消费实例突然退出(如断电)或者触发了负载均衡分consue queue重排,有可能会有已经消费过的消费进度没有及时更新而导致重新投递。故本配置值越小,重复的概率越低,但同时也会增加网络通信的负担。
vipChannelEnabled-D com.rocketmq.sendMessageWithVIPChannel参数的值,若无则是true是否启用VIP通道发送信息,broker的netty server会起两个通信服务。两个服务除了服务的端口号不一样,其他都一样。其中一个的端口(配置端口-2)作为vip通道,客户端可以启用本设置项把发送消息此vip通道。
  • Producer
    Producer是一个Client,用来生产消息,并发送到指定Topic,甚至Broker和Queue。
    
    • 1
参数默认值参数说明
producerGroupDEFAULT_PRODUCER对于非事务型的Producer,producer group仅起到标识作用并没有实际作用
createTopicKeyTBW102Producer第一次发送消息的时候,如果topic不存在,若想自动创建该topic,需要一个topickey,这个值即是topickey的值。自动创建该topic支持的前提是broker 的配置打开autoCreateTopicEnable=true,然后broker会创建一个TBW102的topic,这个就是我们讲的默认的topickey.自动构建topic的过程:Producer发送的时候如果发现该Topic不存在,就会向配置有Producer配置的topickey的那个broker发送消息broker校验客户端的topic key是否在broker存在,且校验其权限最后一位是否是1(topic权限总共有3位,按位存储,分别是读、写、支持自动创建)若权限校验通过,先在该broker把T创建,并且权限就是topickey除去最后一位的权限
defaultTopicQueueNums4自动创建新的Topic时,创建的对应的Queue的数量
sendMsgTimeout10000发送消息的超时时间 单位毫秒
compressMsgBodyOverHowmuch4096发送消息过大时,进行消息压缩的标准 单位Byte
retryAnotherBrokerWhenNotStoreOKfalse如果发送消息返回sendResult,发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发,此配置项只对同步发送有效,异步、oneway无效
maxMessageSize4M客户端验证,允许发送的最大消息体大小,超过会报错
还有一些事务相关的属性这里就不罗列了需要的可以自己去了解一下
  • PushConsumer
    推送模式的消费者,即消息由MQ主动推送过来
    
    • 1
参数默认值参数说明
consumerGroup消费者组名称,用来标识一组消费者
messageModelMessageModel.CLUSTERING消费的模式:有两种BROADCASTING和CLUSTERING。
consumeFromWhereConsumeFromWhere.CONSUME_FROM_LAST_OFFSET启动后的首次消费的起始点,可选值有三个:CONSUME_FROM_LAST_OFFSET //队列尾消费 CONSUME_FROM_FIRST_OFFSET //队列头消费CONSUME_FROM_TIMESTAMP //按照日期选择某个位置消费,这个配置只生效于新在线测consumer group,如果是老的已存在的consumer group,都降按照已经持久化的consume offset进行消费
consumeTimestamp半个小时前配合上面的配置使用,CONSUME_FROM_LAST_OFFSET的时候使用,从哪个时间点开始消费,格式为yyyyMMddhhmmss 如 20191123171201
allocateMessageQueueStrategyAllocateMessageQueueAveragely(取模平均分配)负载均衡策略算法这个算法可以自行扩展以使用自定义的算法,目前内置的有以下算法可以使用AllocateMessageQueueAveragely //取模平均AllocateMessageQueueAveragelyByCircle //环形平均AllocateMessageQueueByConfig // 按照配置,传入听死的messageQueueListAllocateMessageQueueByMachineRoom //按机房,从源码上看,必须和阿里的某些broker命名一致才行,也可以自己实现相应的接口,实现自己的策略
consumeThreadMin20PushConsumer内部拥有一个线程池进行消费消息,这里是核心线程数
consumeThreadMax64PushConsumer内部拥有一个线程池进行消费消息,这里是最大线程数
consumeConcurrentlyMaxSpan2000并发消费下,单条consume queue队列允许的最大offset跨度,达到则触发流控,只对并发消费(ConsumeMessageConcurrentlyService)生效
pullThresholdForQueue1000consume queue流控的阈值,每条consume queue的消息拉取下来后会缓存到本地,消费结束会删除。当累积达到一个阈值后,会触发该consume queue的流控
pullInterval0拉取消息的时间间隔,单位毫秒。由于RocketMQ采取的pull的方式进行消息投递,每此会发起一个异步pull请求,得到请求后会再发起下次请求,这个间隔默认是0,表示立刻再发起。在间隔为0的场景下,消息投递的及时性几乎等同用Push实现的机制
pullBatchSize32一次拉取的最大消息条数
consumeMessageBatchMaxSize1单次消费最大消息条数,由于拉取到的一批消息会立刻拆分成N(取决于consumeMessageBatchMaxSize)批消费任务,所以集合中msgs的最大大小是consumeMessageBatchMaxSize和pullBatchSize的较小值
maxReconsumeTimes-1一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列,注:这个值默认值虽然是-1,但是实际使用的时候默认并不是-1。按照消费是并行还是串行消费有所不同的默认值。并行:默认16次串行:默认无限大(Interge.MAX_VALUE)。由于顺序消费的特性必须等待前面的消息成功消费才能消费后面的,默认无限大即一直不断消费直到消费完成。
suspendCurrentQueueTimeMillis1000串行消费使用,如果返回ROLLBACK或者SUSPEND_CURRENT_QUEUE_A_MOMENT,再次消费的时间间隔,单位毫秒
consumeTimeout15消费的最长超时时间 单位分钟,如果消费超时,则按照消费失败
  • PullConsumer
    拉取模式的消费者,自己控制消息的消费,包括消费量和进度等
    
    • 1
参数默认值参数说明
consumerGroup消费者组名称,用来标识一组消费者
messageModelMessageModel.CLUSTERING消费的模式:有两种BROADCASTING和CLUSTERING。
registerTopics空集合消费者监听的Topic
allocateMessageQueueStrategyAllocateMessageQueueAveragely(取模平均分配)负载均衡策略算法这个算法可以自行扩展以使用自定义的算法,目前内置的有以下算法可以使用AllocateMessageQueueAveragely //取模平均AllocateMessageQueueAveragelyByCircle //环形平均AllocateMessageQueueByConfig // 按照配置,传入听死的messageQueueListAllocateMessageQueueByMachineRoom //按机房,从源码上看,必须和阿里的某些broker命名一致才行,也可以自己实现相应的接口,实现自己的策略
offsetStorenull消息消费进度存储器,offsetStore 有两个策略:LocalFileOffsetStore 和 RemoteBrokerOffsetStore。若没有显示设置的情况下,广播模式将使用LocalFileOffsetStore,集群模式将使用RemoteBrokerOffsetStore
maxReconsumeTimes16调用sendMessageBack的时候,如果发现重新消费超过这个配置的值,则投递到死信队列。由于PullConsumer没有管理消费的线程池和管理器,需要用户自己处理各种消费结果和拉取结果,故需要投递到重试队列或死信队列的时候需要显示调用sendMessageBack。回传消息的时候会带上maxReconsumeTimes的值,broker发现此消息已经消费超过此值,则投递到死信队列,否则投递到重试队列。此逻辑和DefaultPushConsumer是一致的,只是PushConsumer无需用户显示调用。
messageQueueListener由于pull操作需要用户自己去触发,故如果负载均衡发生变化,要有方法告知用户现在分到的新consume queue是什么。使用方可以实现接口MessageQueueListener 以达到此目的
  • Topic
    主题,是一个虚拟的概念,是一类消息的抽象,消息的具体存放是放在Broker上面的,
    一个Topic可以存放在多个Broker的多个Queue上面。
    
    • 1
    • 2
  • Broker
    节点,就是服务器,发送的消息真正存放的地方,也是真正集群部署的基本单位,
    
    • 1
参数默认值参数说明
consumerGroupConusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
listenPort10911Broker对外监听的端口
namesrvAddrNameServer的地址
brokerIP1本机ip地址有些网卡会识别失败或者识别错误,这里就需要手动填写
brokerName本机主机名可以设置主机名,和主从有关,主从的brokerName必须一致
brokerClusterNameDefaultClusterBroker所属哪个集群
brokerId0BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对
storePathCommitLog$HOME/store/commitlogcommitLog的存储路径,该broker接收到的所有的消息(任何topic)都会被实例化到该文件里面,为了保证写入效率,这里写的方式是顺序写的
storePathConsumeQueue$HOME/store/consumequeue消费队列的存储路径
storePathIndex$HOME/store/index消息索引存储路径
deleteWhen4删除时间点,24小时制
fileReservedTime48文件保留时间,单位小时
maxTransferBytesOnMessageInMemory262144单次拉取消息(内存)传输的最大字节数,单位Byte
maxTransferCountOnMessageInMemory32单次拉取消息(内存)的最大条数
maxTransferBytesOnMessageInDisk65535单次拉取消息(硬盘)传输的最大字节数,单位Byte
maxTransferCountOnMessageInDisk8单次拉取消息(硬盘)的最大条数
messageIndexEnableTrue是否开启索引功能
brokerRoleASYNC_MASTERBroker的角色 ASYNC_MASTER:异步复制Master。SYNC_MASTER:同步双写Master。 SLAVE:从
flushDiskTypeASYNC_FLUSH刷盘方式。ASYNC_FLUSH:异步刷盘;SYNC_FLUSH:同步刷盘
cleanFileForciblyEnableTRUE磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用,FALSE标记服务不可用,文件不删除
  • Tag
    tag是消息的标志,发送消息的时候可以指定,接收消息的时候也可以按照这个Tag来过滤消息。

  • Queue
    是消息存在也是生产者投放消息和消费者消费信息的目的地。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/125723
推荐阅读
相关标签
  

闽ICP备14008679号