当前位置:   article > 正文

消息队列 Kafka\RocketMq_消息队列 推送模型和拉取模型

消息队列 推送模型和拉取模型

目录

消息队列

1. 为什么使用消息队列? 

1.1. 消息队列优缺点

1.2.1.优点

1.2.2.缺点

2. Kafka、ActiveMq、RabbitMq、RocketMq优缺点对比

2.1.Kafka和RocketMQ的区别

2.1.1.数据可靠性

2.1.2.性能对比

2.1.3.单机支持的队列数

2.1.4.消息投递的实时性

2.1.5.消费失败重试

2.1.6.严格保证消息有序

3. 如何保证消息队列的高可用?

4. 如何保证消息不被重复消费?幂等性

4.1.哪些情况下会导致重复消费

5.2.幂等性

6. 如何保证消息可靠性传输?

6.1.消费者弄丢了数据

6.2.Kafka弄丢了数据

6.3.生产者弄丢了数据

7. 如何保证消息的顺序性?

7.1.全局有序

7.2.局部有序

8. 消息积压/消息堆积

8.1.问题描述

8.2.消息积压解决方案

9.消费模型 push/pull


Kafka

1.Kafka体系架构

1.1.架构图

1.2.主题topic

1.2.1.主题topic为什么不能过多

1.3.分区partition

1.4.分区日志

1.5.多副本机制

1.5.1.为什么Kafka不像MySQL和Redis那样允许follwer副本对外提供读服务呢?

1.5.2.Kafka分区和副本的架构

1.5.3.Kafka对节点的存活定义有2个条件

2.Kafka的生产者

2.1.为什么要有分区

2.2.分区策略

2.2.1.轮询策略

2.2.2.随机策略

2.2.3.按消息键保序策略

3.Kafka的消费者

3.1.消费模型

3.2.消费者组Consumer Group

3.3.重平衡Rebalance

3.3.1. 那么Consumer Group 何时进行Rebalance 呢?触发条件有3 个

3.3.2. Coordinator 会在什么情况下认为某个Consumer 实例已挂从而要被“踢出”组呢?

3.3.3. Rebalance 有些“缺点”?

3.3.4. 消费者组rebalance流程

3.3.5. 消费者端rebalance流程

4.Kafka的服务端

4.1.Kafka的控制器

4.2.Kafka的存储层

4.2.1.生产者写入数据过程

4.2.2.消费者读取数据过程

5.kafka高性能的原因

6.kafka三种语义实现

6.1.exactly once


1. 为什么使用消息队列? 

其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么?

解耦、异步、削峰

“削峰填谷”就是指缓冲上下游瞬时突发的流量,使其更平滑。对于发送能力很强的上游系统,如果没有消息引擎的保护,下游系统可能会直接被压垮导致全链路服务雪崩,消息引擎可以在很大程度上避免流量的震荡。消息引擎系统的另外一大好处在于发送方和接收方的松耦合,减少系统间不必要的交互。

1.1. 消息队列优缺点

1.2.1.优点

优点上面已经说了,就是在特殊场景下有其对应的好处解耦异步削峰

1.2.2.缺点

① 系统可用性降低

系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,ABCD 四个系统还好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整?MQ 一挂,整套系统崩溃,你不就完了?如何保证消息队列的高可用,可以点击这里查看

② 系统复杂度提高

硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。

③ 一致性问题

A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

2. Kafka、ActiveMq、RabbitMq、RocketMq优缺点对比

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内
可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ
功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了。

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高。

不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

2.1.Kafka和RocketMQ的区别

kafka和RocketMQ的总体区别

  1. kafka设计初衷是用于日志传输
  2. RocketMQ的设计用于解决各类应用可靠的消息传输,阿里云官网承诺RocketMQ数据可靠性为10个9,服务可靠性为99.95%。

kafka相比RocketMQ的优势
1、单机吞吐量TPS可上百万,远高于RocketMQ的TPS7万每秒,适用于日志类消息
2、kafka支持多语言的客户端

RocketMQ相比kafka的优势
1、保证消息不丢( 数据可靠性达10个9)
2、可严格保证消息有序
3、支持分布式事务消息
4、支持按时间做消息回溯(可精确到毫秒级)
5、支持按标识和内容查询消息,用于排查丢消息
6、支持消费失败重试
7、可支持更多的partition, 即更多的消费线程数

2.1.1.数据可靠性

  1. RocketMQ支持异步实时刷盘、同步刷盘、同步复制、异步复制
  2. Kafka使用异步刷盘方式,异步复制/同步复制

总结:

  1. RocketMQ支持kafka所不具备的“同步刷盘”功能,在单机可靠性上比kafka更高,不会因为操作系统Crash而导致数据丢失。
  2. kafka的同步replication理论上性能低于RocketMQ的replication,这是因为kafka的数据以partition为单位,这样一个kafka实例上可能多上百个partition。而一个RocketMQ实例上只有一个partition,RocketMQ可以充分利用IO组的commit机制,批量传输数据。同步replication与异步replication相比,同步replication性能上损耗约20%-30%。

概括:

  1. RocketMQ新增了同步刷盘机制,保证了可靠性
  2. 一个RocketMQ实例只有一个partition, 在replication时性能更好

2.1.2.性能对比

  1. kafka单机写入TPS月在 百万条/秒,消息大小为10个字节
  2. RocketMQ单机写入TPS单实例约 7万条/秒,若单机部署3个broker,可以跑到最高12万条/秒,消息大小为10个字节。

总结:
kafka的单机TPS能跑到每秒上百万,是因为Producer端将多个小消息合并,批量发向broker。

那么RocketMQ为什么没有这样做呢?

发送消息的Producer通常是用Java语言,缓存过多消息,GC是个很严重的问题。(问题:难道kafka用scala不需要GC?)
Producer发送消息到broker, 若消息发送出去后,未达到broker,就通知业务消息发送成功,若此时Broker宕机,则会导致消息丢失,从而导致业务出错。
Producer通常为分布式系统,且每台机器都是多线程发送,通常来说线上单Producer产生的消息数量不会过万。
消息合并功能完全可由上层业务来做。
一句话概括:RocketMQ写入性能上不如kafka, 主要因为kafka主要应用于日志场景,而RocketMQ应用于业务场景,为了保证消息必达牺牲了性能,且基于线上真实场景没有在RocketMQ层做消息合并,推荐在业务层自己做。

2.1.3.单机支持的队列数

  1. kafka单机若超过了64个partition/队列,CPU load会发生明显飙高,partition越多,CPU load越高,发消息的响应时间变长
  2. RocketMQ单机支持最高5万个队列,CPU load不会发生明显变化

队列多有什么好处呢?

  1. 单机可以创建更多个topic, 因为每个topic都是有一组队列组成
  2. 消费者的集群规模和队列数成正比,队列越多,消费类集群可以越大

概括

  • RocketMQ支持的队列数远高于kafka支持的partition数,这样RocketMQ可以支持更多的consumer集群

2.1.4.消息投递的实时性

  1. kafka采用短轮询的方式,实时性取决于轮询时间间隔,0.8以后版本支持长轮询
  2. RocketMQ使用长轮询,同Push实时性一致,消息投递的延迟通常在几毫秒内

概括

  • kafka与RocketMQ都支持长轮询,消息投递的延迟在几毫秒内

2.1.5.消费失败重试

  1. kafka不支持消费失败重试
  2. RocketMQ消费失败支持定时重试,每次重试间隔时间顺延

概括

  • RocketMQ支持消费失败重试功能,主要用于第一次调用不成功,后面可调用成功的场景
  • kafka不支持消费失败重试

2.1.6.严格保证消息有序

  1. kafka可保证同一个partition上的消息有序,但一旦broker宕机,就会产生消息乱序
  2. Rocket支持严格的消息顺序,一台broker宕机,发送消息会失败,但不会乱序。举例:MySQL的二进制日志分发需要保证严格的顺序

概括

  • kafka不保证消息有序,RocketMQ可保证严格的消息顺序,即使单台Broker宕机,仅会造成消息发送失败,但不会消息乱序

3. 如何保证消息队列的高可用?

4. 如何保证消息不被重复消费?幂等性

4.1.哪些情况下会导致重复消费

首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,没被提交offset的消息会再次消费一次

5.2.幂等性

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性

其实还是得结合业务来思考,我这里给几个思路:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

6. 如何保证消息可靠性传输?

11 无消息丢失配置怎么实现?

这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条

  • 不能多,就是前面说的「重复消费和幂等性问题」
  • 不能少,就是说这数据别搞丢了

6.1.消费者弄丢了数据

手动提交消息

唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

那么,只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。

但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

6.2.Kafka弄丢了数据

kafka配置

这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。

生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。

我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

所以此时一般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
  • unclean,leader,election,enable配置为false(不允许选择OSR中的从节点作为主节点)

6.3.生产者弄丢了数据

解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送API,也就是说不要使用producer.send(msg),而要使用producer.send(msg, callback),它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

7. 如何保证消息的顺序性?

要想实现消息有序,需要从Producer和Consumer两方面来考虑。

如果对Kafka不了解的话,可以先看这篇博客《一文快速了解Kafka》。

针对消息有序的业务需求,还分为全局有序和局部有序。

  • 全局有序:一个Topic下的所有消息都需要按照生产顺序消费。

  • 局部有序:一个Topic下的消息,只需要满足同一业务字段的要按照生产顺序消费。例如:Topic消息是订单的流水表,包含订单orderId,业务要求同一个orderId的消息需要按照生产顺序进行消费。

7.1.全局有序

        由于Kafka的一个Topic可以分为了多个Partition,Producer发送消息的时候,是分散在不同 Partition的。当Producer按顺序发消息给Broker,但进入Kafka之后,这些消息就不一定进到哪个Partition,会导致顺序是乱的。

因此,要满足全局有序:

1. 需要1个Topic只能对应1个Partition。

        

2. 而且对应的consumer也要使用单线程或者保证消费顺序的线程模型,否则消费端造成的消费乱序。

7.2.局部有序

生产者和消费者,指定Partition Key,保证局部有序

  1. 在发消息的时候指定Partition Key(Kafka内部实现会对其进行Hash计算,这样Partition Key相同的消息会放在同一个Partition)
  2. 每个消费者线程都指定Partition Key,这样该消费者只会消费这个分区的消息

消费者进程优化:在不增加partition数量的情况下想提高消费速度,每个消费者进程可以进行再次优化(实现以下功能)

  1. 每个消费者进程都维护N个内存队列,内存队列的规则是:通过hash函数,将具有相同key的msg,分发到相同的内存队列中
  2. 每个消费者进程都维护N个线程
  3. 线程和内存队列是1:1的,即单生产者单消费者模型

8. 消息积压/消息堆积

8.1.问题描述

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

你看这问法,其实本质针对的场景是:

  1. 可能你的消费端出了问题,不消费了
  2. 消费的速度 << 生产者的速度

        接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是这整个就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办?

消息堆积可能原因如下:

  1. 生产速度大于消费速度,这样可以适当增加partition,增加consumer数量,提升消费TPS
  2. consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS
  3. 确保consumer端没有因为异常而导致消费hang住
  4. 如果你使用的是消费者组,确保没有频繁地发生rebalance

8.2.消息积压解决方案

关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在出事故了,慌了。

几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。

分析速度:一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉
  • 增加 topic 的 partition 的个数,增加消费者组中 consumer线程的数量,使得 partition数 = consumer数
  • 消息worker线程,提高并发度

8.2.1.事前处理机制:避免

在应用上线之前,对大致的流量是有预估的,并且采用压测,探测「生产者产生消息的速率、消费者消费消息的速率」

如果发现消费者速度较慢,可以:①对消费者程序进行优化 ②消费者扩容

8.2.2.事中处理机制

场景描述:运营在调整活动时,导致了流量激增,导致生产者生产速率超过了我们预估的消费者速率,导致消息积压

解决方案:消费者扩容

8.2.3.事后处理机制

绝大多数的消费者程序都是IO密集型,一般是操作数据库、调用RPC

  1. 提高消费并行度:比如,并发调用下游服务
  2. 批量方式消费
  3. 跳过非重要消息
  4. 优化每条消息的消费过程

9.消费模型 push/pull

push

  1. 缺点:
    1. 没考虑消费者的消费能力
    2. 推送完消息后设置消费成功,但是消费者挂了,推送的消息会丢失
  2. 优点:及时性强

pull

  1. 缺点:及时性差
  2. 优点:
    1. 消费者可以根据自己的消费能力拉取消息
    2. 消费成功后,修改offset,消息不会丢失

1.Kafka体系架构

1.1.架构图

        一个典型的Kafka 体系架构包括若干Producer、若干Broker、若干Consumer,以及一个ZooKeeper 集群,如下图所示:

  1. Producer将消息发送到Broker ==> Broker负责将收到的消息存储到磁盘中 ==> Consumer负责从Broker订阅并消费消息
  2. ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的

1.2.主题topic

        kafka的消息,以主题为单位进行归类:生产者负责将消息发送到特定的主题(发送到Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

        主题topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被订阅该topic的消费者消费。

        但是有⼀个问题,如果说这个topic中的消息⾮常⾮常多,多到需要⼏T来存,因为消息是会被保存到log⽇志⽂件中的。为了解决这个⽂件过⼤的问题,kafka提出了Partition分区的概念

1.2.1.主题topic为什么不能过多

  1. 即使每个topic只有1个partition,当topic数量到达成千上万时,会导致总分区数很多,虽然每个分区的日志是顺序写,但是随着分区增多,从全局来看,磁盘读写退化为随机写,影响性能
  2. Kafka中Topic的元数据是在zookeeper中的,大量topic确实会造成性能瓶颈,不仅在磁盘读写上
  3. topic太多造成partition过多。partition是kafka的最小并行单元,每个partition都会在对应的broker上有日志文件。当topic过多,partition增加,日志文件数也随之增加,就需要允许打开更多的文件数。partition过多,在controller选举和controller重新选举partition leader的耗时会大大增加,造成kafka不可用的时间延长

1.3.分区partition

通过 partition 将⼀个 topic 中的消息分区来存储。这样的好处有多个:
  1. 分区存储,可以解决统一存储文件(log)过大的问题
  2. 提供读写吞吐量:读写可以同时在多个分区中进行

        主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。

        分区是物理上的概念,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。

        offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序

案例:如下图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。

1.4.分区日志

        每一条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。

        不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegment也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)。下图描绘了主题、分区、副本、Log 以及 LogSegment 之间的关系。

1.5.多副本机制

Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。备份的思想,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。

同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中,leader 副本负责处理读写请求,follower 副本只负责与leader 副本的消息同步。副本处于不同的broker 中,当leader 副本出现故障时,从follower 副本中重新选举新的leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当Kafka 集群中某个broker 失效时仍然能保证服务可用。

1.5.1.为什么Kafka不像MySQL和Redis那样允许follwer副本对外提供读服务呢?

使用场景

  1. Redis和MySQL都支持主从读写分离,这和它们的使用场景有关。对于那种读操作很多而写操作相对不频繁的负载类型而言,采用读写分离是非常不错的方案。我们可以添加很多follower横向扩展,提升读操作性能。
  2. 反观Kafka,它的主要场景还是在消息引擎而不是以数据存储的方式对外提供读服务,通常涉及频繁地生产消息和消费消息,这不属于典型的读多写少场景,因此读写分离方案在这个场景下并不太适合。

        Kafka副本机制使用的是异步消息拉取,因此存在leader和follower之间的不一致性。如果要采用读写分离,必然要处理副本lag引入的一致性问题,比如如何实现read-your-writes、如何保证单调读(monotonic reads)以及处理消息因果顺序颠倒的问题。相反地,如果不采用读写分离,所有客户端读写请求都只在Leader上处理也就没有这些问题了——当然最后全局消息顺序颠倒的问题在Kafka中依然存在,常见的解决办法是使用单分区,其他的方案还有version vector,但是目前Kafka没有提供。

        主写从读无非就是为了减轻leader节点的压力,将读请求的负载均衡到follower节点,如果Kafka的分区相对均匀地分散到各个broker上,同样可以达到负载均衡的效果,没必要刻意实现主写从读增加代码实现的复杂程度。

1.5.2.Kafka分区和副本的架构

如上图所示,Kafka 集群中有4个broker,某个主题中有3个分区,且副本因子(即副本个数)也为3。每个分区便有1个leader 副本和2个follower 副本。生产者和消费者只与leader 副本进行交互,而follower 副本只负责消息的同步,很多时候follower 副本中的消息相对leader 副本而言会有一定的滞后。

分区中的所有副本统称为AR(Assigned Replicas)。AR=ISR+OSR

1.5.3.Kafka对节点的存活定义有2个条件

  1. 节点必须和ZK保持会话
  2. 如果这个节点是某个分区的备份副本,它必须对分区主副本的写操作进行复制,并且复制的进度不能落后太多(follower的leo落后leader的leo超过阈值)

满足这两个条件,叫作“正在同步中”(in-sync)

ISR 与HW 和LEO 也有紧密的关系。

HW 是High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset 之前的消息。

LEO 是Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的offset

如上图所示,它代表一个日志文件,这个日志文件中有9条消息,第一条消息的offset(LogStartOffset)为0,最后一条消息的offset 为8,offset 为9的消息用虚线框表示,代表下一条待写入的消息。日志文件的HW 为6,表示消费者只能拉取到offset 在0至5之间的消息,而offset 为6的消息对消费者而言是不可见的。

上图中offset 为9的位置即为当前日志文件的LEO,LEO 的大小相当于当前日志分区中最后一条消息的offset 值加1。分区ISR 集合中的每个副本都会维护自身的LEO,而ISR 集合中最小的LEO 即为分区的HW,对消费者而言只能消费HW 之前的消息。

2.Kafka的生产者

2.1.为什么要有分区

        分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。

        不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量

        分区是实现负载均衡以及高吞吐量的关键,故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降。

2.2.分区策略

Kafka生产者的分区策略是决定生产者将消息发送到哪个分区的算法。Kafka提供默认的分区策略,同时它也支持自定义分区策略。常见的分区策略如下:

2.2.1.轮询策略

        也称Round-robin 策略,即顺序分配。比如一个主题下有3 个分区,那么第一条消息被发送到分区0,第二条被发送到分区1,第三条被发送到分区2,以此类推。当生产第4 条消息时又会重新开始,即将其分配到分区0。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

2.2.2.随机策略

        也称Randomness 策略,所谓随机就是我们随意地将消息放置到任意一个分区上。本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。

2.2.3.按消息键保序策略

        Kafka 允许为每条消息定义消息键,简称为Key。这个Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务ID 等;也可以用来表征消息元数据。一旦消息被定义了Key,那么你就可以保证同一个Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。Kafka的主题会有多个分区,分区作为并行任务的最小单位,为消息选择分区要根据消息是否含有键来判断。

3.Kafka的消费者

1. 消费者采用拉取模型带来的优点有哪些?

2. 为什么要约定“同一个分区只可被一个消费者处理”?

3. 消费者如何拉取数据

4. 消费者如何消费消息

5. 消费者提交分区偏移量

6. 消费者组再平衡操作

7. 消费者组是什么

8. 消费者组的协调者

3.1.消费模型

        消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)。

        基于推送模型的消息系统,由broker记录消费者的消费状态。broker在将消息推送到消费者后,标记这条消息为已消费,这种方式无法很好地保证消息的处理语义。比如,broker把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为己消费了,但实际上这条消息并没有被实际处理)。如果要保证消息的处理语义,broker发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要在消息代理中记录所有消息的消费状态,这种方式需要在客户端和服务端做一些复杂的状态一致性保证,比较复杂。

        因此,kafka采用拉取模型由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。这种由消费者控制偏移量的优点是消费者可以按照任意的顺序消费消息,比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前时刻开始消费。broker是无状态的,它不需要标记哪些消息被消费者处理过,也不需要保证一条消息只会被一个消费者处理。而且,不同的消费者可以按照自己最大的处理能力来拉取数据,即使有时候某个消费者的处理速度稍微落后,它也不会影响其他的消费者,并且在这个消费者恢复处理速度后,仍然可以追赶之前落后的数据。

3.2.消费者组Consumer Group

Consumer Group 是Kafka 提供的可扩展且具有容错性的消费者机制。

  1. 既然是一个组,那么组内必然可以有多个Consumer实例,它们共享一个公共的ID,这个ID 被称为Group ID
  2. 组内的所有消费者协调在一起来消费订阅主题的所有分区
  3. 当然,每个分区只能由同一个消费者组内的一个Consumer实例来消费
一个分区只能属于一个消费者线程,将分区分配给消费者有以下几种场景
  • 线程数量多于分区的数量
  • 线程数量少于分区的数量
  • 线程数量等于分区的数量

        那么一个 Group 下该有多少个 Consumer 实例呢?理想情况下,「Consumer 实例的数量」应该等于「Group 订阅主题的分区总数」

3.3.重平衡Rebalance

        Rebalance 就是让一个Consumer Group 下所有的Consumer实例就如何消费订阅主题的所有分区达成共识的过程。

        在 Rebalance 过程中,所有Consumer 实例共同参与,在协调者组件(Coordinator)的帮助下,完成订阅主题分区的分配。

3.3.1. 那么Consumer Group 何时进行Rebalance 呢?触发条件有3 个

  1. 组成员数发生变更。比如有新的Consumer 实例加入组或者离开组,抑或是有Consumer 实例崩溃被“踢出”组。
  2. 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有Group 开启Rebalance。
  3. 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile("t.*c")) 就表明该Group 订阅所有以字母t 开头、字母c 结尾的主题。在Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该Group 就会发生Rebalance。

3.3.2. Coordinator 会在什么情况下认为某个Consumer 实例已挂从而要被“踢出”组呢?

        当Consumer Group 完成 Rebalance 之后,每个Consumer 实例都会定期地向Coordinator 发送心跳请求,表明它还存活着。

        如果某个Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该Consumer 已经“死”了,从而将其从Group 中移除,然后开启新一轮Rebalance。Rebalance 发生时,Group 下所有的Consumer 实例都会协调在一起共同参与。

3.3.3. Rebalance 有些“缺点”?

        首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称 STW。在 STW期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

        其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

        最后,Rebalance 实在是太慢了。

所以,我们尽量避免一些非必要的 Rebalance。

  1. 第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,需要仔细地设置 session.timeout.ms(决定了 Consumer 存活性的时间间隔)和 heartbeat.interval.ms(控制发送心跳请求频率的参数)的值。
  2. 第二类非必要 Rebalance 是 Consumer 消费时间过长导致的,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮Rebalance。

3.3.4. 消费者rebalance流程

        消费者组的重平衡流程,它的作用是让组内所有的消费者实例就消费哪些主题分区达成一致。重平衡需要借助「 Kafka Broker 端的 Coordinator 组件」,在 Coordinator 的帮助下完成整个消费者组的分区重分配。

1. 触发与通知

  1. 重平衡过程通过消息者端的心跳线程通知到其他消费者实例
  2. Kafka Java 消费者需要定期地发送心跳请求到 Broker 端的协调者Coordinator,以表明它还存活着
  3. 消费者端的参数 heartbeat.interval.ms,从字面上看,它就是设置了心跳的间隔时间,但这个参数的真正作用是控制重平衡通知的频率

2. 消费者组状态机

重平衡一旦开启,Broker 端的协调者组件Coordinator就要开始忙了,主要涉及到控制消费者组的状态流转。

Kafka 设计了一套消费者组状态机(State Machine),帮助协调者完成整个重平衡流程。

a) Kafka 消费者组状态

  1. Empty:组内没有任何成员,但消费者组可能存在已提交的位移数据,而且这些位移尚未过期。
  2. Dead:组内没有任何成员,但组的元数据信息已经在协调者端被移除。协调者保存着当前向它注册过的所有组信息,所谓元数据就是类似于这些注册信息。
  3. PreparingRebalance:消费者组准备开启重平衡,此时所有成员都要重新请求加消费者组
  4. CompletingRebalance:消费者组下所有成员已经加入,各个成员正在等待分配方案。
  5. stable:消费者组的稳定状态。该状态表明重平衡已经完成,组内成员能够正常消费数据了。

b) 状态机的各个状态流转图如下:

        一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable状态完成重平衡。当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。如果消费者组停了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。

3.3.5. 消费者rebalance流程

重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。在消费者端,重平衡分为以下两个步骤:

1) 加入组:JoinGroup 请求

2) 等待领导者消费者分配方案:SyncGroup 请求

        当组内成员加入组时,他会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送JoinGroup 请求的成员自动成为领导者。注意区分这里的领导者和之前介绍的领导者副本,不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。

        选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应中,然后发给领导者,由领导者统一做出分配方案后,进入下一步:发送 SyncGroup 请求。在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只是请求体中并没有实际内容。这一步的目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式发给所有成员,这样组内成员就都知道自己该消费哪些分区了。

        以上是 JoinGroup 请求的处理过程。就像前面说的,JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。下面这张图是 SyncGroup 请求的处理流程。

SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。 

4.Kafka的服务端

1. 副本机制如何工作,故障发生时,怎么确保数据不会丢失?
2. 消息成功提交的定义是什么?
3. Kafka 的消息提交机制如何保证消费者看到的数据是一致的?(ISR)

4.1.Kafka的控制器

        控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache Zookeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,但在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。

1. 自动检测 新增 Broker、Broker 主动关闭及被动宕机。

这种自动检测是依赖于前面提到的 Watch功能和 ZooKeeper 临时节点组合实现的。

控制器组件会利用 watch 机制检查 Zookeeper 的/brokers/ids 节点下的子节点数量变更。当有新Broker 启动后,它会在/brokers 下创建专属的 znode 节点。一旦创建完毕,Zookeeper 会通过Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化。进而开启后续新增 Broker 作业。

侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在/brokers/ids 下创建一个临时的 znode。当 Broker 宕机或主机关闭后,该 Broker 与 Zookeeper的会话结束,这个 znode 会被自动删除。同理,Zookeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行善后。

2. 控制器保存的数据

控制器中保存的这些数据在 Zookeeper 中也保存了一份。每当控制器初始化时,它都会从Zookeeper 上读取对应的元数据并填充到自己的缓存中。这里面比较重要的数据有:

  • 所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等
  • 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等
  • 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表

4.2.Kafka的存储层

先问自己几个小问题:

  1. Kafka的主题与分区内部是如何存储的,有什么特点?
  2. 如何利用操作系统的优化技术来高效地持久化日志文件和加快数据传输效率?page cache和zero copy的技术

        我们来说说Kafka Broker 是如何持久化数据的。总的来说,Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机I/O 操作,用性能较好的顺序I/O 写操作,这也是实现Kafka 高吞吐量特性的一个重要手段。

        向Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

4.2.1.生产者写入数据过程

        图中所有的数据都写入文件系统的持久化日志文件,但不进行刷新数据的任何调用。数据会首先被传输到磁盘缓存,操作系统随后会将这些数据定期自动刷新到物理磁盘。

        消息系统内的消息从生产者保存到服务端,消费者再从服务端读取出来,数据的传输效率决定了生产者和消费者的性能。生产者如果每发送一条消息都直接通过网络发送到服务端,势必会造成过多的网络请求。如果我们能够将多条消息按照分区进行分组,并采用批量的方式一次发送一个消息集,并且对消息集进行压缩,就可以减少网络传输的带宽,进一步提高数据的传输效率。

4.2.2.消费者读取数据过程

        消费者要读取服务端的数据,需要将服务端的磁盘文件通过网络发送到消费者进程,而网络发送通常涉及不同的网络节点。如下图(左)所示,传统读取磁盘文件的数据在每次发送到网络时,都需要将页面缓存先保存到用户缓存,然后在读取消息时再将其复制到内核空间,具体步骤如下:

  1. 操作系统将数据从磁盘中读取文件到内核空间里的页面缓存
  2. 应用程序将数据从内核空间读入用户空间的缓冲区
  3. 应用程序将读到的数据写回内核空间并放入socket缓冲区
  4. 操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送出去

        结合Kafka 的消息有多个订阅者的使用场景,生产者发布的消息一般会被不同的消费者消费多次。如下图(右)所示,使用零拷贝技术(zero-copy )只需将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的使用者时,都可以重复使用同一个页面缓存),避免了重复的复制操作。这样,消息使用的速度基本上等同于网络连接的速度了。

Kafka 作为一个消息队列,涉及到磁盘 I/O 主要有两个操作:

  • Provider 向 Kakfa 发送消息,Kakfa 负责将消息以日志的方式持久化落盘(应用层数据 ===> 磁盘)
  • Consumer 向 Kakfa 进行拉取消息,Kafka 负责从磁盘中读取一批日志消息,然后再通过网卡发送(磁盘数据 ===> 网络)

Kakfa 服务端接收 Provider 的消息并持久化的场景下使用 mmap 机制[6],能够基于顺序磁盘 I/O 提供高效的持久化能力,使用的 Java 类为 java.nio.MappedByteBuffer。

Kakfa 服务端向 Consumer 发送消息的场景下使用 sendfile 机制[7],这种机制主要两个好处:

  • sendfile 避免了内核空间到用户空间的 CPU 全程负责的数据移动;
  • sendfile 基于 Page Cache 实现,因此如果有多个 Consumer 在同时消费一个主题的消息,那么由于消息一直在 page cache 中进行了缓存,因此只需一次磁盘 I/O,就可以服务于多个 Consumer;

使用 mmap 来对接收到的数据进行持久化,使用 sendfile 从持久化介质中读取数据然后对外发送是一对常用的组合。但是注意,你无法利用 sendfile 来持久化数据,利用 mmap 来实现 CPU 全程不参与数据搬运的数据拷贝。

5.kafka高性能的原因

  1. 日志:顺序追加写:Kafka的每条消息都是append的,不会从中间写入和删除消息,保证了磁盘的顺序访问
  2. page-cache:
    1. 写:向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上
    2. 读:读取操作可以直接在Page Cache上进行
  3. 零拷贝
    1. 网络数据持久化到磁盘(Producer到Broker)
    2. 磁盘文件通过网络发送(Broker到Consumer)
  4. 全异步:Kafka基本上是没有阻塞操作的,调用发送方法会立即返回,等待buffer满了以后交给轮询线程,发送和接收消息,复制数据也是都是通过NetworkClient封装的poll方式
  5. 批量操作:结合磁盘顺序写入,批量无疑是非常有必要(如果用的时候每发送一条消息都调用future.get等待,性能至少下降2个数量级)。写入的时候放到RecordAccumulator进行聚合,批量压缩,还有批量刷盘等...

6.kafka三种语义实现

最多一次(at most once):消息可能会丢失,但绝不会被重复发送
至少一次(at least once):消息不会丢失,但有可能被重复发送
精确一次(exactly once):消息不会丢失,也不会被重复发送

6.1.exactly once

Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性、事务

事务型 Producer 的功能

  1. 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
  2. 事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

        和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。

事务消息是由 producer、事务协调器、broker、组协调器、consumer 共同参与实现的

[1] producer
  • 为 producer 指定固定的 TransactionalId,可以穿越 producer 的多次会话(producer 重启/断线重连)中,持续标识 producer 的身份
  • 使用 epoch 标识 producer 的每一次"重生",防止同一 producer 存在多个会话
  • producer 遵从幂等消息的行为,并在发送的 BatchRecord 中增加事务 id 和 epoch
[2] 事务协调器(Transaction Coordinator)
  • 引入事务协调器,以两阶段提交2PC的方式,实现消息的事务提交。
  • 事务协调器使用一个特殊的 topic:transaction,来做事务提交日志。
  • 事务控制器通过 RPC 调用,协调 broker 和 consumer coordinator 实现事务的两阶段提交。
  • 每一个 broker 都会启动一个事务协调器,使用 hash(TransactionalId)确定 producer 对应的事务协调器,使得整个集群的负载均衡。
[3] broker
  • broker 处理事务协调器的 commit/abort 控制消息,把控制消息向正常消息一样写入 topic(和正常消息交织在一起,用来确认事务提交的日志偏移),并向前推进消息提交偏移 hw。
[4] 组协调器
  • 如果在事务过程中,提交了消费偏移,组协调器在 offset log 中写入事务消费偏移。当事务提交时,在 offset log 中写入事务 offset 确认消息。
[5] consumer
  • consumer 过滤未提交消息和事务控制消息,使这些消息对用户不可见。 有两种实现方式,
    • consumer 缓存方式:设置 isolation.level=read_uncommitted,此时 topic 的所有消息对 consumer 都可见。 consumer 缓存这些消息,直到收到事务控制消息。若事务 commit,则对外发布这些消息;若事务 abort,则丢弃这些消息
    • broker 过滤方式:设置 isolation.level=read_committed,此时 topic 中未提交的消息对 consumer 不可见,只有在事务结束后,消息才对 consumer 可见。broker 给 consumer 的 BatchRecord 消息中,会包含以列表,指明哪些是"abort"事务,consumer 丢弃 abort 事务的消息即可

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/517852
推荐阅读
相关标签
  

闽ICP备14008679号