赞
踩
原创不易,转载请注明出处
在《深度解析kafka生产者发送消息(原理篇)》一文中,我们主要介绍了kafka消息生产者端业务线程与sender线程主要工作流程,业务线程是将消息追加到一个内存缓冲区中,而sender线程就是不断从内存缓存去中获取消息,然后进行消息的请求封装,消息发送,接收响应,创建连接,断开连接等等。本文主要是从分析一下业务线程将消息追加到内存缓冲区这块的源码
我们就以KafkaProducer的send 方法看看消息是怎样一步一步被添加到内存缓冲区的。
上面那一段代码就是拦截器过滤消息,我们这里就不深入去看了,你可以配置"interceptor.classes
"这个参数来指定拦截器过滤消息。
我们接下来看看这个doSend方法
首先是等待获取这个topic的元数据,如果存在的话直接往下走了,如果不存在的话,就会等着sender线程发送请求去broker 上面获取,默认最大等待时间是1分钟,也就是这个maxBlockTimeMs 这个参数。
接着就是计算下剩余的等待时间,使用你配置的序列化器对key ,value 进行序列化
我们接着doSend方法往下看。
接着就是调用partition方法进行partition路由
可以看到,如果你指定了这个partition,就会是判断一下这个元数据里面有没有你这个partition,如果有的话就使用你指定的这个,如果没有就抛出异常,如果你没有指定这个消息的partition 就使用partitioner 进行路由,默认是DefaultPartitioner,我们来看下这个DefaultPartitioner 的partition方法实现
可以看到,会判断下你这个key是不是null,如果是null的话,就会使用轮询的方式从可用partition集合中选择partition,可用partition集合中美元素,就使用轮询方式从不可用的partition集合中选择一个,如果你这个key不是null,就使用murmur2算法对key进行hash 然后% partition数量。
接着上面doSend方法往下看,下面这段代码其实就是计算下消息的长度size_length + offset_length + crc + magic + attribute + timestamp + key_size + key + value_length + value
4 + 8 + 4 + 1 + 1 + 8 + 4 + key + 4 + value
判断下消息符不符合大小,默认是不能超过1m(max.request.size这个参数配置),也不能超过内存缓冲区的大小(这个参数配置buffer.memory )。
接着doSend方法往下看。
这块主要是使用拦截器包装一下你的回调函数,接着就是调用RecordAccumulator内存缓冲区的append方法将消息追加到内存缓冲区的batch中。
我们来看下RecordAccumulator的append方法。
首先会根据消息topic-partition 获取一个Deque队列,如果不存在就创建一个然后放到topic-partition 与Deque 对应关系的map中。
接着就是根据deque加锁,因为这里是支持多业务线程并发追加的。
调用tryAppend方法进行尝试追加
先从deque队列中获取队列尾部的一个元素batch,如果batch是等于null的也就是没有,说明之前没有创建过或者已经被sender线程发送光了,这个时候就会返回null,如果batch存在的话,就会调用batch的tryAppend方法进行追加下
先会判断下它这个batch还能不能放开这个消息,如果是不能放开的话,就返回null,如果是能放开就进行追加,其实再往下就是追加写到bytebuffer中了,这里我们就不再往下看了,接着封装一个future,将回调函数添加到thunks中。追加成功的话,一路返回就可以了,这个业务线程这次消息发送就算ok了。但是batch不存在,batch里面内存满了放不开这个消息的时候,就会接着执行RecordAccumulator # append方法的后半部分,那就申请内存,然后新创建一个batch进行追加,将batch 添加到队列的队尾。我们看看RecordAccumulator 的append方法第一次追加失败是怎样处理的。
先会计算出来batch的大小,如果你消息的大小大于batch默认大小(默认是16k,可以通过batch.size 配置),接着找bufferpool申请一块内存,再锁定这个deque,重新尝试追加,这里为什么又重新尝试追加呢?如果是多线程并发追加的话,多个线程同时往这个deque里面追加消息,如果一开始batch满了,多个线程没有追加成功,就会申请内存,重新获取锁,如果某个线程先获取了处理机的执行权限,就会先获取到锁,后面的线程在锁外面等着,这个线程先尝试追加一次,这次肯定失败,因为还是往之前那个batch里面追加,接着就会走后面的逻辑,创建batch,追加写消息,将batch添加到队列尾部,释放锁,接着锁外面等着的线程获得处理机执行权限后,获取锁,尝试追加下,如果能追加成功更好,说明它赶上了上个线程创建的那个batch没有满,如果追加上了,就会释放上面申请的那块内存,如果还是追加不上,就会创建新的batch,进行追加,将batch添加到队列的队尾,这次肯定是可以追加成功的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。