赞
踩
offset顾名思义,即偏移量,我们知道消息从生产者发送到kafka的topic之后,是进入到不同的分区,在consumer未对消息进行消费之前,消息是有序存储在各个分区中;
在之前我们了解了kafka的消费者原理之后,提出这样一个疑问,kafka怎么知道某个消费组中的消费者消费消息的进度呢?
1、从0.9版本开始,consumer默认将offset保存在Kafka ,一个内置的topic中,该topic为__consumer_offsets;
2、 Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中;
这也就是说,kafka是通过 offset这个值来管理消费组消费进度的,下面是一张关于kafka的offset的原理图;
关于offset做下面几点补充:
默认情况下,保存offset数据的系统主题是看不到的,为了查看该系统主题数据,要将下面这个参数修改为false
exclude.internal.topics=false【在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false】
为了使我们能够专注于自己的业务逻辑, Kafka 提供了自动提交 offset的功能,自动提交 offset 的相关参数:
1、enable.auto.commit : 是否开启自动提交 offset 功能,默认是 true;
默认值为 true ,消费者会自动周期性地向服务器提交偏移量
2、auto.commit.interval.ms : 自动提交 offset 的时间间隔,默认是 5s;
如果设置了 enable.auto.commit 的值为 true , 则该值定义了消 费者偏移量向 Kafka 提交的频率, 默认 5s
代码展示
producer 端代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class OffsetProducer1 {
public static void main(String[] args) throws Exception {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
System.out.println("开始发送数据");
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 15; i++) {
kafkaProducer.send(new ProducerRecord<>("zcy234","congge " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
}
consumer 端代码
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
public class OffsetConsumer1 {
public static void main(String[] args) {
// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group2");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
//3. 创建 kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("zcy234"));
System.out.println("准备开始消费数据");
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
}
}
}
核心的代码即添加下面这两行配置
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
运行上面的程序,效果上面和之前差不多,
虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset 的 API,关于手动提交offset,做如下几点说明:
commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据;
commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了;
同步提交 offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低。
下面看同步提交offset的consumer的完整代码:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class SyncConsumer1 {
public static void main(String[] args) {
// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group3");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 修改 提交 offset 的时间周期 1000ms,默认情况下为 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
//3. 创建 kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//3. 创建 kafka 消费者
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("zcy234"));
System.out.println("准备开始消费数据");
//5. 消费数据
while (true){
// 读取消息
ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord :
consumerRecords) {
System.out.println(consumerRecord.value());
}
// 同步提交 offset
consumer.commitSync();
}
}
}
仍然使用上面的producer向zcy234这个topic发送几条消息,观察消费端控制台输出情况,仍然可以正常消费到消息;
异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此 吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
下面是完整的代码
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class AsyncConsumer1 {
public static void main(String[] args) {
// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group5");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//3. 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(properties);
//4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("zcy234"));
//5. 消费数据
while (true) {
// 读取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
// 异步提交 offset
consumer.commitAsync();
}
}
}
仍然使用上面的producer向zcy234这个topic发送几条消息,观察消费端控制台输出情况,仍然可以正常消费到消息;
指定 Offset 消费
kafka中消费者在消费数据时的offset的机制有3种,默认情况下为latest,即从最近的那一次的位置开始消费;
auto.offset.reset = earliest | latest | none 默认是 latest
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
1、 earliest :自动将偏移量重置为最早的偏移量, --from-beginning;
2、latest (默认值) :自动将偏移量重置为最新偏移量;
3、none :如果未找到消费者组的先前偏移量,则向消费者抛出异常;
于是在实际业务中可能会遇到这么一种场景,即新的消费者并不想消费最早的那一批消息,而是指定从某个offset位置开始消费;
下面看具体的consumer端代码:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class SpecialOffsetConsumer1 {
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group6");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("zcy234");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
// 获取消费者分区分配信息(有了分区分配信息才能开始消费),避免开始消费的时候分区信息还未就绪
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset 从 5 的位置开始消费
for (TopicPartition tp : assignment) {
kafkaConsumer.seek(tp, 5);
}
System.out.println("准备开始消费数据");
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
运行这段代码,然后再次使用上面的producer发送消息,观察控制台输出效果,可以看到,数据消费的offset的位置从5开始
指定时间消费
需求:在生产环境中,比如说遇到最近消费的某一段时间的数据有异常,想重新按照时间消费?或者要求按照时间消费前一天的数据,怎么处理?
下面看具体的代码处理
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class SpecialTimeConsumer1 {
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "101.34.23.80:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group7");
// 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("zcy234");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
//用当前时间减去业务上需要回退的时间,比如这里想重新消费24个小时之前的数据
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null) {
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。