赞
踩
本文主要用来整理kafka生产者相关的一些知识点,主要参考自《深入理解kafka核心设计与实践原理》--朱忠华
kafka先后有两个大版本的生产者客户端:第一个是kafka开源之初使用Scala语言编写的客户端,我们称Scala生产客户端;第二个就是0.9.x版本开始推出的使用java语言编写的客户端,我们称java版生产客户端,它弥补了很多旧版Scala生产客户端的缺陷;(现在的旧版基本已经淘汰)。构建一个完整的生产者客户端应该包含下面几部分信息:
1、配置生产者客户端参数及创建相应的生产者实例
KafkaProducer<String,String> producer=new KafkaProducer<>(props,new StringSerializer(),new StringSerializer());//线程安全
创建生产者实例需要配置相应的参数,其中有3个必填参数
kafka还有众多其他的参数,我们在构造的KafkaProducer的时候可以根据实际情况去灵活调配;
2、构建待发的消息
一个待发的消息对象不只是包含业务相关的消息体,它还包含了很多其他的属性
- public class ProducerRecord<K,V>{
- private final String topic; //主题
- private final Integer partition; //分区号
- private final Headers headers; //消息头部 kafka 0.11.x版本才引入,设置应用相关信息,非必须
- private final K key; //键--不只是可以用来指定消息的键,还可以用来计算消息分区
- private final V value; //值--消息体
- private final Long timestamp; //消息的时间戳
- //省略其他成员方法和构造方法
- }
3、发送消息
KafkaProducer的send方法有两个重载方法:
public Future<RecordMetadata> send(ProducerRecord<K,V> record);
public Future<RecordMetadata> send(ProducerRecord<K,V> record,CallBack callback);
KafkaProduer调用send()方法时,一般会发生两种类型的异常:可重试异常和不可重试异常
Kafka消息发送主要有三种模式,发后即忘(fire-and-forget)、同步(sync)、异步(async),重点对比优劣
- producer.send(record,new CallBack(){
- @Override
- public void onCompletion(RecordMetaData metadata,Exception exception){
- //该方法中的两个参数不并存,要么异常,要么返回消息元数据
- if (exception !=null) {
- exception.printStackTrace(); //实际中可能需要记录日志分析
- } else {
- System.out.println(metadata.topic()+"-"+metadata.partation()+":"+metadata.offset());
- }
- }
- });
当存在多个异步消息发送时,各消息的回调也是保证分区有序的
4、关闭生产者实例
Kafka发送消息一般也不是单条发送,也是多条消息,当消息发送完成后,也需要调用close()方法回收资源。close()方法会阻塞等待之前的所有消息都发送完后再关闭KafkaProducer;
KafkaProducer还提供了一个带超时时间的close方法:
public void close(long timeout,TimeUnit timeUnit)
5、拦截器|序列化器|分区器
消息在通过send()方法发往broker的过程中,可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器系列作用后才能真正发往boker;其中拦截器一般不是必须的;
5.1、拦截器
拦截器是死kafka0.10.0.0中引入的一个新功能,kafka一般有两种拦截器,生产者拦截器、消费者拦截器;
生产者拦截器主要主要用在消息发送前做一些准备工作,比如按照某个规则过滤掉不符合规则的消息,修改消息内容,也可在发送回调之前做一些定制化的需求(统计成功率等);kafka为了方便使用提供了org.apache.kafka.clients.producer.ProducerIntercepter接口,来统一生产端的拦截器;ProducerIntercepter包含三个方法,对应消息发送过程中的三个拦截点:
public ProducerRecord<K,V> onSend(ProducerRecord<K,V> record); //消息分区之前对消息进行定制化操作
public ProducerRecord<K,V> onAcknowledgement(ProducerRecord<K,V> record);//消息应答之前或消息发送异常时调用;优于用户设定的callback之前执行
public void close(long timeout,TimeUnit timeUnit); //关闭拦截器之前执行一些资源清理工作
kafka也支持定义使用多个拦截器,形成拦截器链,拦截器链会按照,interceptor.classes参数配置拦截器的顺序来一一执行;在拦截器链中,如果某个拦截器执行失败了,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行;
5.2、序列化器
生产者需要将消息对象通过序列化器(Serializer)转换为字节数组才能通过网络发送给kafka;而在对侧,消费者需要用反序列器(Deserializer)把从kafka消费的字节数组转换为对象。为了方便,消息的key、value一般定义为string,对应序列化器是kafka自带的StringSerializer,StringDeserializer;kakfa也提供了ByteArray、ByteBuffer、Byte、Integer、Double、Long这几种类型的序列化器;它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口提供3个方法:
public void configure(Map<String,?> configs,boolean isKey); //用来配置当前类(确定编码类型等)
public byte[] serialize(String topic ,T data); //对消息进行序列化操作
public void close(); //关闭序列化器之前执行一些资源清理工作
当kafka自身提供的几种序列化器不能满足时,则可以使用JSON,Thrift,ProtoBuf和Protostuff等通用序列化工具来实现,或者自定序列化器实现;定义了序列化器就一定要定义反序列化器;
5.3、分区器
如果消息ProducerRecord中指定了partition字段,那么就不需要分区器;因为partition 代表的就是消息发送到哪个分区;如果没有指定分区partition,那么就依赖分区器,根据key这个字段来计算partition的值;分区器的作用就是为消息指定分区。kafka提供了默认分区器org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现自org.apache.kafka.clients.producer.Partitioner接口,接口定义了2个方法:
public int partition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster);//计算分区号,其中参数keyBytes是序列化后的键
public void close(); //分区器关闭时释放一些资源
默认分区器DefaultPartitioner的实现中close()方法是空实现,partition()方法定义了分区分配的主要逻辑:如果key不为null,那么默认的分区器会对key进行hash(采用MurmurHash2算法,具备高性能低碰撞率),根据得到的哈希值来计算分区号,同一个key的消息会被写入到相同的分区;如果key为null,则消息采用轮询的方式发往主题内各个可用分区;
当主题分区数不改变的情况下,消息key和分区的映射关系是不变的;修改主题分区数之后,消息key和分区的映射关系就会发生变化; 我们可以自定义分区器,实现org.apache.kafka.clients.producer.Partitioner接口,在partition方法中定制key和分区的映射逻辑;自定义分区器后需要用 partitioner.class 参数指定分区器;
6、整体架构
上图是生产者客户端的整体架构,生产者客户端由两个线程协调运行,主线程和Sender线程(发送线程);主线程负责KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器作用后,将消息缓存到Accumulator(消息累加器)中;Sender线程负责从Accumulator中获取消息发送到kafka中。
6.1、消息累加器
Accumulator的主要作用是将消息缓存,以便Sender线程批量发送,进而减少网络传输资源消耗;可以通过客户端的buffer.memory参数设置Accumulator缓存的大小。当主线程中生产者发送消息的速度大于Sender线程发送消息到服务器的速度时,会导致消息累加器缓存空间不足,这时KafkaProducer的send()方法要么抛出异常,要么阻塞;这取决于max.block.ms参数的配置,此参数默认是60000ms=60s;
消息累加器Accumulator内部为每个分区维护了一个双端队列,队列的内容就是ProducerBatch,即Deque<ProducerBatch>,消息写入缓存时消息追加到队列的尾部,Sender线程读取消息时,从队列的头部读取;(这里ProudcerBatch不等同ProducerRecord,一个ProducerBatch中可以包含多个ProducerRecord,ProducerRecord会被包含在PruducerBatch中,这样可以使字节的使用更加紧凑,较小的ProducerRecord被拼接成一个较大的ProducerBatch,也可以减少网络请求的次数提升吞吐量);如果kafka被拆分为很多分区,生产者客户端可以适当将buffer.memory扩大,以便提升整体吞吐量;
消息在网络上是以字节(Byte)的形式传输的,所以在发送之前还要创建一块儿内存区域来保存对应的消息。在kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建与释放,不过频繁的创建和释放是比较消耗内存的,在Accumulator的内部还有一个BufferPool,主要来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不允许缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即16KB。我们可以适当调大batch.size大小,增加缓存量;
ProduerBatch的大小和batch.size参数也有着密切的关系。当一条消息流入到消息累加器中的时候,会先找到消息分区所对应的双端队列(如果没有则新建),在这个双端队列的尾部先获取一个ProducerBatch(如果没有则新建),再去看ProducerBatch的大小是否还可以再写入这条PoducerRecord消息;若果这条消息的大小超过了ProducerBatch的容量,就需要创建一个新的ProducerBatch; 这里创建新ProducerBatch时的大小容量和batch.size紧密相关的:首先评估这小ProducerRecord消息的大小是否超过了batch.size,如果不超过就以batch.size的大小创建ProducerBatch,这样在使用完这段内存区域后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProduerBatch,这段内存区域不会被复用;
6.2、Sender线程发送消息前还需要进行哪些结构转换
Sender线程从RecordAccumulator中获取缓存到的消息后,会进一步将原本的<分区,Deque<ProducerBatch>>的保存形式转变为<Node,<List<ProducerBatch>>>其中Node表示kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只需要关注向哪个分区发送哪些消息;所以这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node,List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node,Request>的形式,Request是指kafka的各种协议请求,对于消息发送就是指具体的ProducerRequest(这里kafka协议相关的具体东西,后面单论);
在请求从Sender线程发往Kafka之前还会保存到InFlightRequest中,InFlightRequest对象的具体形式为Map<NodeId,Deque<Request>>,他主要的作用是缓存已经发送出去但是还没有收到响应的请求(NodeId是一个String类型的值,标识节点的id编号);与此同时,InFlightRequests还提供了很多管理类的方法,并且通过配置参数还可以限制每个连接(客户端与Node节点之间的连接)最多缓存的请求数,这个配置参数为mat.in.flight.requests.per.connection,默认值为5,即每个连接最多可以缓存5个为收到响应的请求,超过该数之后不能向这个连接发送更多的请求了,除非缓存的请求收到了响应。通过比较Deque<Request>的size和上述参数的大小来判断对应的Node是否已经堆积了很多未响应的消息;如果真是如此,那么说明这个node节点的负载较大或网络连接有问题,如果继续向这个节点发送请求就会增大超时的可能;
6.3、元数据更新
上面我们提到了InFlightRequest还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里负载最小是通过每个Node在InFlightRequest中还未响应Response的的请求决定的,请求越多负载越大;leatLoadNode的用处且听细细道来;
KafkaProducer客户端要将一条消息成功追加到某个topic主题的某个分区所对应的leader副本之前,首先要知道主题的分区数量,然后通多分区数计算出(或直接指定)消息的目标分区,之后KafkaProduer要知道目标分区的leader副本所在的broker节点、端口等信息才能建立连接,最终才能将消息发送到kafka,在这一过程中所需要的信息都属于元数据信息;
ProducerConfig中配置的节点不必要是全部节点的地址,因为客户端每次发送消息后都可以从响应中获取到其他所有节点的地址,这一过程也属于元数据相关的更新操作。于此同时,分区数量leader副本也会动态的变化,客户端也需要动态的捕捉这些变化;
这就明白了元数据指什么?是指Kafka集群的元数据,具体记录了集群中有哪些主题,主题有哪些分区,分区的leader副本配置到了哪个节点上,follower副本分配在哪些节点上,哪些再AR\ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。。。
当客户端中不存在需要使用的元数据信息时(这种情况一般是长时间没有进行消息发送),或者metadata.max.age.ms时间没有更新元数据,都会引起元数据的更新操作。客户端默认的metadata.max.age.ms的时间一般是300000,即5min。元数据的更新操作是在客户端内部进行的,对客户端外部使用者不可见。当需要更新元数据时,会挑选出leastLoadedNode,然后向这个节点发送MetaDataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,创建完MetaDataRequest后也会存入InFlightRequests之后的步骤就和发送消息类似;元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障(具体建议读码);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。