赞
踩
今天深入分析kafka的消费者配置原理
一、消费者入门:
1、消费者群组:
但是,当我们增加更多的消费者,超过了主题的分区数量,就会有一部分的消费者被闲置,不会接收到任何消息。 往消费者群组里增加消费者是进行横向伸缩能力的主要方式。所以我们有必要为主题设定合适规模的分区,在负载均衡的时候可以加入更多的消费者。但是要记住,一个群组里消费者数量超过了主题的分区数量,多出来的消费者是没有用处的。 如果是多个应用程序,需要从同一个主题中读取数据,只要保证每个应用程序有自己的消费者群组就行了。
2、消费者配置:
消费者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。可以参考如下代码:
- public static void main(String[] args) {
- //TODO 消费者三个属性必须指定(broker地址清单、key和value的反序列化器)
- Properties properties = new Properties();
- properties.put("bootstrap.servers","127.0.0.1:9092");
- properties.put("key.deserializer", StringDeserializer.class);
- properties.put("value.deserializer", StringDeserializer.class);
- //TODO 群组并非完全必须
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
-
- //TODO 更多消费者配置(重要的)
- properties.put("auto.offset.reset","latest"); //消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理
- properties.put("enable.auto.commit",true); // 表明消费者是否自动提交偏移 默认值true
- properties.put("max.poll.records",500); // 控制每次poll方法返回的的记录数量 默认值500
- //分区分配给消费者的策略。系统提供两种策略。默认为Range
- properties.put("partition.assignment.strategy",Collections.singletonList(RangeAssignor.class));
-
-
-
- KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
- try {
- //TODO 消费者订阅主题(可以多个)
- consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
- while(true){
- //TODO 拉取(新版本)
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
- for(ConsumerRecord<String, String> record:records){
- System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
- record.offset(),record.key(),record.value()));
- //do my work
- //打包任务投入线程池
- // ex
- }
- }
- } finally {
- consumer.close();
- }
-
- }
auto.offset.reset
注意:如果是消费者在读取一个没有偏移量的分区或者偏移量无效的情况(因消费者长时间失效,包含的偏移量记录已经过时并被删除)下,默认值是 latest 的话,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录),可以先启动生产者,再启动消费者,观察到这种情况。
自定义策略
二、基础概念
1、订阅
2、轮询
3、提交偏移量
三、核心概念:
1、多线程安全:
2、群组协调:
3、分区再均衡
4、消费安全问题:
四、提交偏移量导致的问题
所以, 处理偏移量的方式对客户端会有很大的影响 。KafkaConsumer API 提供了很多种方式来提交偏移量 。
1、自动提交
2、手动提交(同步)
我们通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者 API 提供了另一种提交偏移量的方式,开发 者可以在必要的时候提交当前偏移量,而不是基于时间间隔。 把 auto.commit. offset 设为 false,自行决定何时提交偏移量。使用 commitsync()提交偏移量最简单也最可靠。这个方法会提交由 poll()方法返回的最 新偏移量,提交成功后马上返回,如果提交失败就抛出异常。
- public static void main(String[] args) {
- /*消息消费者*/
- Properties properties = KafkaConst.consumerConfig("CommitSync",
- StringDeserializer.class,
- StringDeserializer.class);
- //TODO 取消自动提交
- /*取消自动提交*/
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
-
- KafkaConsumer<String,String> consumer
- = new KafkaConsumer<String, String>(properties);
- try {
- consumer.subscribe(Collections.singletonList(
- BusiConst.CONSUMER_COMMIT_TOPIC));
- while(true){
- ConsumerRecords<String, String> records
- = consumer.poll(Duration.ofMillis(500));
- for(ConsumerRecord<String, String> record:records){ //100个 100~ 200
- System.out.println(String.format(
- "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
- record.topic(),record.partition(),record.offset(),
- record.key(),record.value()));
- //do our work
-
- }
- //开始事务
- //读业务写数据库-
- //偏移量写入数据库
- //TODO 同步提交(这个方法会阻塞)
- consumer.commitSync(); //offset =200 max
-
- consumer.commitSync(); //offset =200 max
- }
- } finally {
- consumer.close();
- }
- }
3、异步提交
- public static void main(String[] args) {
- /*消息消费者*/
- Properties properties = KafkaConst.consumerConfig(
- "CommitAsync",
- StringDeserializer.class,
- StringDeserializer.class);
- //TODO 取消自动提交
- /*取消自动提交*/
- properties.put("enable.auto.commit",false);
-
- KafkaConsumer<String,String> consumer
- = new KafkaConsumer<String, String>(properties);
- try {
- consumer.subscribe(Collections.singletonList(
- BusiConst.CONSUMER_COMMIT_TOPIC));
- while(true){
- ConsumerRecords<String, String> records
- = consumer.poll(Duration.ofMillis(500));
- for(ConsumerRecord<String, String> record:records){
- System.out.println(String.format(
- "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
- record.topic(),record.partition(),record.offset(),
- record.key(),record.value()));
- //do our work
- }
- //TODO 异步提交偏移量
- consumer.commitAsync();
- /*允许执行回调*/
- consumer.commitAsync(new OffsetCommitCallback() {
- public void onComplete(
- Map<TopicPartition, OffsetAndMetadata> offsets,
- Exception exception) {
- if(exception!=null){
- System.out.print("Commmit failed for offsets ");
- System.out.println(offsets);
- exception.printStackTrace();
- }
- }
- });
-
- }
- } finally {
- consumer.close();
- }
- }
4、同步异步组合
- public static void main(String[] args) {
- /*消息消费者*/
- Properties properties = KafkaConst.consumerConfig("SyncAndAsync",
- StringDeserializer.class,
- StringDeserializer.class);
- //TODO 取消自动提交
- /*取消自动提交*/
- properties.put("enable.auto.commit",false);
-
- KafkaConsumer<String,String> consumer
- = new KafkaConsumer<String, String>(properties);
- try {
- consumer.subscribe(Collections.singletonList(
- BusiConst.CONSUMER_COMMIT_TOPIC));
- while(true){
- ConsumerRecords<String, String> records
- = consumer.poll(Duration.ofMillis(500));
- for(ConsumerRecord<String, String> record:records){
- System.out.println(String.format(
- "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
- record.topic(),record.partition(),record.offset(),
- record.key(),record.value()));
- //do our work
- }
- //TODO 异步提交
- consumer.commitAsync();
- }
- } catch (CommitFailedException e) {
- System.out.println("Commit failed:");
- e.printStackTrace();
- } finally {
- try {
- //TODO 为了万不一失,需要同步提交下
- consumer.commitSync();
- } finally {
- consumer.close();
- }
- }
- }
5、特定提交
- public static void main(String[] args) {
- /*消息消费者*/
- Properties properties = KafkaConst.consumerConfig(
- "CommitSpecial",
- StringDeserializer.class,
- StringDeserializer.class);
- //TODO 必须做
- /*取消自动提交*/
- properties.put("enable.auto.commit",false);
-
- KafkaConsumer<String,String> consumer
- = new KafkaConsumer<String, String>(properties);
- Map<TopicPartition, OffsetAndMetadata> currOffsets
- = new HashMap<TopicPartition, OffsetAndMetadata>();
- int count = 0;
- try {
- consumer.subscribe(Collections.singletonList(
- BusiConst.CONSUMER_COMMIT_TOPIC));
- while(true){
- ConsumerRecords<String, String> records
- = consumer.poll(Duration.ofMillis(500));
- for(ConsumerRecord<String, String> record:records){
- System.out.println(String.format(
- "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
- record.topic(),record.partition(),record.offset(),
- record.key(),record.value()));
- currOffsets.put(new TopicPartition(record.topic(),record.partition()),
- new OffsetAndMetadata(record.offset()+1,"no meta"));
- if(count%11==0){
- //TODO 这里特定提交(异步方式,加入偏移量),每11条提交一次
- consumer.commitAsync(currOffsets,null);
- }
- count++;
- }
- }
- } finally {
- //TODO 在关闭前最好同步提交一次偏移量
- consumer.commitSync();
- consumer.close();
- }
- }
五、分区再均衡
1、在均衡监听器
2、从特定偏移量开始记录
六、独立消费者
七、优雅退出
八、反序列化
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。