当前位置:   article > 正文

《十堂课学习 Flink》第六章:Flink 流计算数据源 env.fromSource(以 Kafka作为数据源为例)_flink的fromsource一定要存在kafka的主题吗

flink的fromsource一定要存在kafka的主题吗

6.1 本章概述

本章内容介绍基于 Flink 1.14.x 版本开发流计算案例。这个案例中我们将 kafka 作为数据源,启动 Flink 任务以后,将会监听 kafka 的特定的一个或多个 TOPIC,并根据消息内容进行计算。

这是一种实时计算场景,即数据一旦流入 kafka ,就触发计算条件,也就是 flink 官方一直强调的 “流批一体” 的概念的一种体现。这里与批计算的差别也非常明显,即无需等待凑足数据以后再批量执行。

我们的例子非常简单:

  1. flink 任务启动后,将写入 kafka 中的字符串打印到控制台;
  2. flink 任务启动后,将写入 kafka 的 json 格式数据进行反序列化,转换为 实体类,然后将满足特定条件的实体打印到控制台。

相关内容可以概述为:

  • 明确 flink 与 对应 kafka 、jdk 版本之间的适配关系;
  • 明确 flink 开发时,env.fromSourceenv.addSource 的区别;
  • 明确 flink 监听 kafka topic 的基本方法;
  • 了解 flink 监听 kafka 中的topic未创建这种情况,以及对应的处理方法;
  • 了解 flink 的 MultipleParameterTool 的用法(用于任务的启动参数);
  • 了解 kafka 客户端的使用方法;
  • 明确 flink 监听 kafka 中消息的反序列操作(实际业务场景中也是直接操作实体类,而不是字符串)。

6.2 效果展示

6.2.1 打印生产者写入 kafka 的字符串

flink-kafka 简单例子

6.2.2 将生产者写入的字符串反序列化为实体类

flink 接收kafka消息并反序列化

6.3 环境准备

相关环境基础包括:

  • Oracle JDK / OpenJDK 1.8.x ;
  • maven 环境,开发时需要通过 maven 下载相关包;
  • kafka

6.3.1 检查 jdk 版本

确保是 1.8 版本

$ java -versio

$ javac -version
  • 1
  • 2
  • 3

在这里插入图片描述

6.3.2 本地启动 kafka

确保 kafka 的版本为 2.x

cd 到 kafka 所在目录,启动 zookeeper

$ bin/zookeeper-server-start.sh config/zookeeper.properties
  • 1

接着再打开一个命令行窗口,启动 kafka

$ bin/kafka-server-start.sh config/server.properties
  • 1

注意:以上内容均是基于 macOS 以及 linux 的命令,windows 下基本一致,请查阅相关资料。

6.3.2 启动 kafka 生产者客户端

cd 到 kafka 所在目录

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
  • 1

在这里插入图片描述
接下来将会基于此控制台与 flink 进行交互,即输入字符串,flink job 读取字符串。

6.4 代码编写

具体依赖请参考源码仓库:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>quick-start-kafka</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.binary.version>2.12</scala.binary.version>
        <lombok.version>1.18.30</lombok.version>
        <flink.version>1.14.6</flink.version>
        <slf4j.version>2.0.9</slf4j.version>
        <logback.version>1.3.11</logback.version>
        <junit.version>4.13.2</junit.version>
    </properties>

    <dependencies>
        <!-- flink 相关 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- 编译工具 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- log 相关 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- test 相关 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        
        <!-- Flink Kafka Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.48</version>
        </dependency>
    </dependencies>
</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

6.4.1 打印生产者写入 kafka 的字符串

package cn.smileyan.demos;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * flink 使用 kafka 作为数据源的简单例子
 * @author Smileyan
 */
@Slf4j
public class FlinkKafkaExample {
    /**
     * 参数解释:
     *  -bs broker 地址
     *  -kcg kafka consumer group
     *  -it kafka 输入数据 topic
     *  -ct 是否自动创建 topic
     *  -pt topic 分区数
     *  -rf topic 副本数
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args);
        final String bootstrapServer = cmd.get("bs", "localhost:9092");
        final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer");
        final String inputTopic = cmd.get("it", "quickstart-events");
        final boolean createTopic = cmd.getBoolean("ct", false);

        log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);

        // 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数
        if (createTopic) {
            final int partitions = cmd.getInt("pt", 1);
            final short replicationFactor = cmd.getShort("rf", (short) 1);
            createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);
        }

        final KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setGroupId(kafkaConsumerGroup)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setBootstrapServers(bootstrapServer)
                .setTopics(inputTopic)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        final DataStreamSource<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        kafkaStream.print();

        env.execute("Flink Kafka Example");
    }

    /**
     * 如果 TOPIC 不存在则创建该 TOPIC
     * @param bootstrapServer kafka broker 地址
     * @param topic 想要创建的 TOPIC
     * @param partitions 并行度
     * @param replicationFactor 副本数
     */
    public static void createTopic(String bootstrapServer,
                                   String topic,
                                   int partitions,
                                   int replicationFactor) throws ExecutionException, InterruptedException {
        Properties adminProperties = new Properties();
        adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            if (!adminClient.listTopics().names().get().contains(topic)) {
                NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                log.info("created topic: {}", topic);
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

6.4.2 将生产者写入的字符串反序列化为实体类

package cn.smileyan.demos;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 实体类序列化
 * @author smileyan
 */
@Slf4j
public class FlinkKafkaEntityExample {
    /**
     * 参数解释:
     *  -bs broker 地址
     *  -kcg kafka consumer group
     *  -it kafka 输入数据 topic
     *  -ct 是否自动创建 topic
     *  -pt topic 分区数
     *  -rf topic 副本数
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args);
        final String bootstrapServer = cmd.get("bs", "localhost:9092");
        final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer");
        final String inputTopic = cmd.get("it", "quickstart-events");
        final boolean createTopic = cmd.getBoolean("ct", false);

        log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);

        // 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数
        if (createTopic) {
            final int partitions = cmd.getInt("pt", 1);
            final short replicationFactor = cmd.getShort("rf", (short) 1);
            createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);
        }

        final KafkaSource<Student> kafkaSource = KafkaSource.<Student>builder()
                .setGroupId(kafkaConsumerGroup)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setBootstrapServers(bootstrapServer)
                .setTopics(inputTopic)
                .setValueOnlyDeserializer(new CommonDeserializationSchema<>(Student.class))
                .build();

        final DataStreamSource<Student> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 过滤掉反序列化失败的对象,只保留正确的对象
        SingleOutputStreamOperator<Student> out1 = kafkaStream.filter(Objects::nonNull)
                .map(student -> {
                    log.info("filter none objects is {}", student);
                    return student;
                });

        // 只选择年纪小于 10 的对象
        out1.filter(student -> student.getAge() != null && student.getAge() < 10)
                .map(student -> {
                    log.info("filter age < 10: {}", student);
                    return student;
                });

        env.execute("Flink Kafka Example");
    }

    /**
     * 如果 TOPIC 不存在则创建该 TOPIC
     * @param bootstrapServer kafka broker 地址
     * @param topic 想要创建的 TOPIC
     * @param partitions 并行度
     * @param replicationFactor 副本数
     */
    public static void createTopic(String bootstrapServer,
                                   String topic,
                                   int partitions,
                                   int replicationFactor) throws ExecutionException, InterruptedException {
        Properties adminProperties = new Properties();
        adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            if (!adminClient.listTopics().names().get().contains(topic)) {
                NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                log.info("created topic: {}", topic);
            }
        }
    }

    @Data
    static class Student {
        private String name;
        private Integer age;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
package cn.smileyan.demos;

import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;


/**
 * 将字节码数据进行序列化
 * @author smileyan
 * @param <O> 实体类
 */
@Slf4j
public class CommonDeserializationSchema<O> implements DeserializationSchema<O> {

    private final Class<O> clazz;

    public CommonDeserializationSchema(Class<O> clazz) {
        this.clazz = clazz;
    }

    @Override
    public O deserialize(byte[] message) {
        try {
            String str = new String(message, StandardCharsets.UTF_8);
            log.info("kafka received message: {}", str);
            return JSON.parseObject(str, clazz);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        return null;
    }

    @Override
    public boolean isEndOfStream(O nextElement) {
        return false;
    }

    @Override
    public TypeInformation<O> getProducedType() {
        return TypeInformation.of(clazz);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

6.5 相关问题

6.5.1 jdk / flink / kafka 的版本适配问题

jdk:1.8.x
flink: 1.14.x,不能使用 1.15 以及以上的版本,因为依赖的 jdk 版本发生了变化(jdk 11)
kafka: 2.x.x ,不能使用 3.x,因为依赖的 jdk 版本发生了变化(jdk11)。

6.5.2 env.fromSource 以及 env.addSource 的区别

Flink 中 env.fromSource()env.addSource() 方法都是用于创建数据流(DataStream),即从不同的数据源引入数据到 Flink 流处理或批处理作业中。虽然它们的目的相似,但具体的使用方式和语境有所不同:

env.addSource(sourceFunction)

  1. 方法签名与参数
    addSourceStreamExecutionEnvironment 类的一个方法,它接收一个实现了 SourceFunction 接口的对象作为参数。SourceFunction 是 Flink 提供的一个通用接口,用于定义数据源的逻辑,包括如何初始化数据源、如何产生数据以及如何正确清理资源。

  2. 用途
    addSource 主要用于自定义数据源或者使用 Flink 提供的某些特定数据源,这些数据源可能没有提供更高级别的抽象或者封装。通过实现 SourceFunction,开发者可以完全控制数据读取的细节,如从文件、网络套接字、自定义服务等非标准或非常规数据源读取数据。

  3. 灵活性与复杂性
    使用 addSource 的方式提供了极大的灵活性,因为可以直接编写代码来处理数据源的各种特性和行为。然而,这也意味着需要更多手动编码工作,包括处理错误恢复、并行化(如果支持)、数据分区等复杂任务。对于容错和并行读取的支持通常需要在 SourceFunction 实现中集成相应的机制。

env.fromSource(source)

  1. 方法签名与参数
    fromSource 方法同样属于 StreamExecutionEnvironment 类,但其参数通常是某个具体数据源类的实例,而非 SourceFunction 接口。这里的 source 参数往往代表一个已经封装好的、针对特定数据源类型的高级抽象,如 FlinkKafkaConsumerSocketTextStreamFunction 等。

  2. 用途
    fromSource 通常用于直接使用 Flink 内置或社区提供的对常见数据源(如 Apache Kafka、文件系统、数据库连接器等)的预封装支持。这些封装好的数据源类通常会隐藏底层复杂性,提供友好的配置选项,并且内置了对容错、并行读取等特性的支持。

  3. 便捷性与标准化
    使用 fromSource 方法更为便捷,因为它针对常见的数据源类型提供了开箱即用的解决方案。开发者无需从头实现复杂的 SourceFunction,只需配置必要的参数(如 Kafka 主题名、数据库连接信息等)即可快速接入数据源。这种做法遵循 Flink 的最佳实践,确保了与 Flink 生态系统的良好集成以及对数据源特性的有效利用。

总结:

  • env.addSource(sourceFunction) 适用于需要自定义数据源逻辑、处理非标准数据源或对数据源控制有特殊需求的情况。使用时需要自行实现 SourceFunction,处理数据读取、错误恢复、并行化等细节。

  • env.fromSource(source) 适用于对接已知、常见的数据源,如 Kafka、文件、数据库等,利用 Flink 提供的预封装数据源类。这种方式简化了开发过程,提供了更好的容错性和易用性,但可能不支持所有定制化需求。

  • flink 官方文档中,更加推荐使用 env.fromSource 处理我们前面提到的场景。

6.6 本章小结

本章举了两个例子,介绍 Flink 流计算中最常见的场景:基于 Kafka 通讯。实际业务场景中我们将这个通讯过程称为 “下发任务”。比如后端同事开发一个接口,触发后将实际业务需求打包成为一个实体类(Task),写入 kafka,大数据平台(Flink)通过监听这个 kafka 消息进行驱动计算过程,并且将算法结果输出到 kafka 或者 elastic search 等。

由于实际业务场景、现场部署环境的差别,我们通过引入 MultipleParameterTool 来调整相关参数,从而更加灵活。

最后还有序列化的过程,即将 json 格式的字符串,转换为我们需要的实体类,进而完成更加复杂的操作。

感谢阅读 ~

感谢点赞 ~

请添加图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/732370
推荐阅读
相关标签
  

闽ICP备14008679号