赞
踩
本章内容介绍基于 Flink 1.14.x 版本开发流计算案例。这个案例中我们将 kafka 作为数据源,启动 Flink 任务以后,将会监听 kafka 的特定的一个或多个 TOPIC,并根据消息内容进行计算。
这是一种实时计算场景,即数据一旦流入 kafka ,就触发计算条件,也就是 flink 官方一直强调的 “流批一体” 的概念的一种体现。这里与批计算的差别也非常明显,即无需等待凑足数据以后再批量执行。
我们的例子非常简单:
相关内容可以概述为:
env.fromSource
与 env.addSource
的区别;MultipleParameterTool
的用法(用于任务的启动参数);flink-kafka 简单例子
flink 接收kafka消息并反序列化
相关环境基础包括:
确保是 1.8 版本
$ java -versio
$ javac -version
确保 kafka 的版本为 2.x
。
cd
到 kafka 所在目录,启动 zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
接着再打开一个命令行窗口,启动 kafka
$ bin/kafka-server-start.sh config/server.properties
注意
:以上内容均是基于 macOS 以及 linux 的命令,windows 下基本一致,请查阅相关资料。
cd
到 kafka 所在目录
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
接下来将会基于此控制台与 flink 进行交互,即输入字符串,flink job 读取字符串。
具体依赖请参考源码仓库:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>quick-start-kafka</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scala.binary.version>2.12</scala.binary.version> <lombok.version>1.18.30</lombok.version> <flink.version>1.14.6</flink.version> <slf4j.version>2.0.9</slf4j.version> <logback.version>1.3.11</logback.version> <junit.version>4.13.2</junit.version> </properties> <dependencies> <!-- flink 相关 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- 编译工具 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <!-- log 相关 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> <scope>provided</scope> </dependency> <!-- test 相关 --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> <!-- Flink Kafka Connector --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.48</version> </dependency> </dependencies> </project>
package cn.smileyan.demos; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * flink 使用 kafka 作为数据源的简单例子 * @author Smileyan */ @Slf4j public class FlinkKafkaExample { /** * 参数解释: * -bs broker 地址 * -kcg kafka consumer group * -it kafka 输入数据 topic * -ct 是否自动创建 topic * -pt topic 分区数 * -rf topic 副本数 */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args); final String bootstrapServer = cmd.get("bs", "localhost:9092"); final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer"); final String inputTopic = cmd.get("it", "quickstart-events"); final boolean createTopic = cmd.getBoolean("ct", false); log.info("broker is {} and topic is {}", bootstrapServer, inputTopic); // 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数 if (createTopic) { final int partitions = cmd.getInt("pt", 1); final short replicationFactor = cmd.getShort("rf", (short) 1); createTopic(bootstrapServer, inputTopic, partitions, replicationFactor); } final KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setGroupId(kafkaConsumerGroup) .setStartingOffsets(OffsetsInitializer.latest()) .setBootstrapServers(bootstrapServer) .setTopics(inputTopic) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); final DataStreamSource<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source"); kafkaStream.print(); env.execute("Flink Kafka Example"); } /** * 如果 TOPIC 不存在则创建该 TOPIC * @param bootstrapServer kafka broker 地址 * @param topic 想要创建的 TOPIC * @param partitions 并行度 * @param replicationFactor 副本数 */ public static void createTopic(String bootstrapServer, String topic, int partitions, int replicationFactor) throws ExecutionException, InterruptedException { Properties adminProperties = new Properties(); adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); try (AdminClient adminClient = AdminClient.create(adminProperties)) { if (!adminClient.listTopics().names().get().contains(topic)) { NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor); adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); log.info("created topic: {}", topic); } } } }
package cn.smileyan.demos; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import java.util.Collections; import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * 实体类序列化 * @author smileyan */ @Slf4j public class FlinkKafkaEntityExample { /** * 参数解释: * -bs broker 地址 * -kcg kafka consumer group * -it kafka 输入数据 topic * -ct 是否自动创建 topic * -pt topic 分区数 * -rf topic 副本数 */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args); final String bootstrapServer = cmd.get("bs", "localhost:9092"); final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer"); final String inputTopic = cmd.get("it", "quickstart-events"); final boolean createTopic = cmd.getBoolean("ct", false); log.info("broker is {} and topic is {}", bootstrapServer, inputTopic); // 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数 if (createTopic) { final int partitions = cmd.getInt("pt", 1); final short replicationFactor = cmd.getShort("rf", (short) 1); createTopic(bootstrapServer, inputTopic, partitions, replicationFactor); } final KafkaSource<Student> kafkaSource = KafkaSource.<Student>builder() .setGroupId(kafkaConsumerGroup) .setStartingOffsets(OffsetsInitializer.latest()) .setBootstrapServers(bootstrapServer) .setTopics(inputTopic) .setValueOnlyDeserializer(new CommonDeserializationSchema<>(Student.class)) .build(); final DataStreamSource<Student> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source"); // 过滤掉反序列化失败的对象,只保留正确的对象 SingleOutputStreamOperator<Student> out1 = kafkaStream.filter(Objects::nonNull) .map(student -> { log.info("filter none objects is {}", student); return student; }); // 只选择年纪小于 10 的对象 out1.filter(student -> student.getAge() != null && student.getAge() < 10) .map(student -> { log.info("filter age < 10: {}", student); return student; }); env.execute("Flink Kafka Example"); } /** * 如果 TOPIC 不存在则创建该 TOPIC * @param bootstrapServer kafka broker 地址 * @param topic 想要创建的 TOPIC * @param partitions 并行度 * @param replicationFactor 副本数 */ public static void createTopic(String bootstrapServer, String topic, int partitions, int replicationFactor) throws ExecutionException, InterruptedException { Properties adminProperties = new Properties(); adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); try (AdminClient adminClient = AdminClient.create(adminProperties)) { if (!adminClient.listTopics().names().get().contains(topic)) { NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor); adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); log.info("created topic: {}", topic); } } } @Data static class Student { private String name; private Integer age; } }
package cn.smileyan.demos; import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import java.io.IOException; import java.nio.charset.StandardCharsets; /** * 将字节码数据进行序列化 * @author smileyan * @param <O> 实体类 */ @Slf4j public class CommonDeserializationSchema<O> implements DeserializationSchema<O> { private final Class<O> clazz; public CommonDeserializationSchema(Class<O> clazz) { this.clazz = clazz; } @Override public O deserialize(byte[] message) { try { String str = new String(message, StandardCharsets.UTF_8); log.info("kafka received message: {}", str); return JSON.parseObject(str, clazz); } catch (Exception e) { log.error(e.getMessage()); } return null; } @Override public boolean isEndOfStream(O nextElement) { return false; } @Override public TypeInformation<O> getProducedType() { return TypeInformation.of(clazz); } }
jdk:1.8.x
flink: 1.14.x,不能使用 1.15 以及以上的版本,因为依赖的 jdk 版本发生了变化(jdk 11)
kafka: 2.x.x ,不能使用 3.x,因为依赖的 jdk 版本发生了变化(jdk11)。
Flink 中 env.fromSource()
和 env.addSource()
方法都是用于创建数据流(DataStream
),即从不同的数据源引入数据到 Flink 流处理或批处理作业中。虽然它们的目的相似,但具体的使用方式和语境有所不同:
env.addSource(sourceFunction)
方法签名与参数:
addSource
是 StreamExecutionEnvironment
类的一个方法,它接收一个实现了 SourceFunction
接口的对象作为参数。SourceFunction
是 Flink 提供的一个通用接口,用于定义数据源的逻辑,包括如何初始化数据源、如何产生数据以及如何正确清理资源。
用途:
addSource
主要用于自定义数据源或者使用 Flink 提供的某些特定数据源,这些数据源可能没有提供更高级别的抽象或者封装。通过实现 SourceFunction
,开发者可以完全控制数据读取的细节,如从文件、网络套接字、自定义服务等非标准或非常规数据源读取数据。
灵活性与复杂性:
使用 addSource
的方式提供了极大的灵活性,因为可以直接编写代码来处理数据源的各种特性和行为。然而,这也意味着需要更多手动编码工作,包括处理错误恢复、并行化(如果支持)、数据分区等复杂任务。对于容错和并行读取的支持通常需要在 SourceFunction
实现中集成相应的机制。
env.fromSource(source)
方法签名与参数:
fromSource
方法同样属于 StreamExecutionEnvironment
类,但其参数通常是某个具体数据源类的实例,而非 SourceFunction
接口。这里的 source
参数往往代表一个已经封装好的、针对特定数据源类型的高级抽象,如 FlinkKafkaConsumer
、SocketTextStreamFunction
等。
用途:
fromSource
通常用于直接使用 Flink 内置或社区提供的对常见数据源(如 Apache Kafka、文件系统、数据库连接器等)的预封装支持。这些封装好的数据源类通常会隐藏底层复杂性,提供友好的配置选项,并且内置了对容错、并行读取等特性的支持。
便捷性与标准化:
使用 fromSource
方法更为便捷,因为它针对常见的数据源类型提供了开箱即用的解决方案。开发者无需从头实现复杂的 SourceFunction
,只需配置必要的参数(如 Kafka 主题名、数据库连接信息等)即可快速接入数据源。这种做法遵循 Flink 的最佳实践,确保了与 Flink 生态系统的良好集成以及对数据源特性的有效利用。
总结:
env.addSource(sourceFunction)
适用于需要自定义数据源逻辑、处理非标准数据源或对数据源控制有特殊需求的情况。使用时需要自行实现 SourceFunction
,处理数据读取、错误恢复、并行化等细节。
env.fromSource(source)
适用于对接已知、常见的数据源,如 Kafka、文件、数据库等,利用 Flink 提供的预封装数据源类。这种方式简化了开发过程,提供了更好的容错性和易用性,但可能不支持所有定制化需求。
flink 官方文档中,更加推荐使用 env.fromSource
处理我们前面提到的场景。
本章举了两个例子,介绍 Flink 流计算中最常见的场景:基于 Kafka 通讯。实际业务场景中我们将这个通讯过程称为 “下发任务”。比如后端同事开发一个接口,触发后将实际业务需求打包成为一个实体类(Task),写入 kafka,大数据平台(Flink)通过监听这个 kafka 消息进行驱动计算过程,并且将算法结果输出到 kafka 或者 elastic search 等。
由于实际业务场景、现场部署环境的差别,我们通过引入 MultipleParameterTool 来调整相关参数,从而更加灵活。
最后还有序列化的过程,即将 json 格式的字符串,转换为我们需要的实体类,进而完成更加复杂的操作。
感谢阅读 ~
感谢点赞 ~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。