赞
踩
消息发送的整体流程。生产端主要有两个线程协调运行。这两条线程分别为main线程和sender线程(发送线程)。
拦截器的作用实现消息的定制化(类似于Spring Intercepter)
拦截器的代码定义在生产者代码中,实现ProducerIntercepter接口。
public class ChargingInterceptor implements ProducerInterceptor<String, String> { // 发送消息的时候触发 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println("1分钱1条消息,不管那么多反正先扣钱"); return record; } // 收到服务端的ACK的时候触发 @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("消息被服务端接收啦"); } @Override public void close() { System.out.println("生产者关闭了"); } // 用键值对配置的时候触发 @Override public void configure(Map<String, ?> configs) { System.out.println("configure..."); } }
调用send方法以后,第二步利用指定工具对key和value进行序列化。
public class SerializerProducer { public static void main(String[] args) { Properties props=new Properties(); //pros.put("bootstrap.servers","192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095"); props.put("bootstrap.servers","192.168.44.161:9092"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","com.qingshan.serializer.ProtobufSerializer"); // 0 发出去就确认 | 1 leader 落盘就确认| all 所有Follower同步完才确认 props.put("acks","1"); // 异常自动重试次数 props.put("retries",3); // 多少条数据发送一次,默认16K props.put("batch.size",16384); // 批量发送的等待时间 props.put("linger.ms",5); // 客户端缓冲区大小,默认32M,满了也会触发消息发送 props.put("buffer.memory",33554432); // 获取元数据时生产者的阻塞时间,超时后抛出异常 props.put("max.block.ms",3000); Producer<String, User> producer = new KafkaProducer<String,User>(props); /* for (int i =0 ;i<1000000;i++) { producer.send(new ProducerRecord<String,String>("mytopic",Integer.toString(i),Integer.toString(i))); // System.out.println("发送:"+i); }*/ User user = new User(100L,"qingshan",1,"13677778888"); producer.send(new ProducerRecord<String, User>("ser-topic","1",user)); producer.close(); } }
int partition = partition(record,serializedValue,cluster);
一条消息会发送到哪个partition呢?它返回的是一个分区编号,从0开始。
选择分区以后并没有直接发送消息,而是把消息放到消息累加器。
消息累加器本质上是一个ConcurrentMap;
一个partition一个batch,batch满了以后,会唤醒sender线程,发送消息。
生产者发送消息如果发送失败,生产者是不知道的。所以kafka应该要提供一种响应客户端的方式,只有在服务器确认后,生产者才发送下一轮消息,否则要重发数据。
服务端什么时候才算接受成功?因为消息存储在不同的partition副本集里面。
当然,单个partition(leader)写入成功,还是不够可靠。如果有多个副本,follower也要写入成功。
服务端ACK给生产者总体上有两种思路:
1)只要有半数以上的follower节点完成同步。
2)需要所有follower全部完成同步。
kafka选择第二种方案。
那么问题来了,如果有一个follower出了问题,没办法从leader同步数据。按照这个规则,leader会一直等待,无法发送ack。
如何解决?
我们应该把正常和leader保持同步的replica维护起来,放到一个动态set集合里(叫做 ISR)in-sync-replica set。现在只要ISR里面的follower同步完数据之后,就可以给客户端发送ACK。
如果leader挂了,ISR也会重新选举leader。
kafka为客户端提供3种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择相应的配置。
pros.put(“acks”,“1”);
1)acks=0;
producer不等待Broker的ack,这一操作提供了一个最低的延迟,Broker一旦接收到还没有写入磁盘就已经返回,当Broker故障时有可能丢失数据。
2)acks=1;(默认)
producer等待Broker的ack, partition的leader落盘成功后返回ack,如果follower同步成功之前leader宕机,那么数据将会丢失。
3)acks=-1;(all)
producer等待Broker的ack,partition的leader和follower全部落盘成功后才返回ack。
kafka根据partition分区存储,为了防止log不断追加导致过大。一个partition又被分成多个segment来组织数据。
在磁盘上,每个segment由1个log文件和2个index文件组成。
1) .log日志文件 (日志就是数据文件)
kafka根据日志文件大小切分segment。当一个segment写满时,会创建一个新的segment,用最新的offset作为名称。
segment默认是1G,由这个参数控制
log.segment.bytes
kafka为什么不用B+Tree?
kafka是写多,查少。如果kafka用B+树,首先会索引文件很大,大量插入数据带来的B+树的调整会非常消耗性能。
当创建一个分区或者分区增加副本的时候,都要从副本中选举一个新的Leader出来。
比如利用ZK实现。如何利用zk实现选举?zk如何感知节点变化?或者说zk如何实现加锁和释放锁?
3个特点:watcher机制;节点不允许重复写入;临时节点
这样做如果一旦节点增减,就会造成大量的watch事件被触发。
kafka早期版本是这样做的,后来换了一种实现方式。
并不是所有的replica都参与leader选举,而是由其中的一个Broker统一来指挥,这个Broker角色叫做Controller
所有Broker会尝试在zookeeper中创建临时节点选出唯一一个controller,只有一个能创建成功(先到先得)。
controller会从ISR集合中,根据一种规则选举leader。
首先,第一个问题,分布式系统中常见的选举协议有哪些?(或者说共识算法)
ZAB(zookeeper)、Raft(Redis Sentinel)这些都是Paxos算法的变种。它们的思想就是:先到先得,少数服从多数。
但是kafka没有用这些算法,而是自己实现的算法,类似微软的PacificA算法。
为什么kafka不用ZAB、Raft这种协议?因为可能会出现节点不通的情况,出现多个leader,以及惊群效应(大量watch事件被触发)。
leader确定以后,客户端读写就只能操作leader节点。follower需要向leader同步数据。
不通的replica的offset不一样,同步到底怎么同步?
只有leader把消息同步到所有的replica,数据保持一致。这个时候leader才可以消费。
kafka的独特ISR复制,做到了既可以保证数据一致又可以提供高吞吐量。
1)follower故障,会被踢出ISR
2)leader故障,ISR会选举一个新leader
在partition中,消息是不会删除的,所以才可以追加写入,写入的消息连续有序的。
对于一个partition,消费者是怎么做到接着上次消费的位置(offset)继续消费呢?
消费者把offset保存到了Broker,需要消费者上报给Broker。并不是消费者消费了消息,offset就会更新。消费者必须要有一个commit(提交)的动作。就跟RabbitMq的ack一样。
1)第一个问题,多个consumer group和partition的关系:重复消费。
任何一个消费组,都会把topic的所有partition消费瓜分干净。
2)如果5个消费者,消费3个分区。肯定有2个消费者消费不到。(站着听课)
3)如果2个消费者,消费5各分区。
按照范围分配:(默认策略)
轮训策略:
1)顺序读写
随机I/O,顺序I/O。
随机I/O:读写多条数据在磁盘上是分散的,寻址会很耗时。
顺序I/O:读写数据在磁盘上是集中的,不需要重复寻址过程。
kafka的message是不断追加到本地磁盘文件末尾的,而不是随机写入的,这使得kafka写入吞吐量得到显著提升。
内存I/O是不是一定比磁盘I/O快呢?
那可不一定。
图片显示,在一定条件测试下,磁盘的顺序读写可以达到53.2M每秒,比内存的随机读写还要快。
2)零拷贝
操作系统把内存分为两块,一部分是内核空间,一部分是用户空间。这样就可以避免用户进程直接操作内核,保证内核安全。
进程在内核空间可以执行任意命令,调用系统的一切资源;在用户空间必须要通过一些系统接口才能像内核发出指令。
比如kafka消费消息,必须先把数据从磁盘拷贝到内核空间,然后从内核到用户缓冲区,最后才能返回用户。
第二个是DMA拷贝。没有DMA技术的时候,拷贝数据的事情需要cpu亲自去做。如果传输量大的时候就有问题了,cpu就没法干其他事情了。
DMA技术叫做内存直接访问(Direct Memory Access),其实可以理解为CPU给自己找了一个小弟。在进行I/O设备和内存的数据传输的时候,数据搬运工作全部交给DMA控制器,解放了CPU的双手。
在linux 操作系统提供了sendfile函数,可以实现“零拷贝”。这个时候就不需要经过用户缓冲区了,直接把数据拷贝到网卡。
因为这个只有DMA拷贝,没有CPU拷贝,所以叫做“零拷贝”。零拷贝至少可以提供一倍的性能。
kafka文件传输最终调用的是Java NIO库里的transferTo方法。
transferTo实际上最后使用的还是Linux系统的sendfile()系统调用函数。
零拷贝技术大大提升了文件传输的性能。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。