赞
踩
笔记内容取自尚硅谷Kafka3.0教程,以及《深入理解Kafka核心设计与实践原理》
内容还会不断充实~
传统定义:
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域
最新定义:
Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
解耦
允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的的处理速度不一致的情况
灵活性&峰值处理能力
使系统能够应对突发的高峰流量
异步通信
允许用户把一个消息放入队列,但不立即处理它,在需要的时候再去处理
点对点模式
一对一,消费者主动拉取数据,收到消息后,队列中的消息清除。队列可以有多个消费者,但对于一个消息而言,只能被一个消费者消费。
发布/订阅模式
一对多,消费者消费消息后,队列不会清除消息。消息生产者将消息发布到topic中,同时有多个消费者消费该消息。和点对点模式不同,发布到topic的消息会被所有订阅者消费
发布/订阅模式中,又分为两种:
消费者主动拉取消息
Kafka就是属于这种类型
优势:速度取决于消费者,可以根据消费能力以适当的速率消费消息
弊端:需要轮询,查看队列中是否有消息,浪费资源
队列推送消息
类似于公众号推送
弊端:
推送消息的速度取决于队列,各个消费者处理消息的速度可能不一致,造成消费者崩掉(推送速度 >消费者处理速度)或者资源浪费(推送速度 < 消费者处理速度)
zk在这里的作用:
查看该机器上所有topic:
kafka-topics.sh --list --zookeeper ip:zk端口
创建topic:
kafka-topics.sh --create --topic topic名称 --zookeeper ip:zk端口 --partitions 分区数 --replication-factor 副本数
#注:副本数不能大于当前可用的Broker数,分区数可以大于当前可用的Broker数
#副本数 包括 leader 和 follower
删除topic:
kafka-topics.sh --delete --topic first --zookeeper ip:zk端口
#注:执行效果:
#Topic first is marked for deletion. 标记为删除
#Note: This will have no impact if delete.topic.enable is not set to true. 只有当delete.topic.enable设为true时才会真正删除
查看topic详情:
kafka-topics.sh --describe --zookeeper ip:zk端口 --topic topic名称
生产者发送消息
kafka-console-producer.sh --topic first --broker-list kafkaIP:kafka端口
消费者消费消息
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 #从当前开始消费
#或者:
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 --from-begining #从头开始消费
说明:
每个partition对应一个文件夹(文件夹名 = topic - partition),文件夹内有.log文件,该log文件存储的就是producer生产的数据。producer生产的数据会不断追加到该log文件末端,且每条数据都有自己的offset。
消费者都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
个人更倾向于《深入理解Kafka核心设计与实践原理》上的叙述:
分区同主题一样是一个逻辑的概念而没有物理上的存在
主题、分区、副本和Log(日志)的关系如下图,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切的说是Log层面才有实际物理上的存在
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 RecordAccumulator的大小可通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过发送到服务器端的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数默认60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。
注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一致多个ProducerRecord
如果生产者要向很多分区发送消息,则可将buffer.memory参数适当调大以增加整体吞吐量(buffer.memory大,RecordAccumulator则大,因RecordAccumulator中为每个分区都维护了一个双端队列,所以,RecordAccumulator大,每个分区分到的空间就大,可缓存的消息就多)。
在Kafka生产者客户端中,使用java.io.ByteBuffer实现消息内存的创建和释放,不过频繁的创建和释放比较耗费资源,故RecordAccumulator内部有一个BufferPool,主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数指定,默认16KB。
ProducerBatch的大小和batch.size的关系:
当一条消息(ProducerRecord)进入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数设定的大小,如果不超过,则以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
向RecordAccumulator中追加消息源码:
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { // check if we have an in-progress batch // 获取 or 创建队列 Deque<ProducerBatch> dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) throw new KafkaException("Producer closed while send in progress"); // 尝试向ProducerBatch中添加数据 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); if (appendResult != null) // 向已有的 ProducerBatch 中追加成功,直接返回即可 return appendResult; } // we don't have an in-progress record batch try to allocate a new batch if (abortOnNewBatch) { // Return a result that will cause another call to append. return new RecordAppendResult(null, false, false, true); } byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); // 计算批次大小设置值,和真实消息(序列化和压缩后)大小 取较大值 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock); // BufferPool分配内存(按照上一步计算得到的size来分配,如果size超过了16k,则这块区域不会被BufferPool复用)(BufferPool只对特定大小的ByteBuffer进行管理,这个特定大小由batch.size指定,默认16k) buffer = free.allocate(size, maxTimeToBlock); // Update the current time in case the buffer allocation blocked above. nowMs = time.milliseconds(); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new KafkaException("Producer closed while send in progress"); //再向 ProducerBatch 中追加试试 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... return appendResult; } //不再舔了,我自己新建一个ProducerBatch // 封装ByteBuffer MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); // 再封装 ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs); FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers, callback, nowMs)); // 将ProducerBatch添加到队列末尾 dq.addLast(batch); incomplete.add(batch); // Don't deallocate this buffer in the finally block as it's being used in the record batch buffer = null; return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false); } } finally { if (buffer != null) free.deallocate(buffer); appendsInProgress.decrementAndGet(); } }
Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node, List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息是属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个topic中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。
在转换成<Node, List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了。这里的Request是指Kafka的各种协议请求,对于消息发送而言就是具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId, Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。通过max.in.flight.requests.per.connection参数可限制每个连接(也就是客户端与每个Node之间的连接)最多缓存的请求数,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多请求了,除非有缓存的请求已经收到了响应(Response)。
如果响应成功,则会清理InFlightRequests中的请求,以及RecordAAccumulater中对应分区中的数据;
如果响应失败,则会进行重试,重试次数可通过retries参数进行设置,默认为int类型的最大值。
我们发送消息通常只指定了topic,那么生产者客户端如何知道要发往哪个broker节点呢?这就需要元数据
元数据是指kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR,ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
元数据的更新(二者满足其一即可触发更新):
当需要更新元数据时,会先挑选出latestLoadedNode(即InFlightRequests中还未确认的请求个数最小的Node),然后向这个Node发送MeteDataRequest请求来获取具体的元数据信息。这个更新操作由Sender线程发起,在创建完MeteDataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时类似。元数据由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步由synchronized和final关键字来保障。
参数名称 | 说明 |
---|---|
bootstrap.servers | 生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例如ip:port,ip1:port,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者可以从给定的 broker里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名 |
buffer.memory | RecordAccumulator 缓冲区总大小, 默认 32m。 |
batch.size | 缓冲区一批数据最大值, 默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加 |
linger.ms | 如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据, Leader 收到数据后应答。 -1(all):生产者发送过来的数据, Leader+和 isr 队列里面的所有节点收齐数据后应答。-1 和all 是等价的。 Kafka3.0中默认值是-1,之前版本默认是1。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数, 默认为 5,开启幂等性要保证该值是 1-5 的数字 |
retries | 当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。 默认是 int 最大值, 2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性, 默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。 默认是 none,也就是不压缩。支持压缩类型: none、 gzip、 snappy、 lz4 和 zstd。 |
分区原则
发送的数据要封装成一个ProduceRecord对象,该对象中有partition、key、value等属性
源码
//这个方法是默认的分区策略类里的,能进到这个方法,说明肯定没有指定partition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取当前topic的partition数目 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); //没有指定key if (keyBytes == null) { int nextValue = this.nextValue(topic); //当前topic存活的partition数 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { //从存活的partition里选取一个partition返回 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition();
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。