赞
踩
注:次文档参考 【尚硅谷】大数据高级 flink技术精讲(2020年6月) 编写。
1.由于视频中并未涉及到具体搭建流程,Flink 环境搭建部分并未编写。
2.视频教程 Flink 版本为 1.10.0,此文档根据 Flink v1.11.1 进行部分修改。
3.文档中大部分程序在 Windows 端运行会有超时异常,需要打包后在 Linux 端运行。
4.程序运行需要的部分 Jar 包,请很具情况去掉 pom 中的 “scope” 标签的再进行打包,才能在集群上运行。
5.原始文档在 Markdown 中编写,此处目录无法直接跳转。且因字数限制,分多篇发布
此文档仅用作个人学习,请勿用于商业获利。
概念
Flink 是一个 框架 和 分布式处理引擎,用于对 无界和有界数据流 进行 状态 计算。
为什么选择 Flink
哪些行业需要处理流数据
Flink 主要特点
分层 API
Flink 其他特点
Flink VS SparkStreaming
# 创建用户
userdel -r flink && useradd flink && echo flink | passwd --stdin flink
# 下载
wget https://archive.apache.org/dist/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz
或
wget https://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz
# 解压并启动
tar -zxvf flink-1.11.1-bin-scala_2.11.tgz
/home/flink/flink-1.11.1/bin/start-cluster.sh
# UI
http://test01:8081/#/overview
sudo yum -y install nc
# 使用 linux 的 nc 命令来向 socket 当中发送一些单词
nc -lk 7777
<properties> </properties> <dependencies> <!-- ======== Flink Core ======== --> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.11.1</version> <!-- 由于集群上已经有该 jar 包,若要上传到集群上执行,则去掉以下注释 --> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.11.1</version> <!-- 由于集群上已经有该 jar 包,若要上传到集群上执行,则去掉以下注释 --> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.11.1</version> <scope>provided</scope> </dependency> </dependencies> <profiles> <profile> <id>dev</id> <activation> <activeByDefault>true</activeByDefault> </activation> <properties> <env>dev</env> </properties> </profile> <profile> <id>prod</id> <properties> <env>prod</env> </properties> </profile> </profiles> <build> <filters> <filter>src/main/resources/env/config-${env}.properties</filter> </filters> <resources> <resource> <directory>src/main/resources</directory> <includes> <include>*.properties</include> <include>*.txt</include> </includes> <excludes> <exclude>*.xml</exclude> <exclude>*.yaml</exclude> </excludes> </resource> </resources> <plugins> <!-- 编译 Scala 需要用到的插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.4.6</version> <configuration> <addScalacArgs>-target:jvm-1.8</addScalacArgs> </configuration> <executions> <execution> <!-- 声明绑定到 maven 的 compile 阶段 --> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!-- Java Compiler --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>1.8</encoding> </configuration> </plugin> <!-- 项目打包需要用到的插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
Code
package com.mso.flink.dataset import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ object DataSetWordCount { def main(args: Array[String]): Unit = { // 创建一个批处理执行环境 val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment // 从文件中读取数据 // val resource: URL = getClass.getResource("/word.txt") // val inputDataSet: DataSet[String] = environment.readTextFile(resource.getPath) val params: ParameterTool = ParameterTool.fromArgs(args) val inputDataSet: DataSet[String] = environment.readTextFile(params.get("input-path")) // 基于 DataSet 做转换,首先按空格拆分,然后按照 word 作为 key 做 groupBy 分组聚合 val resultDataSet: AggregateDataSet[(String, Int)] = inputDataSet .flatMap((_: String).split(" ")) // 分词得到 word 构成的数据集 .map(((_: String), 1)) // 转换成一个二元组 (word, count) .groupBy(0) // 以二元组中第一个元素作为 key 分组 .sum(1) // 聚合二元组中第二个元素的值 resultDataSet.printOnTaskManager("DataSetWordCount") environment.execute("DataSetWordCount") // ~/flink-1.11.1/bin/flink run -p 1 -c com.mso.flink.dataset.DataSetWordCount FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar --input-path /home/flink/word.txt } }
Run
~/flink-1.11.1/bin/flink run -p 1 -c com.mso.flink.dataset.DataSetWordCount FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar --input-path /home/flink/word.txt
Code
package com.mso.flink.stream import org.apache.flink.streaming.api.scala._ object StreamWordCount { def main(args: Array[String]): Unit = { // 创建流处理执行环境 val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 接收 socket 文本流 val inputSocketDataStream: DataStream[String] = environment.socketTextStream("test01", 7777) // 定义转换操作, word count val resultDataStream: DataStream[(String, Int)] = inputSocketDataStream .flatMap(_.split(" ")) // 分词得到 word 构成的数据集 .filter(_.nonEmpty) // 过滤空集 .map((_, 1)) // 转换成一个二元组 (word, count) .keyBy(0) // 以二元组中第一个元素作为 key 分组 .sum(1) // 聚合二元组中第二个元素的值 // 打印输出 resultDataStream.print() // 提交执行 environment.execute() } }
Run
~/flink-1.11.1/bin/flink run -c com.mso.flink.stream.StreamWordCount -p 1 FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar
Result
tail -f flink-flink-taskexecutor-0-test01.out
Stop
~/flink-1.11.1/bin/flink list
~/flink-1.11.1/bin/flink cancel jobID
规划
Node | JobManager | TaskManager | JPS |
---|---|---|---|
test01 | Y | Y | StandaloneSessionClusterEntrypoint、TaskManagerRunner |
test02 | N | Y | TaskManagerRunner |
test03 | N | Y | TaskManagerRunner |
安装
# 修改 flink-conf.yaml vim ~/flink-1.11.1/conf/flink-conf.yaml #jobmanager.rpc.address: localhost jobmanager.rpc.address: test01 # 修改 workers vim ~/flink-1.11.1/conf/workers test01 test02 test03 # 免密 ssh-keygen ssh-copy-id test02 ssh-copy-id test03 # 分发安装包 scp -r ~/flink-1.11.1/ flink@test02: scp -r ~/flink-1.11.1/ flink@test03: # 启动 Flink 集群 ~/flink-1.11.1/bin/start-cluster.sh # WebUI 界面访问 http://test01:8081/#/overview
提交任务
Session-cluster 模式
先启动集群,然后再提交作业。首先向 yarn 申请一块空间,之后资源永远保持不变,如果资源满了,下一个作业就无法提交。
所有作业共享 Dispatcher 和 ResourceManager。
适用于规模小且执行时间短的作业。
Per-Job-Cluster
每次提交 Job 都会对应一个 Flink 集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成。
# 启动
~/flink-1.11.1/bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
-n(--container) : TaskManager 的数量
-s(--slots) : 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个 taskmanager 的 slot 的个数为 1,有时可以多一些
-jm : JobManager 的内存(单位 MB)
-tm : 每个 TaskManager 的内存(单位 MB)
-nm : yarn 的 appName(现在 yarn 的 ui 上的名字)
-d : 后台执行
# 取消 yarn session
yarn application --kill applicationId
# 提交任务
~/flink-1.11.1/bin/flink run -m yarn-cluster -c com.mso.flink.stream.StreamWordCount -p 1 FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar
# 提交任务
~/flink-1.11.1/bin/flink run -c com.mso.flink.stream.StreamWordCount -p 1 FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar
搭建 Kubernetes 集群
略
配置各组件的 yaml 文件
在 k8s 上构建 Flink Session Cluster,需要将 Flink 集群的组件对应的 docker 镜像分别在 k8s 上启动。
包括 JobManager、TaskManager、JobManagerService 三个镜像服务。每个镜像服务都可以从中央镜像仓库中获取。
启动Flink Session Cluster
# 启动 jobmanager-service 服务
kubectl create -f jobmanager-service.yaml
# 启动 jobmanager-deployment 服务
kubectl create -f jobmanager-deployment.yaml
# 启动 taskmanager-deployment 服务
kubectl create -f taskmanager-deployment.yaml
访问 Flink 111 页面
http://(JobManagerHost:Port)/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy
JobManager
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。