赞
踩
Kafka处理leaderAndIsr请求中提到follower会不停地从leader那里复制数据,这次介绍的是如何复制。
先看ReplicaFetcherThread的运行流程:
每个topicPartition都先从leader那里,获取对应自己当前leaderEpoch的offset。如果获取不到就使用自身的highWatermark,如果获取到就取leaderEpochOffset和自己logEndOffset的最小值作为truncatePoint(截断点),然后截断大于这个truncatePoint的记录,再往后的记录复制就从这个truncatePoint开始获取。
maybeTruncate的流程
在复制partition数据的时候,如果出现了OFFSET_OUT_OF_RANGE的错误,就需要重新计算有效的offset。但是这个过程有可能连续出现多次,因为如果获取的是leader的earliest offset,在下次复制数据时,leader的startOffset可能已经因为再一次的定时清理而不存在,因此有需要重新计算有效的offset。
handleOffsetOutOfRange的流程
Log的截断过程指的是对于指定的targetOffset,截断其后的记录,一共分为3种情况:
1. logEndOffset < targetOffset,没记录可截断
2. targetOffset < log的startOffset(log的第一个segment的baseOffset),意味着整个log都要删掉,走的是truncateFullyAndStartAt流程
3. targetOffset落在log中(处于log第一个segment的baseOffset 和 log最后一个segment的endOffset的中间),则需要把baseOffset大于targetOffset的segment都删除,然后再检查targetOffset是否还落在最后一个segment中,如果是则还需要对最后一个segment进行相应截断。
Log的truncateTo流程
Log的truncateFullyAndStartAt流程
Partition的appendRecordsToFollower流程
追加记录时,需要验证记录,可能需要分派offset(对于非复制的请求),如果记录的版本和当前节点支持的最新版本不一致,还需要进行格式转换。
另外,需要检查当前激活的segment(log的最后一个segment)是否能容纳这批记录,取决于以下几个条件:
1. 距离上次追加记录的时间超过了配置的时间
2. 激活的segment的size+记录的size超过了segment配置的最大size
3. segment的index,timeindex容量满了(默认为10M,追加过程中需要插入index和timeindex)
4. 相对offset溢出(记录的最大offset - segment的baseOffset),超过了Integer.MAX_VALUE(index中的offset是相对offset,其类型为int)
Log的append流程
LogValidator的validateMessageAndAssignOffsets流程
MemoryRecordsBuilder用于给记录重新分派offset,并兼容遗留格式的记录,内含了一块ByteBuffer,其大小为记录的size的大概值(对于没有压缩的记录是准确值,对于压缩了的是大概值)
MemoryRecordsBuilder的appendWithOffset流程
计算所有记录中的maxTimestamp和offsetOfMaxTimestamp
MemoryRecordsBuilder的get info流程
roll会生成新的segment,并对log的其他segment进行flush。
Log的roll segment流程
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。