当前位置:   article > 正文

Flink Connector(一) FlinkKafkaConsumer

flinkkafkaconsumer

 

 

前言

分析基于的版本是Flink 1.12.1 , Kafka 是 2.11 。 下面是在IDEA 里边直接依赖的包的截图 。

FlinkKafkaConsumer

Maven依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.1</version>
</dependency>

 

Flink Kafka Consumer  特性

Flink Kafka Consumer 直接依赖的关键 Class 是  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer , 这个类有两个方面的指责,一是作为Flink  框架支持的一种 Source, 要适配Source 接口的相关逻辑保证正确接入Flink,第二个是与Kafka 本身的Consumer 方式交互的逻辑 。 官方给出使用Flink Kafka Consumer 的简单例子 :

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

 

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);
 

 

从官方给出的例子可以看出,使用 Flink Kafka Consumer 的三个要素

1、 Kafka 的Topic 或则会是 Topic的一个列表 (支持正则匹配的方式),表征从Kafka Cluster 哪里获取数据

2、 数据如何反序列化,与Kafka 自身的Consumer 使用模式类似,用户APP 需要负责将read 的bytes 还原成业务操作的对象,涉及到的接口就是  DeserializationSchema / KafkaDeserializationSchema

3、 Kafka Consumer的配置属性 ,如“bootstrap.servers”,“group.id” 必须存在,其他的控制属性如 poll超时时间,每次poll多少条等

引用自官方 

  1. The topic name / list of topic names
  2. A DeserializationSchema / KafkaDeserializationSchema for deserializing the data from Kafka
  3. Properties for the Kafka consumer. The following properties are required:
    • “bootstrap.servers” (comma separated list of Kafka brokers)
    • “group.id” the id of the consumer group

 

 

Flink Kafka Consumer  相关源代码分析

入口类 FlinkKafkaConsumer,需要一个范型指定SOURCE 输出到 Flink Stream 中的Record 类型 T,这个T 是通过用户配置的 反序列化类实现的~

public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> 

     FlinkKafkaConsumer的构造函数重载的形式,最终都是调用最后一个构造器完成对象的创建

 

public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
public  FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) 
public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
  1. private FlinkKafkaConsumer(
  2. List<String> topics,
  3. Pattern subscriptionPattern,
  4. KafkaDeserializationSchema<T> deserializer,
  5. Properties props)

 

 

  1. private FlinkKafkaConsumer(
  2. List<String> topics,
  3. Pattern subscriptionPattern,
  4. KafkaDeserializationSchema<T> deserializer,
  5. Properties props) {
  6. super(
  7. topics,
  8. subscriptionPattern,
  9. deserializer,
  10. getLong(
  11. checkNotNull(props, "props"),
  12. KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
  13. !getBoolean(props, KEY_DISABLE_METRICS, false));
  14. this.properties = props;
  15. setDeserializer(this.properties);
  16. // configure the polling timeout
  17. try {
  18. if (properties.containsKey(KEY_POLL_TIMEOUT)) {
  19. this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
  20. } else {
  21. this.pollTimeout = DEFAULT_POLL_TIMEOUT;
  22. }
  23. }
  24. catch (Exception e) {
  25. throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
  26. }
  27. }

上边这个构造函数的核心流程,super 是调用父类的构造起,之后是将传入的properties赋值到该对象的成员变量 this.properties 。

比较有意思的是最后进行 deserialize 的接口就只是 KafkaDeserializationSchema,也就就算用户DeserializationSchema 传入接口的实现,也会被包装成KafkaDeserializationSchema,后便会对比

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/429434
推荐阅读
相关标签
  

闽ICP备14008679号