赞
踩
拦截器作用是实现消息的定制化
ByteBufferSerialize
BytesSerializer
DoubleSerializer
ExtendwdSerializer
FloatSerializer
IntegerSerializer
LongSerializer
ShortSerializer
StringSerializer
UUIDSerializer
除了自带的序列化工具之外,可以使用如Avro、JSON、Thrift. Protobuf等
1、 指定了 partition;
2、 没有指定partition,自定义了分区器;
3、 没有指定partition,没有自定义分区器,但是key不为空;
4、 没有指定partition,没有自定义分区器,但是key是空的
选择分区以后并没有直接发送消息,而是把消息放入了消息累加器
—个partition —个Batch。batch满了之后,会唤醒Sender线程,发送消息
单个partition (leader)写入成功,还是不够可靠,如果有多个副本,follower 也要写入成功才可以
所有的follower全部完成同步,才发送ACK给客户端,延迟相对来说高 一些,但是节点挂掉的影响相对来说
小一些,因为所有的节点数据都是完整的。把那些正常和leader保持同步的replica维护起来,放到一个动态
set里 面/这个就叫做in-sync replica set (ISR)。现在只要ISR里面的follower同步完数据 之后,我就
给客户端发送ACK。
参数 replica.lag.time.max.ms决定剔除ISR的时间,默认30s
1、acks=0: producer不等待broker的ack,这一操作提供了一个最低的延迟,broker—接收到还没有写入磁盘
就已经返回,当broker故障时有可能丢失数据;
2、acks= 1 (默认):producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回ack,如果
在follower同步成功之前leader故障,那么将会丢失数据;
3、acks=-1 (all) : producer 等待 broker 的 ack, partition 的 Ieader 和 follower全部落盘
成功后才返回ack
三种机制,性能依次递减(producer吞吐量降低),数据健壮性则依次递增
配置文件:config/server.properties
log.dir配置:默认/tmp/kafka-logs
为了实现横向扩展,把不同的数据存放在不同的Broker 同时降低单台服务器的访问压力,我们把一个
topic中的数据分隔成多个partition。一个partition中的消息是有序的,顺序写入,但是全局不一定有序。
在服务器上,每个partition都有一个物理目录,topic名字后面的数字标号即代表 分区
为了提高分区的可靠性,kafka又设计了副本机制。
创建topic的时候,通过指定replication-factor确定topic的副本数。
注意:副本数必须小于等于节点数,而不能大于Broker的数量,否则会报错。
1、firt of all,副本因子不能大于Broker的个数
2、第一个分区(编号为0的分区)的第一个副本放置位置是随机从brokerList选择的(Broker2的副本)
3、其他分区的第一个副本放置位置相对于第0个分区依次往后移。
为了防止log不断追加导致文件过大,导致检索消息效率变低,一个partition又被划分成多个segment来
组织数据(MySQL也有segment的逻辑概念,叶子节点就是 数据段,非叶子节点就是索引段)
1、日志删除是 通过定时任务实现的。默认5分钟执行一次,看看有没有需要删除的数据
log.retention.checkjnterval.ms=300000
2、log.retention.hours默认值是168个小时(一周),也就是时间戳超过一周的数据才会删除。
3、log.retention.minutes默认值是空。它的优先级比小时高,如果配置了则用这个
4、log.retention.ms默认值是空。它的优先级比分钟高,如果配置了则用这个。
5、log.retention.bytes默认值是-1,代表不限制大小,想写多少就写多少。log.retention.bytes指的是
所有日志文件的总大小。
6、log.segment.bytes可以对单个segment文件大小进行限制,默认值1073741824字节(1G)
当有了这些key相同的value不同的消息的时候,存储空间就被浪费了。压缩就是 把相同的key合并为最后
一个value
Kafka不是所有的repalica都参与leader选举,而是由其中的一个Broker统一来指挥, 这这个Broker的
角色就叫做Controller (控制器)
控制器作用
•监听Broker变化。
•监听Topic变化。
•监听Partition变化。
•获取和管理Broker. Topic、Partition的信息。
•管理Partiontion的主从信息
提到kafka的选举实现,最相近的是微软的PacificA算法,在这种算法中,默认是让ISR中第一个replica
变成leader
1、follower节点会向Leader发送一个fetch请求,leader向follower发送数据 后,既需要更新follower的LEO。
2、follower接收到数据响应后,依次写入消息并且更新LEO。
3、leader 更新(SR 最小的 LEO)。
kafka设计了独特的ISR复制,可以在保障数据一致性情况下又可提供高吞吐量。
首先follower发生故障,会被先踢出ISR
首先选一个leader,消息截取
放在一 特殊的topic中,名字叫_consumer_offsets,默认 有 50 个分区(
offsets.topic.num.partitions 默认是 50),每个分区默认一个 replication。
1、默认值是latest,也就是从最新的消息(最后发送的)开始消费的。历史消费是不能 消费的
2、earliest代表从最早的(最先发送的)消息开始消费。可以消费到历史消息
3、none,如果consumer group在服务端扌戈不到offset会报错。
1、enable.auto.commit默认是trueo true代表消费者消费消息以后自动提交此时Broker会更新消费者组
的offset
2、auto.commit.interval.ms默认是5秒钟,控制自动提交的频率
3、如果我们要在消费完消息做完业务逻辑处理之后才commit,就要把这个值改成false。如果是false,消费者
就必须要调用一个方法让Broker更新offset。
有两种方式:
consumer.commitSyncO的手动同步提交。
consumer. commitAsyncO 手动异步提交
1 顺序读写
顺序IO比随机IO读写快
2 索引
稀疏索引
3 批量读写和文件压缩
4 零拷贝
DMA直接内存访问,从磁盘读取到内核之后,直接发到网卡
1、 producer 端使用 producer.send(msg, callback)带有回调的send方法,而不是
producer.send(msg)方法。根据回调,一旦出现消息提交失败的情况,就可以有针对性地进行处理。
2、设置acks=all。acks是Producer的一个参数,代表"已提交"消息的定义。 如果设置成all,则表明所有
Broker都要接收到消息,该消息才算是”已提交”。
3、设置retries为一个较大的值。同样是Producer的参数。当出现网络抖动时,消 息发送可能会失败,此时
配置了retries的Producer能够自动重试发送消息,尽量避免消息丢失。
4、设置 unclean.leader.election.enable = false
5、设置replication.factor >= 3。需要三个以上的副本。
6、设置minJnsync.replicas > 1。 Broker端参数,控制消息至少要被写入到多少 个副本才算是”已提
交” 。设置成大于1,可以提升消息持久性。在生产环境中不要使用 默认值1。确保
replication.factor > min.insync.replicas。如果两者相等,那么只要有 一个副本离线,整个分区
就无 法正常工作了。推荐设置成replication.factor = min.insync.replicas + 1。
7、确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好 设置成false,并自己来处
理offset的提交更新。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。