赞
踩
当 FlinkKafkaConsumer 初始化时,每个 subtask 会订阅一批 partition,但是当
Flink 任务运行过程中,如果被订阅的 topic 创建了新的 partition,FlinkKafkaConsumer
如何实现动态发现新创建的 partition 并消费呢?
在使用 FlinkKafkaConsumer 时,可以开启 partition 的动态发现。通过 Properties指定参数开启(单位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS
该参数表示间隔多久检测一次是否有新创建的 partition。默认值是 Long 的最小值,表示不开启,大于 0 表示开启。开启时会启动一个线程根据传入的 interval 定期获取 Kafka最新的元数据,新 partition 对应的那一个 subtask 会自动发现并从 earliest 位置开始消费,新创建的 partition 对其他 subtask 并不会产生影响。
代码如下:
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTE RVAL_MILLIS, 30 * 1000 + "");
Kafka 单分区内有序,多分区间无序。在这种情况下,可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。
在单分区内有序的情况下,使用时间戳单调递增按分区生成的 watermark 将生成完美的全局 watermark。
可以不使用 TimestampAssigner,直接用 Kafka 记录自身的时间戳:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
"flinktest",
new SimpleStringSchema(),
properties
);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
);
env.addSource(kafkaSourceFunction)
如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着WatermarkGenerator 也不会获得任何新数据去生成 watermark。称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。
举栗:
Kafka 的 Topic 中,由于某些原因,造成个别 Partition 一直没有新的数据。
由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark
的最小值,则其 watermark 将不会发生变化,导致窗口、定时器等不会被触发。
使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
"flinktest",
new SimpleStringSchema(),
properties
);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
.withIdleness(Duration.ofMinutes(5))
);
env.addSource(kafkaSourceFunction)
FlinkKafkaConsumer 可以调用以下 API,注意与”auto.offset.reset”区分开:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。