赞
踩
下面以一个Kafka集群中2个Broker举例,创建1个topic包含3个Partition,2 Replication;数据Producer流动如图所示:
1). Producer:消息生产者,就是向 kafka borker 发送消息的客户端
2). Consumer: 消息消费者,向 kafka 取消息的客户端
3). Consumer Group:消费者组,由多个 consumer 组成,消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费,消费者组内互不影响
4). Broker:一台 Kafka 服务器就是一个 broker,一个集群多个 broker 组成
5). Topic:可以理解一个队列,生产者消费者面向的都是一个 topic
6). Partition:为实现扩展性,一个非常大的 topic 可以分布到多个 broker上,一个 topic 可以分成多个 partition,每个 partition 是一个有序队列
Kafka 的启动需要依赖 Zookeeper,查看 Zookeper 的启动状态:
bin/zkServer.sh status
1.启动 Kafka:
bin/kafka-server-start.sh -daemon config/server.properties
2.查看 kafka 里的 topic 的数量
bin/kafka-topics.sh --zookeeper zk地址 --list
因为kafka 是注册进 Zookeeper 的,查询 topic 和创建 topic 都需要指定 zookeeper
3.创建一个 topic
bin/kafka-topics.sh --zookeeper zk地址 --create --topic first --partition 3 --replication-factor 2
4.查看一个 topic 的描述
bin/kafka-topics.sh --zookeeper zk地址 --describe --topic first
5.发送消息
bin/kafka-console-producer.sh --topic first --broker-list kafka地址
6.消费消息
bin/kafka-console-consumer.sh --topic first --bootstrap-server kafka地址
7.修改分区的数量 (分区的数量只能增加)
待填充…
1). 何时发送 ack?
确保有 follower 和 leader 同步完成,leader 再发送 ack,这样才能保证 leader 挂掉之后,能在 follower 中选举出新的 leader
2). 多少个 follower 同步完成之后发送 ack ?
半数以上的 follower 同步完成
Kafka 为用户提供了三种可靠性级别,用户根据可靠性和延迟权衡,选择一下的配置:
ack:
0: Producer 不等待 broker 的 ack,这一操作提供一个最低的延迟,broker 一接收到还没有完全写入磁盘就已经返回 ack ,当 broker 故障时就有可能发生 丢失数据
1: Producer 等待 broker 的 ack,partition 的 leader 罗盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么就会发生 丢失数据
-1: Producer 等待 broker 的 ack,partition 的 leader 和 follower 全部罗盘成功后才返回 ack ,但是如果在 follower 同步完成之后, broker 发送 ack 之前,leader 发生故障,会造成 数据重复
Leader 维护了一个动态的 ISR,意为和 leader 保持同步的 follower 集合,当 ISR 中的 follower 完成数据的同步之后, leader 就会给 follower 发送 ack,如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由 replia.lag.time.max.ms 参数设定。
Leader 发生了故障,就会从 ISR 中选举新的 leader
ack应答机制:对于某些不太重要的数据,对数据的可靠性要求不高,能够容忍数据的少量丢失,所以没有必要等待 ISR 中的 follower 全部接收成功
LEO:每个副本的最后一个 offset
HW:所有副本中最小的 LEO
对于某些比较重要的消息,我们需要保证 exactly once 语义,保证每条消息被发送一次,且仅被发送一次
在 0.11 版本之后, kafka 引入了幂等性机制,配合 acks=-1时的 at least once 语义,实现 producer 到 broker 的 exactly once
使用时:只需将 enable.idempotence 属性设置为 true,kafka 自动将 acks 属性设为 -1
kakfa 是给消息加上id,才保证幂等性的
首先是存在多个 producer 给一个 kafka 发送消息,每个 producer 有一个 PID,每条消息到达 kafka 后,有一个 sequence number (这个 sequence number 会先缓存)。如果 PID+sequence number 在缓存内,这条消息就视为重复,会被丢失
consumer 采用 pull (拉)模式从 broker 中读取数据
因为 push(推)很难适应消费速率不同的消费者,因为消息发送的速率是由 broker 决定
pull 的不足之处是,如果 kafka 没有数据,消费者可能会小茹循环中,一直返回空数据,针对这一点,kafka 的消费者在消费数据是会传入一个时长参数 timeout,如果当前没有数据可供消费, consumer 会等待一段时间再返回,这段时间即为 timeout
一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,必然会涉及到 partition 的分配问题
Kafka 有两种分配策略,一种是 roundrin,一个是 range
策略详讲先欠着…忘了
1). 顺序写磁盘
kafka 的 producer 生产数据,要写入到 log 文件中,写的过程一直追加到末端,为顺序写,顺序写之所以这么快,是因为省去了大量磁头寻址的时间
2). 零拷贝技术
省去文件拷贝到用户应用层那一段时间
kafka 集群中有一个 broker 会被选举成 KafkaController,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等供
kafka 的 producer 发小消息采用的是异步发送的方式,在消息发送的过程中,涉及两个线程: main 线程 和 sender 线程,以及一个 线程共享变量—RecordAccumulator
main 线程将消息发送给 RecordAccumulator ,Sender 线程不断从 RecordAccumulator 中拉取消息到 Kafka broker
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据
linger.ms:如果数据迟迟未到达 batch.size,sender 等待 linger.time 之后才会发送数据
Talking is cheap,show my code:
public class CustomeProducer { public static void main(String[] args) { Properties properties=new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.9:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); properties.put(ProducerConfig.ACKS_CONFIG,"all"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); properties.put(ProducerConfig.LINGER_MS_CONFIG,1); List<String> interceptors=new ArrayList<String>(); interceptors.add("com..interceptor.CouterInterceptor"); interceptors.add("com..kafka.interceptor.TimeInterceptor"); //指定拦截器(拦截器自己定义的) properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors); //1.创建一个生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); //2.调用send方法 for (int i=0;i<100;i++){ producer.send(new ProducerRecord<String, String>("first", i + "", "message_" + i)); } //3.关闭生产者 producer.close(); } }
consumer消费数据时的可靠性是很容日保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题
consumer 在消费过程中可能会出现宕机等故障, consumer 恢复后,需要重故障前的位置继续消费, 所以 consumer 需要实时记录自己消费到了哪个 offset
offset 的维护是 consumer 消费数据时必须考虑的问题
(采用方式)
为了使我们专注自己业务逻辑,kafka 提供自动提交 offset 的功能
enable.auto.commit:是否开启自动提交 offset 功能
auto.commit.interval.ms: 自动提交 offset 的时间间隔
Talking is cheap,show my code:
public class CustomConsumer { public static void main(String[] args) { Properties properties=new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.9:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //消费者组的 id 只要group.id相同,就属于同一个消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"1205"); //自动提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //1.创建一个消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); //指定消费哪个主题 consumer.subscribe(Arrays.asList("first")); //2.调用消费者的 poll(轮询) 方法 while (true){ ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record:records){ System.out.println("topic="+ record.topic()+ " offset="+record.offset()+" value="+record.value()); } //同步提交offset // consumer.commitAsync(); //异步提交offset consumer.commitSync(); /** * 两者的区别就是 同步提交是有失败重试机制 ,异步提交没有 * 一般用异步提交的方式,这样响应会快点 */ } } }
对于 Producer 而言,interceptor 使得用户在消息发送钱以及 Producer 回调逻辑前有机会对消息做一些定制化需求
定义的方法包括:
1). configure (configs)
获取配置信息和初始化数据时调用
2). onsend (ProducerRecord)
该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中,Producer 确保在消息被序列化以及计算分区前调用该方法,用户可以对消息做任何处理,但是最好保证不要修改消息所属 topic 和分区
3). onAcknowledgement (RecordMetadata ,Excepetion)
该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者消息在发送过程中失败调用, 通常在 Producer 回调之前
4). close
关闭 Interceptor,主要用于执行一些资源清理工作
下面来写两个 Kafka Interceprot
计数拦截器:
/** * @author jarvis * @date 2020/8/17 0017 9:10 * * Kafka拦截器 * 统计它发送消息到kafka 成功、失败的次数 */ public class CouterInterceptor implements ProducerInterceptor<String,String> { private long successNum=0L; private long errorNum=0L; public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if(e==null){ successNum++; }else { errorNum++; } } //这个方法,是在 producer生产完消息过后,调用的那个close方法才调用 public void close() { System.out.println("successNum"+successNum); System.out.println("errorNum"+errorNum); } public void configure(Map<String, ?> map) { } }
给消息内容加上时间拦截器:
public class TimeInterceptor implements ProducerInterceptor<String,String> { public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord<String, String>(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis()+ record.value(),record.headers()); } public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } public void close() { } public void configure(Map<String, ?> map) { } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。