赞
踩
Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中。从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
消费 offset 案例:
__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。但是需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
① 创建一个主题topic
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 3 --topic topic02
② 启动消费者消费数据
# 指定消费者组
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic02 --group test1 --from-beginning
hello,kafka
hello,zhangsan
hello,lisi
hello
hello,wangwu
hello,haha
③ 启动生产者生产数据
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic topic02
>hello,kafka
>hello,zhangsan
>hello,lisi
>hello
>hello,wangwu
>hello,haha
>
④ 查看消费者消费主题__consumer_offsets
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
# key 是 group.id+topic+分区号,value 就是当前 offset 的值
[test1,topic02,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1670144851463, expireTimestamp=None)
[test1,topic02,0]::OffsetAndMetadata(offset=6, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1670144891473, expireTimestamp=None)
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
自动提交offset的相关参数:
enable.auto.commit:是否开启自动提交offset功能,默认是true
auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
消费者自动提交 offset :
① 创建一个主题topic
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 3 --topic nini
② 启动消费者消费数据
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// 创建消费者组,组名任意起名都可以
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
// 自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交时间间隔,默认为5s,修改为1s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
// 创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("nini");
consumer.subscribe(topics);
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
}
}
}
}
③ 启动生产者生产数据
public class CustomProducerCallbackPartitions {
public static void main(String[] args) throws InterruptedException {
// kafka生产者属性配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 添加自定义分区器
// properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hh.producer.MyPartitioner");
// kafka生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
for(int i=0;i<5;i++){
kafkaProducer.send(new ProducerRecord<>("nini" ,"hello,kafka"), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if(exception==null){
// 消息发送成功
System.out.println("主题"+recordMetadata.topic()+",发往的分区:"+recordMetadata.partition());
}else{
// 消息发送失败
exception.printStackTrace();
}
}
});
Thread.sleep(2);
}
// 关闭资源
kafkaProducer.close();
}
}
④ 查看自动提交的 Offset:
# 每隔1秒提交一次offet
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146009308, expireTimestamp=None)
[group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146010308, expireTimestamp=None)
[group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146011309, expireTimestamp=None)
[group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146012310, expireTimestamp=None)
[group-1,nini,0]::OffsetAndMetadata(offset=5, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670146013311, expireTimestamp=None)
虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交):
commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
两者的相同点是,都会将本次提交的一批数据最
高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// 创建消费者组,组名任意起名都可以
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
// 自动提交关闭
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("nini");
consumer.subscribe(topics);
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
}
// 手动同步提交offset
consumer.commitSync();
}
}
}
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// 创建消费者组,组名任意起名都可以
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
// 自动提交关闭
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("nini");
consumer.subscribe(topics);
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
}
// 手动同步提交offset
consumer.commitAsync();
}
}
}
auto.offset.reset = earliest | latest | none 默认是 latest
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
earliest:自动将偏移量重置为最早的偏移量,–from-beginning
latest(默认值):自动将偏移量重置为最新偏移量。
none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
任意指定 offset 位移开始消费:
① 查看消费者消费主题__consumer_offsets
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[group-3,nini,0]::OffsetAndMetadata(offset=60, leaderEpoch=Optional[0], metadata=, commitTimestamp=1670149746455, expireTimestamp=None)
可以看到消费者组为group-3,主题为 nini 的消费Offset = 60,所以我们可以指定消费者从Offset=55开始消费:
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建消费者组,组名任意起名都可以
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-3");
// 创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("nini");
consumer.subscribe(topics);
// 获取所有的分区信息
Set<TopicPartition> assignment= new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = consumer.assignment();
}
// 遍历所有分区,并指定 offset 从55的位置开始消费
for (TopicPartition tp: assignment) {
consumer.seek(tp, 55);
}
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
}
}
}
}
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建消费者组,组名任意起名都可以
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-3");
// 创建消费者
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("nini");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null){
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区:"+consumerRecord.partition()+",消息:"+consumerRecord.value());
}
}
}
}
重复消费:已经消费了数据,但是 offset 没提交。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
① 场景1:重复消费,自动提交Offset引起的
② 场景1:漏消费。设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
思考:怎么能做到既不漏消费也不重复消费呢?需要使用消费者事务。
如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定 。 此 时我们需要将 Kafka 的 offset 保存到支持事务的自定义介质( 比如MySQL)。
如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。从一次最多拉取500条,调整为一次最多拉取1000条。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。