当前位置:   article > 正文

深度解析kafka生产者发送消息(业务线程将消息追加到缓冲区)_kafka生产者发送缓冲区

kafka生产者发送缓冲区

原创不易,转载请注明出处


前言

在《深度解析kafka生产者发送消息(原理篇)》一文中,我们主要介绍了kafka消息生产者端业务线程与sender线程主要工作流程,业务线程是将消息追加到一个内存缓冲区中,而sender线程就是不断从内存缓存去中获取消息,然后进行消息的请求封装,消息发送,接收响应,创建连接,断开连接等等。本文主要是从分析一下业务线程将消息追加到内存缓冲区这块的源码

1.回顾业务线程都经历哪些核心步骤

  1. 当我们业务线程调用KafkaProducer的send方法的时候,会先经过一堆拦截器,会过滤你发送的消息,这个拦截器默认是没有的,这块可以自己实现配置一下就可以了。
  2. 接着就是看看元数据有没有,如果没有的话就等着获取下元数据,这个获取元数据其实sender线程在运行时候会向broker获取下元数据。
  3. 接着就是对消息里面key ,value 进行序列化,使用我们配置的序列化器进行。
  4. 接着就是使用partitioner进行 partition路由,其实就是为你这个消息选择一个partition,如果你在消息中指定了partition ,就会使用你指定的那个,如果没有指定的话,就会使用你设置的那个partitioner进行路由,如果没有配置这个partitioner 就使用默认的那个,默认的那个partitioner 在路由的时候会判断你这个key存不存在,如果key不存在,直接轮询的方式选择partition,如果key存在的话,就hash key然后模partition的数量。
  5. 接着就是往RecordAccumulator中追加消息,首先会找到你这个消息topic-partition 对应的一个队列,如果有的话,并且没有满,能够放得下本条消息,就会将消息追加到batch中, 如果没有的话就创建,接着会从这个队列队尾取出一个batch,如果batch 满了或者是不存在就新向bufferpool中申请块内存,创建一个batch,然后将消息追加到batch中,将batch添加到队列的队尾。追加成功之后,业务线程就可以返回去继续处理其他消息的发送了。
    在这里插入图片描述

2.源码解析

我们就以KafkaProducer的send 方法看看消息是怎样一步一步被添加到内存缓冲区的。
在这里插入图片描述
上面那一段代码就是拦截器过滤消息,我们这里就不深入去看了,你可以配置"interceptor.classes"这个参数来指定拦截器过滤消息。
我们接下来看看这个doSend方法
在这里插入图片描述
首先是等待获取这个topic的元数据,如果存在的话直接往下走了,如果不存在的话,就会等着sender线程发送请求去broker 上面获取,默认最大等待时间是1分钟,也就是这个maxBlockTimeMs 这个参数。
接着就是计算下剩余的等待时间,使用你配置的序列化器对key ,value 进行序列化
我们接着doSend方法往下看。
在这里插入图片描述
接着就是调用partition方法进行partition路由
在这里插入图片描述
可以看到,如果你指定了这个partition,就会是判断一下这个元数据里面有没有你这个partition,如果有的话就使用你指定的这个,如果没有就抛出异常,如果你没有指定这个消息的partition 就使用partitioner 进行路由,默认是DefaultPartitioner,我们来看下这个DefaultPartitioner 的partition方法实现
在这里插入图片描述
可以看到,会判断下你这个key是不是null,如果是null的话,就会使用轮询的方式从可用partition集合中选择partition,可用partition集合中美元素,就使用轮询方式从不可用的partition集合中选择一个,如果你这个key不是null,就使用murmur2算法对key进行hash 然后% partition数量。
接着上面doSend方法往下看,下面这段代码其实就是计算下消息的长度size_length + offset_length + crc + magic + attribute + timestamp + key_size + key + value_length + value 4 + 8 + 4 + 1 + 1 + 8 + 4 + key + 4 + value
判断下消息符不符合大小,默认是不能超过1m(max.request.size这个参数配置),也不能超过内存缓冲区的大小(这个参数配置buffer.memory )。
接着doSend方法往下看。
在这里插入图片描述
这块主要是使用拦截器包装一下你的回调函数,接着就是调用RecordAccumulator内存缓冲区的append方法将消息追加到内存缓冲区的batch中。
我们来看下RecordAccumulator的append方法。
在这里插入图片描述
首先会根据消息topic-partition 获取一个Deque队列,如果不存在就创建一个然后放到topic-partition 与Deque 对应关系的map中。
接着就是根据deque加锁,因为这里是支持多业务线程并发追加的。
调用tryAppend方法进行尝试追加
在这里插入图片描述
先从deque队列中获取队列尾部的一个元素batch,如果batch是等于null的也就是没有,说明之前没有创建过或者已经被sender线程发送光了,这个时候就会返回null,如果batch存在的话,就会调用batch的tryAppend方法进行追加下
在这里插入图片描述
先会判断下它这个batch还能不能放开这个消息,如果是不能放开的话,就返回null,如果是能放开就进行追加,其实再往下就是追加写到bytebuffer中了,这里我们就不再往下看了,接着封装一个future,将回调函数添加到thunks中。追加成功的话,一路返回就可以了,这个业务线程这次消息发送就算ok了。但是batch不存在,batch里面内存满了放不开这个消息的时候,就会接着执行RecordAccumulator # append方法的后半部分,那就申请内存,然后新创建一个batch进行追加,将batch 添加到队列的队尾。我们看看RecordAccumulator 的append方法第一次追加失败是怎样处理的。
在这里插入图片描述
先会计算出来batch的大小,如果你消息的大小大于batch默认大小(默认是16k,可以通过batch.size 配置),接着找bufferpool申请一块内存,再锁定这个deque,重新尝试追加,这里为什么又重新尝试追加呢?如果是多线程并发追加的话,多个线程同时往这个deque里面追加消息,如果一开始batch满了,多个线程没有追加成功,就会申请内存,重新获取锁,如果某个线程先获取了处理机的执行权限,就会先获取到锁,后面的线程在锁外面等着,这个线程先尝试追加一次,这次肯定失败,因为还是往之前那个batch里面追加,接着就会走后面的逻辑,创建batch,追加写消息,将batch添加到队列尾部,释放锁,接着锁外面等着的线程获得处理机执行权限后,获取锁,尝试追加下,如果能追加成功更好,说明它赶上了上个线程创建的那个batch没有满,如果追加上了,就会释放上面申请的那块内存,如果还是追加不上,就会创建新的batch,进行追加,将batch添加到队列的队尾,这次肯定是可以追加成功的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/707514
推荐阅读
相关标签
  

闽ICP备14008679号