当前位置:   article > 正文

kafka原理2_kafka 与zab协议有关么

kafka 与zab协议有关么

生产者原理

消息发送的整体流程。生产端主要有两个线程协调运行。这两条线程分别为main线程和sender线程(发送线程)。

拦截器

拦截器的作用实现消息的定制化(类似于Spring Intercepter)
拦截器的代码定义在生产者代码中,实现ProducerIntercepter接口。

public class ChargingInterceptor implements ProducerInterceptor<String, String> {
    // 发送消息的时候触发
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("1分钱1条消息,不管那么多反正先扣钱");
        return record;
    }

    // 收到服务端的ACK的时候触发
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("消息被服务端接收啦");
    }

    @Override
    public void close() {
        System.out.println("生产者关闭了");
    }

    // 用键值对配置的时候触发
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("configure...");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

序列化

调用send方法以后,第二步利用指定工具对key和value进行序列化。

public class SerializerProducer {
    public static void main(String[] args) {
        Properties props=new Properties();
        //pros.put("bootstrap.servers","192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095");
        props.put("bootstrap.servers","192.168.44.161:9092");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","com.qingshan.serializer.ProtobufSerializer");
        // 0 发出去就确认 | 1 leader 落盘就确认| all 所有Follower同步完才确认
        props.put("acks","1");
        // 异常自动重试次数
        props.put("retries",3);
        // 多少条数据发送一次,默认16K
        props.put("batch.size",16384);
        // 批量发送的等待时间
        props.put("linger.ms",5);
        // 客户端缓冲区大小,默认32M,满了也会触发消息发送
        props.put("buffer.memory",33554432);
        // 获取元数据时生产者的阻塞时间,超时后抛出异常
        props.put("max.block.ms",3000);
        Producer<String, User> producer = new KafkaProducer<String,User>(props);

/*        for (int i =0 ;i<1000000;i++) {
            producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i)));
            // System.out.println("发送:"+i);
        }*/

        User user = new User(100L,"qingshan",1,"13677778888");
        producer.send(new ProducerRecord<String, User>("ser-topic","1",user));
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

路由指定

int partition = partition(record,serializedValue,cluster);

一条消息会发送到哪个partition呢?它返回的是一个分区编号,从0开始。

消息累加器

选择分区以后并没有直接发送消息,而是把消息放到消息累加器。
消息累加器本质上是一个ConcurrentMap;
一个partition一个batch,batch满了以后,会唤醒sender线程,发送消息。

数据可靠性保证ACK

生产者发送消息如果发送失败,生产者是不知道的。所以kafka应该要提供一种响应客户端的方式,只有在服务器确认后,生产者才发送下一轮消息,否则要重发数据。
服务端什么时候才算接受成功?因为消息存储在不同的partition副本集里面。
在这里插入图片描述
当然,单个partition(leader)写入成功,还是不够可靠。如果有多个副本,follower也要写入成功。

服务端ACK给生产者总体上有两种思路:
1)只要有半数以上的follower节点完成同步。
2)需要所有follower全部完成同步。
kafka选择第二种方案。
那么问题来了,如果有一个follower出了问题,没办法从leader同步数据。按照这个规则,leader会一直等待,无法发送ack。
如何解决?
我们应该把正常和leader保持同步的replica维护起来,放到一个动态set集合里(叫做 ISR)in-sync-replica set。现在只要ISR里面的follower同步完数据之后,就可以给客户端发送ACK。
如果leader挂了,ISR也会重新选举leader。

ACK应答机制

kafka为客户端提供3种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择相应的配置。

pros.put(“acks”,“1”);

1)acks=0;
producer不等待Broker的ack,这一操作提供了一个最低的延迟,Broker一旦接收到还没有写入磁盘就已经返回,当Broker故障时有可能丢失数据。
2)acks=1;(默认)
producer等待Broker的ack, partition的leader落盘成功后返回ack,如果follower同步成功之前leader宕机,那么数据将会丢失。
3)acks=-1;(all)
producer等待Broker的ack,partition的leader和follower全部落盘成功后才返回ack。

kafka Broker存储原理

kafka根据partition分区存储,为了防止log不断追加导致过大。一个partition又被分成多个segment来组织数据。
在磁盘上,每个segment由1个log文件和2个index文件组成。
在这里插入图片描述
1) .log日志文件 (日志就是数据文件)
kafka根据日志文件大小切分segment。当一个segment写满时,会创建一个新的segment,用最新的offset作为名称。
segment默认是1G,由这个参数控制

log.segment.bytes

  1. index索引
    kafka并不是为每条数据都建立索引,而是一种稀疏索引
    也就是说隔几条消息才产生一个索引记录。
    在这里插入图片描述
    3)时间戳索引
    为什么要有时间戳索引?
    1、如果要基于时间切分日志文件,必须要有记录时间戳
    2、如果要基于时间清理消息,必须要有时间戳。

存储总结

kafka为什么不用B+Tree?
kafka是写多,查少。如果kafka用B+树,首先会索引文件很大,大量插入数据带来的B+树的调整会非常消耗性能。
在这里插入图片描述

高可用架构

当创建一个分区或者分区增加副本的时候,都要从副本中选举一个新的Leader出来。
比如利用ZK实现。如何利用zk实现选举?zk如何感知节点变化?或者说zk如何实现加锁和释放锁?
3个特点:watcher机制;节点不允许重复写入;临时节点
这样做如果一旦节点增减,就会造成大量的watch事件被触发。
kafka早期版本是这样做的,后来换了一种实现方式。

并不是所有的replica都参与leader选举,而是由其中的一个Broker统一来指挥,这个Broker角色叫做Controller
  • 1

所有Broker会尝试在zookeeper中创建临时节点选出唯一一个controller,只有一个能创建成功(先到先得)。

controller选举

controller会从ISR集合中,根据一种规则选举leader。
首先,第一个问题,分布式系统中常见的选举协议有哪些?(或者说共识算法)
ZAB(zookeeper)、Raft(Redis Sentinel)这些都是Paxos算法的变种。它们的思想就是:先到先得,少数服从多数。
但是kafka没有用这些算法,而是自己实现的算法,类似微软的PacificA算法。
为什么kafka不用ZAB、Raft这种协议?因为可能会出现节点不通的情况,出现多个leader,以及惊群效应(大量watch事件被触发)。

主从同步

leader确定以后,客户端读写就只能操作leader节点。follower需要向leader同步数据。
不通的replica的offset不一样,同步到底怎么同步?
只有leader把消息同步到所有的replica数据保持一致。这个时候leader才可以消费
kafka的独特ISR复制,做到了既可以保证数据一致又可以提供高吞吐量。

follower故障

1)follower故障,会被踢出ISR
2)leader故障,ISR会选举一个新leader

kafka消费者原理

在partition中,消息是不会删除的,所以才可以追加写入,写入的消息连续有序的。
对于一个partition,消费者是怎么做到接着上次消费的位置(offset)继续消费呢?
消费者把offset保存到了Broker,需要消费者上报给Broker。并不是消费者消费了消息,offset就会更新。消费者必须要有一个commit(提交)的动作。就跟RabbitMq的ack一样。

消费策略

1)第一个问题,多个consumer group和partition的关系:重复消费。
任何一个消费组,都会把topic的所有partition消费瓜分干净。
2)如果5个消费者,消费3个分区。肯定有2个消费者消费不到。(站着听课)
3)如果2个消费者,消费5各分区。
按照范围分配:(默认策略)
在这里插入图片描述
轮训策略:
在这里插入图片描述

kafka为什么这么快

1)顺序读写
随机I/O,顺序I/O。
随机I/O:读写多条数据在磁盘上是分散的,寻址会很耗时。
顺序I/O:读写数据在磁盘上是集中的,不需要重复寻址过程。
kafka的message是不断追加到本地磁盘文件末尾的,而不是随机写入的,这使得kafka写入吞吐量得到显著提升。

内存I/O是不是一定比磁盘I/O快呢?
那可不一定。
在这里插入图片描述
图片显示,在一定条件测试下,磁盘的顺序读写可以达到53.2M每秒,比内存的随机读写还要快。

2)零拷贝
操作系统把内存分为两块,一部分是内核空间,一部分是用户空间。这样就可以避免用户进程直接操作内核,保证内核安全。
进程在内核空间可以执行任意命令,调用系统的一切资源;在用户空间必须要通过一些系统接口才能像内核发出指令。

比如kafka消费消息,必须先把数据从磁盘拷贝到内核空间,然后从内核到用户缓冲区,最后才能返回用户。
在这里插入图片描述
第二个是DMA拷贝。没有DMA技术的时候,拷贝数据的事情需要cpu亲自去做。如果传输量大的时候就有问题了,cpu就没法干其他事情了。
DMA技术叫做内存直接访问(Direct Memory Access),其实可以理解为CPU给自己找了一个小弟。在进行I/O设备和内存的数据传输的时候,数据搬运工作全部交给DMA控制器,解放了CPU的双手。
在这里插入图片描述
在linux 操作系统提供了sendfile函数,可以实现“零拷贝”。这个时候就不需要经过用户缓冲区了,直接把数据拷贝到网卡。
因为这个只有DMA拷贝,没有CPU拷贝,所以叫做“零拷贝”。零拷贝至少可以提供一倍的性能。
kafka文件传输最终调用的是Java NIO库里的transferTo方法。
transferTo实际上最后使用的还是Linux系统的sendfile()系统调用函数。
零拷贝技术大大提升了文件传输的性能。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/142616
推荐阅读
相关标签
  

闽ICP备14008679号