赞
踩
Apache Kafka® 是 一个分布式流处理平台. 这到底意味着什么呢?
我们知道流处理平台有以下三种特性:
注意:kafak是用scala语言编写的。
它可以用于两大类别的应用:
场景实例:
应用架构:
Kafka有四个核心的API:
1.主题(topic):一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。
2.分区(partition):每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。
日志中的 partition(分区)有以下几个用途。
第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。
第二,可以作为并行的单元集,日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。
每一个分区都会在已配置的服务器上进行备份,确保容错性.每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。
leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。
每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。注意服务器的leader是以分区为维度,即可能topic1-1是leader,topic1-2却是follower。
3.偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为 offset(偏移量),offset 为一个 long 型数字,它可以唯一标记一条消息。
例子:比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。
由于kafka 并没有提供其他额外的索引机制来存储 offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”。
Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:
每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。
分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。
在每一个消费者中唯一提交给broker的元数据是offset(偏移量)即消费在log中的位置.
偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。
例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"最新发送过来的消息开始消费。
这些细节说明Kafka 消费者是非常廉价的—消费者的增加和减少,对集群或者其他消费者没有多大的影响。比如,你可以使用命令行工具,对一些topic内容执行 tail操作,并不会影响已存在的消费者消费数据。
4.生产者
生产者可以将数据发布到所选择的topic(主题)中并且分配到topic的哪一个 partition(分区)中。
可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数(例如:记录中的key)来完成。
5.消费者
消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。
注意一个分区在同一个消费组下只能被一个消费者消费,但一个消费者可以消费多个分区
上图说明:由2个broker组成的kafka集群,总共有4个partition(P0-P3)。这个集群由2个Consumer Group, A有2个 consumer instances ,B有四个。
在Kafka中实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。
维护消费组中的消费关系由Kafka协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区;如果一个实例消失,拥有的分区将被分发到剩余的实例。
6.Broker
Kafka集群包含一个或多个服务器,每个服务器节点称为一个Broker,一个topic中设置partition的数量是broker的整数倍
7.Zookeeper
负责维护和协调Broker,负责Broker Controller的选举;这里要说明一下,Broker Controller是由Zookeeper选举出来的,而Partition Leader是由Broker Controller选举出来的。
6.其他名词解释
Segment:段,将partition进一步细分为若干个段,每个segment文件的大小相等
Consumer Group:消费组,一个partition中的消息只能被同一个消费组中的一个消费者进行消费;而一个消费组内的消费者只会消费一个或者几个特定的partition
Replication of partition:分区副本,副本是一个分区的备份,是为了防止消息丢失而创建的分区备份。分区副本不负责数据的读写,只做备份。
Partition Leader:每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责读写的partition,即所有读写操作只能发生于Leader分区上
Partition Follower:所有Follower都要从Leader上同步消息,Follower与Leader始终保持消息同步;partition leader与partition follower之间是主备关系而非主从关系
ISR:ISR:In-Sync Replicas,是指副本同步列表;AR:Assiged Replicas,指所有副本;OSR:Outof-Sync Replicas;AR=ISR+OSR
offset commit:当consumer从partition中消费了消息后,consumer会将其消费消息的offset提交给broker,表示当前partition已经消费到了该offset所标识的消息。
Rebalance:当消费者组中消费者数量发生变化或者topic中partition数量发生变化,partition的所有权会在消费者间转移,即partition会重新分配。
Broker Controller:Kafka集群的多个broker中,会有一个被选举为controller,负责管理集群中partition和副本replicas的状态。
__commit_offsets :消费者提交的offset被封装为了一种特殊的消息被写入到一个由系统创建的、名称为__commit_offstes的特殊topic的partition中,该topic默认包含50个partition,这些offset的默认有效期为一天
Group Coordinator:group coordinator是运行在broker上的线程,主要用于consumer group中各个成员的offset位移管理和Rebalance;Group Coordinator同时管理着当前broker的所有消费者组。当Consumer需要消费数据时,并不是直接中__comsumer_offset的partition中获取的,而是从当前broker的Coordinator的缓存中获取的。而缓存中的数据是在consumer消费完提交offset时,同时提交到coordinator的缓存以及__consumer_offset的partition中的。
见消息发布原理,消息消费原理
以下是 Pub-Sub 消息的逐步工作流程 -
在我看来,如果有多个consumer(消费者)可以消费一个分区的数据,那么在操作这个消费进度的时候就需要加锁,可能会对性能有一定的影响。
如果不加锁如果读到分区的同一个消息,可能会造成重复消费的问题。
关于分区和消费者关系。先摘录如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
一个分区只能被一个消费者消费,但一个消费者可以消费多个分区
最根本的原因就是:可以增加消息堆积量。kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。
offset存在Kafka的broker中topic为__consumer_offsets的分区里面的
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。
controller感知到分区leader所在的broker挂了(controller监听了很多zk节点可以感知到分区leader所在的broker存活),controller会从每 个parititon的 replicas 副本列表中取出第一个broker作为leader,当然这个broker需要也同时在ISR列表里。
Partititon Leader选举范围:
当leader挂了后,broker controller会从ISR中选一个follower成为新的leader,但是如果所有的follower都挂了怎么办?可以通过unclean.leader.election.enable的取值来设置leader的选举范围。
false:必须等待ISR列表中由副本活过来才进行新的选举,该策略可靠性有保证,但是可用性低
true:在ISR中没有副本存活的情况下,可以选择任何一个该topic的partition作为新的leader,该策略可用性高,但是可靠性没有保证,可能会引发大量的消息丢失。
消费者rebalance就是说如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如 果他又重启了,那么又会把一些分区重新交还给他。
如下情况可能会触发消费者rebalance
1. consumer所在服务重启或宕机了
2. 动态给topic增加了分区
3. 消费组订阅了更多的topic
注意:Rebalance的过程中,不能进行读写直到结束。
当有消费者加入消费组时,消费者、消费组及组协调器之间会经历以下几个阶段。
第一阶段:选择组协调器GroupCoordinator
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator(一般是partition的leader节点所在的broker),负责监控 这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。
consumer group中的每个consumer启动时会向kafka集群中的某个节点发送 FindCoordinatorRequest 请求来查找对 应的组协调器GroupCoordinator,并跟其建立网络连接。
组协调器选择方式:
通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker 就是这个consumer group的coordinator
其公式为:
hash(consumer group id) % __consumer_offsets主题的分区数
第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
然后GroupCoordinator 从一个consumer group中 选择第一个加入group的consumer作为leader(消费组协调器)。
leader consumer监控topic的变化,通知coordinator触发rebalance
leader consumer从GroupCoordinator 获取所有的consumer,发送syncGroup(分配信息)给到
coordinator
第三阶段( SYNC GROUP)
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。
解决:coordinator每次rebalance,会标记一个Generation给到consumer,每次rebalance该
Generation会+1,consumer提交offset时,coordinator会比对Generation,不一致则拒绝提交
主要有三种rebalance的策略:range、round-robin、sticky。
Kafka 提供了消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情 况为range分配策略。
假设一个主题有10个分区(0-9),现在有三个consumer消费:
range策略就是按照分区序号排序,假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消 费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。
比如分区0~3给一个consumer,分区4~6给一个consumer,分区7~9给一个consumer。
round-robin策略就是轮询分配,比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、 8给一个consumer
sticky策略就是在rebalance的时候,需要保证如下两个原则。
1)分区的分配要尽可能均匀 。
2)分区的分配尽可能与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标 。
这样可以最大程度维持原来的分区分配的策略。
比如对于第一种range情况的分配,如果第三个consumer挂了,那么重新用sticky策略分配的结果如下:
consumer1除了原有的0~3,会再分配一个7
consumer2除了原有的4~6,会再分配8和9
1、写入方式
producer采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘 效率比随机写内存要高,保障 kafka 吞吐率)。
2、消息路由算法
消息路由的目的是找到对应的patition
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
3、写入broker流程
工作流程
1、producer向broker集群提交连接请求,其所连接上的任意一个broker都会向其发送broker controller的通信URL,即broker controller主机配置文件中的listeners地址
2、当producer指定了要生产消息的topic后,其会向broker controller发送请求,请求当前topic的所有partition leader
3、broker controller在接收到请求后,会从zk服务器中查找指定topic的所有partition leader,并将partition leader所在的broker地址返回给producer
4、producer在接收到partition leader列表后,会根据路由策略找到对应的partition leader,将消息发送该partition leader
5、leader将消息写入log,并通知ISR中的followers
6、ISR中的follower从leader中同步消息后向leader发送ACK消息
7、leader收到了所有ISR中的follower的ACK后,增加HW,表示消费者可以消费到该位置;如果leader在等待的follower的ACK超时了,发现还有follower没有发送ACK,则会将这些没有发送ACK的follower从ISR中剔除,然后再增加HW
时序图
HW
HW俗称高水位,HighWatermark的缩写,表示consumer可以消费到的最高partition偏移量。
HW保证了Kafka broker集群中消息的一致性,确切地说,是在broker集群正常运转的情况下,保证了partition的follower和leader之间数据的一致性。
LEO
Log End Offset,日志最后消息的偏移量,消息是被写入到Kafka的日志文件中的,这是当前最后一个写入的消息在partition中的偏移量。
取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW
另外每个replica都有HW,leader和follower各自负责更新自己的HW的状 态。
对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW, 此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。
对于来自内部broker的读取请求,没有HW的限制。
工作流程
由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。
事实上,同步复制要求所有能工作的follower都复制完,这 条消息才会被commit,这种复制方式极大的影响了吞吐率。
而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写
入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。
而Kafka的这 种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
如果partition leader接收到了新的消息,ISR中其他follower正在同步过程中,还未同步完毕leader就挂了,此时就需要选举新的leader,若没有HW截断机制,将会导致partition中Leader与Follower数据不一致。
当原leader宕机恢复后,将其LEO回退到宕机时的HW,然后再与新的Leader进行数据同步,这种机制称为HW截断机制。
HW截断机制会引起数据丢失,但保证了分区broker和follower的数据一致性
acks=1的工作流程
kafka不基于内存,而是硬盘存储,因此消息堆积能力更强。
Kafka具有存储功能,默认保存数据时间为7天或者大小1G,也就是说kafka broker上的数据超7天或者1G,就会被清理掉。
故消息消费完后,Kafka并不会删除对应的消息,而是由内部过期删除机制进行消息的删除。当然开发者也可以自己去配置符合自己业务的消息保存时间。这些数据存放在broker服务器上,以log文件的形式存在。
kafka的安装目录下面的/conf/server.propertites文件中可设置存储时间限制。
Kafka 一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段(segment)存储,每个段的消息都存储在不一样的log文件里.
这种特性方便old segment file快速被删除,kafka规定了一个段位的 log 文件最大为 1G,做这个限制目的是为了方便把 log 文件加载到内存去操作:
1.日志写入机制
顺序写:利用磁盘的顺序访问速度可以接近内存,kafka的消息都是append操作,partition是有序的,节省了磁盘的寻道时间,同时通过批量操作、节省写入次数。partition物理上分为多个segment存储,方便删除
- # 部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件,
- # 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找具体消息
- 00000000000000000000.index
- # 消息存储文件,主要存offset和消息体
- 00000000000000000000.log
- # 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件,
- # 如果需要按照时间来定位消息的offset,会先在这个文件里查找
- 00000000000000000000.timeindex
-
- 00000000000005367851.index
- 00000000000005367851.log
- 00000000000005367851.timeindex
-
- 00000000000009936472.index
- 00000000000009936472.log
- 00000000000009936472.timeindex
这个 9936472 之类的数字,就是代表了这个日志段文件里包含的起始 Offset,也就说明这个分区里至少都写入了接近 1000 万条数据了。
Kafka Broker 有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是 1GB。
一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做 log rolling,正在被写入的那个日志段文件,叫做 active log segment。
2.日志读机制
根据Offset索引位置顺序读
consumer有两种消息方式,一种是存放在broker的日志目录中,另一种方式是存放在zookeeper中。
两种存放方式和你使用kafka-console-consumer命令使用的选项有关。
默认是存储在broker中topic为__consumer_offsets的分区里面的
同时需要注意的是存储还会通过消费组进行分组,不同的group存储的offset位置是相互独立,不同的。
传统:
零拷贝:
kafka不太依赖jvm,主要理由操作系统的pageCache,如果生产消费速率相当,则直接用pageCache交换数据,不需要经过磁盘IO。
在Kafka中,消息的删除是通过保留时间和存储空间限制来实现的。
首先,我们来看保留时间的设置。在创建主题时,可以为其设置一个保留时间(retention time)。保留时间决定了Kafka将保留消息的时间周期,超过此时间的消息将被删除。可以使用下面的代码设置主题的保留时间:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=3600000
上述命令将主题“my-topic”的保留时间设置为1小时。
另外,还可以通过设置存储空间限制来控制消息的删除。Kafka提供了两种存储空间限制方式:基于总大小(total size)和基于日志段大小(log segment size)。
对于基于总大小的存储空间限制,可以使用如下代码设置主题最大可占用的总大小:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config max.bytes=1073741824
上述命令将主题“my-topic”最大可占用1GB的空间。
对于基于日志段大小的存储空间限制,可以使用如下代码设置主题中单个日志段的最大大小:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config segment.bytes=1048576
上述命令将主题“my-topic”中单个日志段的最大大小设置为1MB。
当一个消息被删除时,Kafka会将其标记为删除状态,即“删除记录”(tombstone record)。应用程序可以通过订阅主题中的“删除记录”来获知消息被删除的事实。当然,应用程序也可以选择主动删除“删除记录”,以免占用过多空间。
完整示例代码如下:
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer producer = new KafkaProducer<>(props);
- // 发送消息
- producer.send(new ProducerRecord("my-topic", "key", "value"));
- // 删除消息
- producer.send(new ProducerRecord("my-topic", "key", null));
- producer.close();
在该示例中,我们通过向主题“my-topic”发送一条消息来演示如何使用Kafka。然后,我们通过发送一条键为“key”的空消息来删除该消息。注意,我们将消息的值设置为null,这样Kafka就会将其标记为“删除记录”。最后,我们关闭了生产者。
工作流程
1、consumer向broker集群提交连接请求,其所连接上的任意broker都会向其发送broker controller的通信URL,即broker controller主机配置文件中的listeners。
2、当consumer指定了要消费的topic后,其会向broker controller发送poll请求
3、broker controller会为consumer分配一个或这几个partition leader,并将该partition的当前offset发送给consumer
4、consumer会按照broker controller分配的partition对其中的消息进行消费
5、当consumer消费完该条数据后,消费者会向broker发送一个消息已被消费的反馈,即该消息的offset。
6、当broker接收到consumer的offset后,会将其更新到__consumer_offset中
7、以上过程一直重复,直到消费者停止请求消息;消费者可以重置offset,从而可以灵活的消费存储在broker上的消息
发出消息持久化机制参数
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
(2)acks=1: 只需等待leader成功将数据写入本地log,但是不需要等待follower是否成功写入。就可以继续发送下一 条消息。kafka的默认配置就是这种
这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,才可以继续发送下一条消息。这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
props.put(ProducerConfig.ACKS_CONFIG, "1");
官网截图如下:
生产端收到broker的ack返回可通过回调的方式,代码如下:
- /**
- * send logs
- *
- * @param logs
- */
- public void send(List<KafkaQuestionRecordDetail> logs) {
- logs.stream().peek(log -> log.setUpdateDate(new Date())).forEach(log -> {
- ListenableFuture<SendResult<Long, String>> commonFuture = kafkaTemplate
- .send(KafkaConstants.TOPIC_QUESTION_RECORD_DETAIL, log.getCustomerId(), JSON.toJSONString(log));
-
- commonFuture.addCallback(successCallback(), failureCallback(log));
- });
- }
-
- /**
- * 投递消息成功回调
- *
- * @return
- */
- private SuccessCallback<SendResult<Long, String>> successCallback() {
- return result -> LOGGER.info("投递消息成功");
- }
-
- /**
- * 投递消息失败回调
- *
- * @param log
- * @return
- */
- private FailureCallback failureCallback(KafkaQuestionRecordDetail log) {
- return ex -> {
- Marker marker = MarkerFactory.getMarker(MarkerConstants.KAFKA_SEND_ERROR);
- LOGGER.error(marker, JSON.toJSONString(log, SerializerFeature.WriteDateUseDateFormat));
- };
- }
可以配置需要多少个副本,多少个副本同步完成再返回ACK确认。
可见实际上依赖于发送端机制
kafka消费信息lag
lag = logSize - offset 即计算还未消费的消息数,可以用来判断是否有消息堆积
每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是 consumerGroupId+topic+分区号,value就是当前offset的值,
kafka会定期清理topic里的消息,最后就保留最新的那条数据。
因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过 offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发。
消费端自动提交和手动提交
offset提交的方式有两种,自动提交和手动提交。
conf/server.properties配置文件的参数设置:
enable.auto.commit = true (那么这个是自动提交,false为手动)
默认为自动提交,可参考源码:org.springframework.kafka.core.DefaultKafkaConsumerFactory#isAutoCommit
自动提交
自动提交偏移量:
Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的。
当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms
向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:
partation=hash(group_id)%50来计算的。
如:group_id=test_group_1,则partation=hash(“test_group_1”)%50=28
自动提交使用场景
1.自动提交能较大提升消费速度,所以对不是很重要的消息,如只是纪录一次当天是否有没有学习的消息,如果已经消费了这条消息后面不再消费,使用自动提交可以提供吞吐量。
2.自动提交可能造成消息丢失(消费端不会进行消息重试),所以尤其是在需要事物控制的时候,最好不使用自动提交,因为一旦消费端消费失败,这条消息就不会再进行消费了。如拉取数据之后进行写入mysql这种,所以这时我们就需要进行手动提交kafka的offset下标。
手动提交
对于手动提交offset主要有3种方式:
1.同步提交
同步模式下提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束。
同时同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出响应之前,
会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。
只有当前批次的消息提交完成时才会触发poll来获取下一轮的消息。
注意这种情况可能导致消息堆积
2.异步提交
异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试。
并且可以配合回调函数在broker做出响应的时候记录错误信息,以做后面的补偿重试。
对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。
注意这种情况可能导致重复消费
3.异步+同步 组合的方式提交
针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且失败后再同步提交的方式。
这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。通过finally在最后不管是否异常都会触发consumer.commit()来同步补救一次,确保偏移量不会丢失。但这样如果还是失败就会阻塞造成消息堆积。
所以我个人认为还是选择异步提交,如果回调结果提交失败,则进行告警并人工介入。
参考:
kafka原理系列之(一)消息存储和offset提交机制 https://blog.csdn.net/sheep8521/article/details/89491372
提交ack时,消费者宕机了。刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理
这种场景下自动提交和手动提交都会造成重复消费
当然使用Kafka的管理控制页面将更加清晰可见。
1./brokers/ids:临时节点,保存所有broker节点信息,存储broker的物理地址、版本信息、启动时间等,节点名称为brokerID,broker定时发送心跳到zk,如果断开则该brokerID会被删除
2./brokers/topics:临时节点,节点保存broker节点下所有的topic信息,每一个topic节点下包含一个固定的partitions节点,partitions的子节点就是topic的分区,每个分区下保存一个state节点、state节点保存着当前leader分区和ISR的brokerID,state节点由leader创建,若leader宕机该节点会被删除,直到有新的leader选举产生、重新生成state节点
3./consumers/[group_id]/owners/[topic]/[broker_id-partition_id]:维护消费者和分区的注册关系
4./consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]:分区消息的消费进度Offset
zk的主要作用是对broker服务的注册与发现,并保持心态连接,以及维护消费进度Offset
Kafka默认使用的是pull模式。
由消费端自己主动的向broker拉起消息消费。应用中一般需要配置拉起的数据量和间隔时间
优点
缺点
如果kafka没有数据,会导致consumer空循环,消耗资源
解决方案
通过参数设置,consumer拉取数据为空或者没有达到一定数量时进行阻塞,当broker有数据时再进行唤醒阻塞的线程
由broker主动的向消费端推送消息消费。
优点
不会导致consumer循环等待
缺点
速率固定、忽略了consumer的消费能力,可能导致拒绝服务或者网络拥塞等情况,对消费端造成较大的压力
high-level Kafka给予以下保证:
log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。
kafka集群支持配置一个partition备份的数量。针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。
leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,followers是不承接发送端和消费端的读写的,只是负责备份和leader失效后的选举。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。
我个人的理解有如下原因:
1.文件大小的限制,如果一个broker存储了所有的消息,那么文件的大小将非常大,且读取的性能很差,分摊到不同的broker,可以减少文件存储的压力和提升读取的性能。
2.一个broker挂了,影响将减少。因为那个broker的分区的leader挂了,只会影响该broker的分区leader,其他broker的leader不会有影响。
如果不分区,并且使用zk那用的主从架构,那么一个broker挂了,只有等leader选举完成之后才能正常使用。
可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量。
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个 临界值,当超过 这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。 当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
需要保证消息顺序的需求。订单号路由到不同的partition
,同一个订单号的消息,每次到发到同一个partition
同时一个partition同一个时刻在一个consumer group中只有对应的一个consumer instance在消费,从而保证顺序。但一个消费者实例可以订阅多个分区。
Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。
如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的 consumer instance数量也设置为1。当然这是不推荐的,最好就是跟进key值设置分区,如在线教育公司,做题的分发到一个partition中,看课的分发到另一个partition中,这样就能保证某一个业务场景,消费的业务如做题是有序的。
Kafka和RabbitMQ一样,本身是没有延迟队列功能的。
RocketMQ内部有实现延迟队列的功能,所以如果真的要用延迟队列的功能,建议用RocketMQ或者RabbitMQ。
实现思路
主要两个步骤
delay-minutes-1
delay-minutes-1
中的消息,将满足条件的消息发送到真正的目标主题里。发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,...topic_2h,这个一 般不能支持任意时间段的延时)。
发送到内部主题(delaytopic*)中的消息会被一个独立的 DelayService 进程消费,这个 DelayService 进程和 Kafka broker 进程以一对一的配比进行同机部署(参考下图),以保证服务的可用性。
针对不同延时级别的主题,在 DelayService 的内部都会有单独的线程来进行消息的拉取,以及单独的 DelayQueue(这里用的是 JUC 中 DelayQueue)进行消息的暂存。与此同时,在 DelayService 内部还会有专门的消息发送线程来获取 DelayQueue 的消息并转发到真实的主题中。从消费、暂存再到转发,线程之间都是一一对应的关系。如下图所示,DelayService 的设计应当尽量保持简单,避免锁机制产生的隐患。
为了保障内部 DelayQueue 不会因为未处理的消息过多而导致内存的占用过大,DelayService 会对主题中的每个分区进行计数,当达到一定的阈值之后,就会暂停拉取该分区中的消息。
因为一个主题中一般不止一个分区,分区之间的消息并不会按照投递时间进行排序,DelayQueue的作用是将消息按照再次投递时间进行有序排序,这样下游的消息发送线程就能够按照先后顺序获取最先满足投递条件的消息。
1.Kafka 实现延迟队列、死信队列、重试队列https://blog.csdn.net/zhengzhaoyang122/article/details/116075669
死信队列 (DLQ)是消息系统或数据流平台内的一种服务实现,用于存储未成功处理的消息。系统不是被动地转储消息,而是将其移动到死信队列。
Kafka 架构不支持 broker中的DLQ,所以要实现死信队列需要自己去实现
以下为自定义实现思路:
死信队列实现的源代码包含一个 try-catch 块来处理预期或意外异常。如果没有发生错误,则处理该消息。如果发生任何异常,请将消息发送到专用的 DLQ Kafka 主题。
失败原因应添加到 Kafka 消息的标头中。不应更改键和值,以便将来对历史事件进行重新处理和故障分析。
kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置, 修改bin/kafka-start-server.sh中的jvm设置
export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn12G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50"
这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长 时间
max.poll.interval.ms
默认为5分钟
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 ,触发rebalance 。
如果你的消费者节点总是在重启完不久就不消费了,可以考虑检查改配置项或者优化你的消费者的消费速度等等。
max.poll.records
参考源码解释
常见配置参考:
- /**
- * kafka配置 default
- *
- * @return
- */
- private Map<String, Object> consumerProperties() {
- Map<String, Object> props = new HashMap<>(16);
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMS);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- return props;
- }
-
- /**
- * 消费者工厂
- *
- * @return
- */
- @Bean("autocommitConsumerFactory")
- @Primary
- public DefaultKafkaConsumerFactory autocommitConsumerFactory() {
- Map<String, Object> props = consumerProperties();
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
- return new DefaultKafkaConsumerFactory(props);
- }
-
- /**
- * kafka监听容器
- * kafkaListenerContainerFactory
- *
- * @return
- */
- @Bean(name = "containerFactory")
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, String>> containerFactory() {
- ConcurrentKafkaListenerContainerFactory<Long, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setConcurrency(3);
- factory.getContainerProperties().setPollTimeout(3000);
- factory.setBatchListener(true);
- return factory;
- }
-
-
-
-
-
- /**
- * producerFactory
- *
- * @return
- */
- @Bean
- public ProducerFactory<Long, String> producerFactory() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return new DefaultKafkaProducerFactory<>(props);
- }
-
- /**
- * kafkaTemplate
- *
- * @return
- */
- @Bean
- public KafkaTemplate<Long, String> kafkaTemplate(ProducerFactory<Long, String> producerFactory) {
- return new KafkaTemplate<>(producerFactory);
- }
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消 费的消息重新消费,可以指定从多久之前的消息回溯消费。
这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移 的消息开始消费。
消费端解决Missing required argument "[zookeeper]"问题
使用命令:
kafka-console-consumer.bat --new-consumer --bootstrap-server localhost:9092 --topic haha --from-beginning
效果如下:
Kafka在window上安装部署 - 将军上座 - 博客园
安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html
1.Kafka中文官方文档:https://kafka.apachecn.org/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。