赞
踩
分析基于的版本是Flink 1.12.1 , Kafka 是 2.11 。 下面是在IDEA 里边直接依赖的包的截图 。
FlinkKafkaConsumer
Maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.1</version>
</dependency>
Flink Kafka Consumer 直接依赖的关键 Class 是 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer , 这个类有两个方面的指责,一是作为Flink 框架支持的一种 Source, 要适配Source 接口的相关逻辑保证正确接入Flink,第二个是与Kafka 本身的Consumer 方式交互的逻辑 。 官方给出使用Flink Kafka Consumer 的简单例子 :
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
从官方给出的例子可以看出,使用 Flink Kafka Consumer 的三个要素
1、 Kafka 的Topic 或则会是 Topic的一个列表 (支持正则匹配的方式),表征从Kafka Cluster 哪里获取数据
2、 数据如何反序列化,与Kafka 自身的Consumer 使用模式类似,用户APP 需要负责将read 的bytes 还原成业务操作的对象,涉及到的接口就是 DeserializationSchema / KafkaDeserializationSchema
3、 Kafka Consumer的配置属性 ,如“bootstrap.servers”,“group.id” 必须存在,其他的控制属性如 poll超时时间,每次poll多少条等
引用自官方
- The topic name / list of topic names
- A DeserializationSchema / KafkaDeserializationSchema for deserializing the data from Kafka
- Properties for the Kafka consumer. The following properties are required:
- “bootstrap.servers” (comma separated list of Kafka brokers)
- “group.id” the id of the consumer group
入口类 FlinkKafkaConsumer,需要一个范型指定SOURCE 输出到 Flink Stream 中的Record 类型 T,这个T 是通过用户配置的 反序列化类实现的~
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T>
FlinkKafkaConsumer的构造函数重载的形式,最终都是调用最后一个构造器完成对象的创建
public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
private FlinkKafkaConsumer( List<String> topics, Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
- private FlinkKafkaConsumer(
- List<String> topics,
- Pattern subscriptionPattern,
- KafkaDeserializationSchema<T> deserializer,
- Properties props) {
-
- super(
- topics,
- subscriptionPattern,
- deserializer,
- getLong(
- checkNotNull(props, "props"),
- KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
- !getBoolean(props, KEY_DISABLE_METRICS, false));
-
- this.properties = props;
- setDeserializer(this.properties);
-
- // configure the polling timeout
- try {
- if (properties.containsKey(KEY_POLL_TIMEOUT)) {
- this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
- } else {
- this.pollTimeout = DEFAULT_POLL_TIMEOUT;
- }
- }
- catch (Exception e) {
- throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
- }
- }
上边这个构造函数的核心流程,super 是调用父类的构造起,之后是将传入的properties赋值到该对象的成员变量 this.properties 。
比较有意思的是最后进行 deserialize 的接口就只是 KafkaDeserializationSchema,也就就算用户DeserializationSchema 传入接口的实现,也会被包装成KafkaDeserializationSchema,后便会对比
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。