当前位置:   article > 正文

Kafka宝典_kafka from-beginning

kafka from-beginning

Kafka

笔记内容取自尚硅谷Kafka3.0教程,以及《深入理解Kafka核心设计与实践原理》

内容还会不断充实~

概述

定义

传统定义:

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域

最新定义:

Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

消息队列

应用场景
  • 异步处理
  • 削峰
优势
  • 解耦

    允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束

  • 可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

  • 缓冲

    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的的处理速度不一致的情况

  • 灵活性&峰值处理能力

    • 灵活性:动态上下线,在流量高峰时可以动态增加服务器
    • 峰值处理:削峰,延迟处理

    使系统能够应对突发的高峰流量

  • 异步通信

    允许用户把一个消息放入队列,但不立即处理它,在需要的时候再去处理

模式
  1. 点对点模式

    一对一,消费者主动拉取数据,收到消息后,队列中的消息清除。队列可以有多个消费者,但对于一个消息而言,只能被一个消费者消费。
    在这里插入图片描述

  2. 发布/订阅模式

    • 一对多,消费者消费消息后,队列不会清除消息。消息生产者将消息发布到topic中,同时有多个消费者消费该消息。和点对点模式不同,发布到topic的消息会被所有订阅者消费

    • 发布/订阅模式中,又分为两种:

      • 消费者主动拉取消息

        Kafka就是属于这种类型

        优势:速度取决于消费者,可以根据消费能力以适当的速率消费消息

        弊端:需要轮询,查看队列中是否有消息,浪费资源

      • 队列推送消息

        类似于公众号推送

        弊端:

        推送消息的速度取决于队列,各个消费者处理消息的速度可能不一致,造成消费者崩掉(推送速度 >消费者处理速度)或者资源浪费(推送速度 < 消费者处理速度)

Kafka基础架构

架构图

在这里插入图片描述

zk在这里的作用:

  • 存储kafka集群信息
  • 存储消费者消费到的位置信息
    • 即:消费到第几条了
    • 消费者本地内存也会存储该条数信息,平时就是读取并维护本地的信息;但当机器挂掉重启后,会先去zk获取该信息,然后再在本地内存继续维护)
    • 0.9之后将位置信息存储到kafka里一个系统创建的topic中
      • 为何改存到kafka?
      • 因为消费者本身就需要维护与kafka的连接,去获取消息,如果将位置信息放在zk,则还需要请求zk获取信息,速度不如kafka(注:Kafka消息存到磁盘,默认存七天

名词解释

  • Broker:可以理解为起了Kafka进程的服务器
  • Topic:主题,可以理解为一个队列,用来将消息分类,便于发送和消费
  • Partition:分区,用来提高Kafka集群的并发处理能力,每一个partition是一个有序的队列(个人理解Partition就是对Topic的又一次细分,分布式的多个Broker是为了避免单个机器的性能造成的阻塞,多个partition是为了避免同一内存区域的IO阻塞)
  • Replication:副本。为保证集群中某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
  • Leader:每个分区多个副本的”主“,生产者发送数据的对象,消费者消费数据的对象都是leader
  • Follower:
    • Leader的副本,每个分区多个副本的”从“,实时从leader同步数据,保持和leader数据的同步。用来备份数据(不直接对生产者和消费者提供读写),避免Leader所在机器挂掉后数据丢失。(所以,同一topic同一partition的Follower和Leader一定不在同一台机器)
    • leader发生故障时,某个Follower会成为新的leader
  • Consumer Group:消费者组,可以理解为一个大的消费者,目的是提高消费能力
    • 和其他普通消费者一样,需要订阅一个topic
    • 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    • 一个消费者组里面的不同消费者,只能消费kafka中这个topic的不同partition的数据。(个人理解:建立partition就是为了提高消息的读写速度,对于同一个topic,消息的写入根据partition区分开了,消费者消费的时候如果不分开,会降低消息消费速度且造成重复消费,那partition的意义就不大了)

基本操作

安装

命令行操作

topic

查看该机器上所有topic:

kafka-topics.sh --list --zookeeper ip:zk端口
  • 1

创建topic:

kafka-topics.sh --create --topic topic名称 --zookeeper ip:zk端口 --partitions 分区数 --replication-factor 副本数

#注:副本数不能大于当前可用的Broker数,分区数可以大于当前可用的Broker数
#副本数 包括 leader 和 follower
  • 1
  • 2
  • 3
  • 4

删除topic:

kafka-topics.sh --delete --topic first --zookeeper ip:zk端口

#注:执行效果:
#Topic first is marked for deletion.   标记为删除
#Note: This will have no impact if delete.topic.enable is not set to true.  只有当delete.topic.enable设为true时才会真正删除
  • 1
  • 2
  • 3
  • 4
  • 5

查看topic详情:

kafka-topics.sh --describe --zookeeper ip:zk端口 --topic topic名称
  • 1
消息

生产者发送消息

kafka-console-producer.sh --topic first --broker-list kafkaIP:kafka端口
  • 1

消费者消费消息

kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 #从当前开始消费
#或者:
kafka-console-consumer.sh --topic first --bootstrap-server kafkaIP:kafka端口 --from-begining #从头开始消费
  • 1
  • 2
  • 3

架构深入

工作流程

在这里插入图片描述

说明:

  • 偏移量不是全局唯一的,是分区唯一的
  • kafka只保证消息的分区内有序,不保证全局有序
    • 有序是指消费消息的顺序和生产消息的顺序一致
  • kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的
  • topic是逻辑上的概念,而partition是物理上的概念
    • 每个partition对应一个文件夹(文件夹名 = topic - partition),文件夹内有.log文件,该log文件存储的就是producer生产的数据。producer生产的数据会不断追加到该log文件末端,且每条数据都有自己的offset。

    • 消费者都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

    • 个人更倾向于《深入理解Kafka核心设计与实践原理》上的叙述:

      分区同主题一样是一个逻辑的概念而没有物理上的存在

      主题、分区、副本和Log(日志)的关系如下图,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切的说是Log层面才有实际物理上的存在
      在这里插入图片描述

生产者

消息发送流程

在这里插入图片描述

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。 RecordAccumulator的大小可通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过发送到服务器端的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数默认60秒。

主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。

注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一致多个ProducerRecord

  • ProducerBatch是一个消息批次,包含ProducerRecord,使字节使用更加紧凑
  • 将较小的ProducerRecord拼凑成一个较大的ProducerBatch,可以减少网络请求的次数以提升整体的吞吐量

如果生产者要向很多分区发送消息,则可将buffer.memory参数适当调大以增加整体吞吐量(buffer.memory大,RecordAccumulator则大,因RecordAccumulator中为每个分区都维护了一个双端队列,所以,RecordAccumulator大,每个分区分到的空间就大,可缓存的消息就多)。

在Kafka生产者客户端中,使用java.io.ByteBuffer实现消息内存的创建和释放,不过频繁的创建和释放比较耗费资源,故RecordAccumulator内部有一个BufferPool,主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数指定,默认16KB。

ProducerBatch的大小和batch.size的关系:

当一条消息(ProducerRecord)进入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数设定的大小,如果不超过,则以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

向RecordAccumulator中追加消息源码:

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs) throws InterruptedException {
   
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
   
            // check if we have an in-progress batch
            // 获取 or 创建队列
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
   
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                // 尝试向ProducerBatch中添加数据
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null)
                    // 向已有的 ProducerBatch 中追加成功,直接返回即可
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            if (abortOnNewBatch) {
   
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true);
            }

            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            // 计算批次大小设置值,和真实消息(序列化和压缩后)大小 取较大值
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
            // BufferPool分配内存(按照上一步计算得到的size来分配,如果size超过了16k,则这块区域不会被BufferPool复用)(BufferPool只对特定大小的ByteBuffer进行管理,这个特定大小由batch.size指定,默认16k)
            buffer = free.allocate(size, maxTimeToBlock);

            // Update the current time in case the buffer allocation blocked above.
            nowMs = time.milliseconds();
            synchronized (dq) {
   
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                //再向 ProducerBatch 中追加试试
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null) {
   
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

                //不再舔了,我自己新建一个ProducerBatch
                // 封装ByteBuffer
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                // 再封装
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                        callback, nowMs));

                // 将ProducerBatch添加到队列末尾
                dq.addLast(batch);
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
   
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84

Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node, List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息是属于哪一个分区;而对于KafkaProducer的应用逻辑而言,我们只关注向哪个topic中发送哪些消息,所以这里需要做一个应用逻辑层到网络I/O层面的转换。

在转换成<Node, List<ProducerBatch>>的形式之后,Sender还会进一步封装成<Node, Request>的形式,这样就可以将Request请求发往各个Node了。这里的Request是指Kafka的各种协议请求,对于消息发送而言就是具体的ProduceRequest。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为Map<NodeId, Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的id编号)。通过max.in.flight.requests.per.connection参数可限制每个连接(也就是客户端与每个Node之间的连接)最多缓存的请求数,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多请求了,除非有缓存的请求已经收到了响应(Response)。

如果响应成功,则会清理InFlightRequests中的请求,以及RecordAAccumulater中对应分区中的数据;

如果响应失败,则会进行重试,重试次数可通过retries参数进行设置,默认为int类型的最大值。

我们发送消息通常只指定了topic,那么生产者客户端如何知道要发往哪个broker节点呢?这就需要元数据

元数据是指kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR,ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

元数据的更新(二者满足其一即可触发更新):

  • 当客户端中没有要使用的元数据时
  • 超过metedata.max.age.ms时间没有更新元数据(默认5分钟)

当需要更新元数据时,会先挑选出latestLoadedNode(即InFlightRequests中还未确认的请求个数最小的Node),然后向这个Node发送MeteDataRequest请求来获取具体的元数据信息。这个更新操作由Sender线程发起,在创建完MeteDataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时类似。元数据由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步由synchronized和final关键字来保障。

重要参数
参数名称 说明
bootstrap.servers 生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例如ip:port,ip1:port,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者可以从给定的 broker里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名
buffer.memory RecordAccumulator 缓冲区总大小, 默认 32m。
batch.size 缓冲区一批数据最大值, 默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms 如果数据迟迟未达到 batch.size, sender 等待 linger.time之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据, Leader 收到数据后应答。
-1(all):生产者发送过来的数据, Leader+和 isr 队列里面的所有节点收齐数据后应答。-1 和all 是等价的。 Kafka3.0中默认值是-1,之前版本默认是1。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数, 默认为 5,开启幂等性要保证该值是 1-5 的数字
retries 当消息发送出现错误的时候,系统会重发消息。 retries表示重试次数。 默认是 int 最大值, 2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性, 默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。 默认是 none,也就是不压缩。支持压缩类型: none、 gzip、 snappy、 lz4 和 zstd。
生产者分区
分区好处
  • 便于合理使用存储资源, 每个Partition在一个Broker上存储, 可以把海量的数据按照分区切割成一
    块一块数据存储在多台Broker上。 合理控制分区的任务, 可以实现负载均衡的效果。
  • 提高并行度, 生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据

在这里插入图片描述

分区策略
老版本

分区原则

发送的数据要封装成一个ProduceRecord对象,该对象中有partition、key、value等属性

  • 指明partition的情况下,直接将指明的值作为partition值
  • 没有指明partition值,但有key值的情况下,将key的hash值与当前topic的partition存活数进行取余得到partition值
  • 既没有指明partition值又没有key值的情况下,采用轮询的方式选取分区。但谁来做第一个分区?kafka采用的机制是:在第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与当前topic可用的partition总数取余得到partition值。(由于每次调用都会在这个整数上自增,所以取余后的结果也是自增或等于初始值,也就达到了轮询每个partition的效果)。这就是round-robin算法

源码

//这个方法是默认的分区策略类里的,能进到这个方法,说明肯定没有指定partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
   
  			//获取当前topic的partition数目
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //没有指定key
        if (keyBytes == null) {
   
            int nextValue = this.nextValue(topic);
          //当前topic存活的partition数
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
   
              	//从存活的partition里选取一个partition返回
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/625984
推荐阅读
相关标签
  

闽ICP备14008679号