当前位置:   article > 正文

精通Flink项目优化(四.KafkaSource调优)_flink kafkasource

flink kafkasource

KafkaSource调优

动态发现分区

当 FlinkKafkaConsumer 初始化时,每个 subtask 会订阅一批 partition,但是当
Flink 任务运行过程中,如果被订阅的 topic 创建了新的 partition,FlinkKafkaConsumer
如何实现动态发现新创建的 partition 并消费呢?

在使用 FlinkKafkaConsumer 时,可以开启 partition 的动态发现。通过 Properties指定参数开启(单位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS
该参数表示间隔多久检测一次是否有新创建的 partition。默认值是 Long 的最小值,表示不开启,大于 0 表示开启。开启时会启动一个线程根据传入的 interval 定期获取 Kafka最新的元数据,新 partition 对应的那一个 subtask 会自动发现并从 earliest 位置开始消费,新创建的 partition 对其他 subtask 并不会产生影响。
代码如下:
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTE RVAL_MILLIS, 30 * 1000 + "");

从Kafka数据源生成 watermark

Kafka 单分区内有序,多分区间无序。在这种情况下,可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。
在单分区内有序的情况下,使用时间戳单调递增按分区生成的 watermark 将生成完美的全局 watermark。
可以不使用 TimestampAssigner,直接用 Kafka 记录自身的时间戳:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", 
"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
 "flinktest",
 new SimpleStringSchema(),
 properties
 );
kafkaSourceFunction.assignTimestampsAndWatermarks(
 WatermarkStrategy
 .forBoundedOutOfOrderness(Duration.ofMinutes(2))
);
env.addSource(kafkaSourceFunction)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
设置空闲等待

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着WatermarkGenerator 也不会获得任何新数据去生成 watermark。称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。
举栗:

Kafka 的 Topic 中,由于某些原因,造成个别 Partition 一直没有新的数据。
由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark
的最小值,则其 watermark 将不会发生变化,导致窗口、定时器等不会被触发。

使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", 
"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
 "flinktest",

 new SimpleStringSchema(),
 properties
 );
kafkaSourceFunction.assignTimestampsAndWatermarks(
 WatermarkStrategy
 .forBoundedOutOfOrderness(Duration.ofMinutes(2))
.withIdleness(Duration.ofMinutes(5))
);
env.addSource(kafkaSourceFunction)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
Kafka 的 offset 消费策略

FlinkKafkaConsumer 可以调用以下 API,注意与”auto.offset.reset”区分开:

  1. setStartFromGroupOffsets():默认消费策略,默认读取上次保存的 offset 信息,如果应用第一次启动,读取不到上次offset信息 ,根据参数auto.offset.reset 的值来进行消费数据。建议使用这个。
  2. setStartFromEarliest():从最早的数据开始进行消费,忽略存储的 offset 信息
  3. setStartFromLatest():从最新的数据进行消费,忽略存储的 offset 信息
  4. setStartFromSpecificOffsets(Map):从指定位置进行消费
  5. setStartFromTimestamp(long):从 topic 中指定的时间点开始消费,指定时间点之前的数据忽略
  6. 当 checkpoint 机制开启的时候,KafkaConsumer 会定期把 kafka 的 offset 信息还有其他 operator 的状态信息一块保存起来。当 job 失败重启的时候,Flink 会从最近一次的 checkpoint 中进行恢复数据,重新从保存的 offset 消费 kafka 中的数据(上面几种策略,只有第一次启动的时候起作用)。
  7. 为了能够使用支持容错的 kafka Consumer,需要开启 checkpoint
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/789342
推荐阅读
相关标签
  

闽ICP备14008679号