当前位置:   article > 正文

Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 1

Flink - 尚硅谷- 大数据高级 Flink 技术精讲 - 1

Flink - 尚硅谷- 大数据高级 Flink 技术精讲

注:次文档参考 【尚硅谷】大数据高级 flink技术精讲(2020年6月) 编写。

1.由于视频中并未涉及到具体搭建流程,Flink 环境搭建部分并未编写。
2.视频教程 Flink 版本为 1.10.0,此文档根据 Flink v1.11.1 进行部分修改。
3.文档中大部分程序在 Windows 端运行会有超时异常,需要打包后在 Linux 端运行。
4.程序运行需要的部分 Jar 包,请很具情况去掉 pom 中的 “scope” 标签的再进行打包,才能在集群上运行。
5.原始文档在 Markdown 中编写,此处目录无法直接跳转。且因字数限制,分多篇发布
此文档仅用作个人学习,请勿用于商业获利。

一、Flink 简介

概念

Flink 是一个 框架分布式处理引擎,用于对 无界和有界数据流 进行 状态 计算。

为什么选择 Flink

  • 传统的数据架构是基于有限数据集的
  • 流数据更真实地反映我们的生活方式
  • 相较于 Spark 的微批处理,Flink 做到了真正的流式处理,且 Flink 包含了批处理和流处理两种处理引擎
  • Flink 的目标
    • 低延迟 : 来一条数据处理一条
    • 高吞吐 : 分布式的架构处理高吞吐的数据量
    • 结果的准确性和良好的容错性 : 因为网络延迟造成的乱序问题不会影响结果的准确性

哪些行业需要处理流数据

  • 电商和市场营销
    • 数据报表、广告投放、业务流程需要
  • 物联网(IOT)
    • 传感器实时数据采集和显示、实时报警、交通运输业
  • 电信业
    • 基站流量调配
  • 银行和金融业
    • 实时结算和通知推送,实时检测异常行为

Flink 主要特点

  • 事件驱动(Event-driven):来一条数据处理一条
  • 基于流的世界观:在 Flink 中,一切都是流,离线数据是有界的流实时数据是没有界限的流

分层 API

  • 越顶层越抽象,表达含义越简明,使用越发辫
  • 越底层越具体,表达能力越丰富,使用越灵活

Flink 分层 API

Flink 其他特点

  • 支持事件事件(event-time)和处理时间(processing-time)语义
  • 精确一次(exactly-once)的状态一致性保证
  • 低延迟,每秒处理百万个事件,毫秒级延迟
  • 与众多常用存储系统的连接
  • 高可用,动态扩展,实现 7*23 小时全天候运行

Flink VS SparkStreaming

  • 流(stream) 和 微批(micro–batching)
  • 数据模型
    • spark 采用 RDD 模型,spark streaming 和 DStream 实际上也就是一组组小批数据 RDD 的集合
    • Flink 基本数据模型是数据流,以及事件(Event)序列
  • 运行时架构
    • spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个
    • flink 是标准的流执行模型,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

二、Quick Start

2.1 环境

2.1 Flink 安装包
# 创建用户
userdel -r flink && useradd flink && echo flink | passwd --stdin flink

# 下载
wget https://archive.apache.org/dist/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz
或
wget https://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz

# 解压并启动
tar -zxvf flink-1.11.1-bin-scala_2.11.tgz 
/home/flink/flink-1.11.1/bin/start-cluster.sh 

# UI
http://test01:8081/#/overview
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
2.2 nc
sudo yum -y install nc

# 使用 linux 的 nc 命令来向 socket 当中发送一些单词
nc -lk 7777
  • 1
  • 2
  • 3
  • 4
2.3 pom 配置
    <properties>
    </properties>

    <dependencies>
        <!--  ======== Flink Core ========  -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.11.1</version>
            <!-- 由于集群上已经有该 jar 包,若要上传到集群上执行,则去掉以下注释 -->
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.11.1</version>
            <!-- 由于集群上已经有该 jar 包,若要上传到集群上执行,则去掉以下注释 -->
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>


    <profiles>
        <profile>
            <id>dev</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <env>dev</env>
            </properties>
        </profile>

        <profile>
            <id>prod</id>

            <properties>
                <env>prod</env>
            </properties>
        </profile>
    </profiles>

    <build>
        <filters>
            <filter>src/main/resources/env/config-${env}.properties</filter>
        </filters>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>*.properties</include>
                    <include>*.txt</include>
                </includes>
                <excludes>
                    <exclude>*.xml</exclude>
                    <exclude>*.yaml</exclude>
                </excludes>
            </resource>
        </resources>

        <plugins>
            <!-- 编译 Scala 需要用到的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <configuration>
                    <addScalacArgs>-target:jvm-1.8</addScalacArgs>
                </configuration>
                <executions>
                    <execution>
                        <!-- 声明绑定到 maven 的 compile 阶段 -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>1.8</encoding>
                </configuration>
            </plugin>

            <!-- 项目打包需要用到的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123

2.2 批处理

Code

package com.mso.flink.dataset

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

object DataSetWordCount {
  def main(args: Array[String]): Unit = {
    // 创建一个批处理执行环境
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 从文件中读取数据
    //    val resource: URL = getClass.getResource("/word.txt")
    //    val inputDataSet: DataSet[String] = environment.readTextFile(resource.getPath)
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val inputDataSet: DataSet[String] = environment.readTextFile(params.get("input-path"))

    // 基于 DataSet 做转换,首先按空格拆分,然后按照 word 作为 key 做 groupBy 分组聚合
    val resultDataSet: AggregateDataSet[(String, Int)] = inputDataSet
      .flatMap((_: String).split(" ")) // 分词得到 word 构成的数据集
      .map(((_: String), 1)) // 转换成一个二元组 (word, count)
      .groupBy(0) // 以二元组中第一个元素作为 key 分组
      .sum(1) // 聚合二元组中第二个元素的值

    resultDataSet.printOnTaskManager("DataSetWordCount")
    environment.execute("DataSetWordCount")
    // ~/flink-1.11.1/bin/flink run -p 1 -c com.mso.flink.dataset.DataSetWordCount FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar --input-path /home/flink/word.txt
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

Run

 ~/flink-1.11.1/bin/flink run -p 1 -c com.mso.flink.dataset.DataSetWordCount FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar --input-path /home/flink/word.txt
  • 1

2.3 流处理

Code

package com.mso.flink.stream

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建流处理执行环境
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 接收 socket 文本流
    val inputSocketDataStream: DataStream[String] = environment.socketTextStream("test01", 7777)

    // 定义转换操作, word count
    val resultDataStream: DataStream[(String, Int)] = inputSocketDataStream
      .flatMap(_.split(" ")) // 分词得到 word 构成的数据集
      .filter(_.nonEmpty) // 过滤空集
      .map((_, 1)) // 转换成一个二元组 (word, count)
      .keyBy(0) // 以二元组中第一个元素作为 key 分组
      .sum(1) // 聚合二元组中第二个元素的值

    // 打印输出
    resultDataStream.print()

    // 提交执行
    environment.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

Run

~/flink-1.11.1/bin/flink run -c com.mso.flink.stream.StreamWordCount -p 1  FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar 
  • 1

Result

tail -f flink-flink-taskexecutor-0-test01.out
  • 1

Stop

~/flink-1.11.1/bin/flink list 
~/flink-1.11.1/bin/flink cancel jobID
  • 1
  • 2

三、Flink 部署

3.1 Standalone 模式

规划

Node JobManager TaskManager JPS
test01 Y Y StandaloneSessionClusterEntrypoint、TaskManagerRunner
test02 N Y TaskManagerRunner
test03 N Y TaskManagerRunner

安装

# 修改 flink-conf.yaml
vim ~/flink-1.11.1/conf/flink-conf.yaml
#jobmanager.rpc.address: localhost
jobmanager.rpc.address: test01

# 修改 workers
vim ~/flink-1.11.1/conf/workers 
test01
test02
test03

# 免密
ssh-keygen
ssh-copy-id test02
ssh-copy-id test03

# 分发安装包
scp -r ~/flink-1.11.1/ flink@test02:
scp -r ~/flink-1.11.1/ flink@test03:

# 启动 Flink 集群
~/flink-1.11.1/bin/start-cluster.sh 

# WebUI 界面访问
http://test01:8081/#/overview
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

提交任务

    3.2 Yarn 模式

    3.2.1 Flink on Yarn

    Session-cluster 模式
    先启动集群,然后再提交作业。首先向 yarn 申请一块空间,之后资源永远保持不变,如果资源满了,下一个作业就无法提交。
    所有作业共享 Dispatcher 和 ResourceManager。
    适用于规模小且执行时间短的作业。

    Per-Job-Cluster
    每次提交 Job 都会对应一个 Flink 集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成。

    3.2.2 Session Cluster
    # 启动
    ~/flink-1.11.1/bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
    
    -n(--container) : TaskManager 的数量
    -s(--slots) : 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个 taskmanager 的 slot 的个数为 1,有时可以多一些
    -jm : JobManager 的内存(单位 MB)
    -tm : 每个 TaskManager 的内存(单位 MB)
    -nm : yarn 的 appName(现在 yarn 的 ui 上的名字)
    -d : 后台执行
    
    # 取消 yarn session
    yarn application --kill applicationId
    
    # 提交任务
    ~/flink-1.11.1/bin/flink run -m yarn-cluster -c com.mso.flink.stream.StreamWordCount -p 1  FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    3.2.3 Per Job Cluster
    # 提交任务
    ~/flink-1.11.1/bin/flink run -c com.mso.flink.stream.StreamWordCount -p 1  FlinkPractice-1.0-SNAPSHOT-jar-with-dependencies.jar 
    
    • 1
    • 2

    3.3 Kubernets 部署

    搭建 Kubernetes 集群

    配置各组件的 yaml 文件
    在 k8s 上构建 Flink Session Cluster,需要将 Flink 集群的组件对应的 docker 镜像分别在 k8s 上启动。
    包括 JobManager、TaskManager、JobManagerService 三个镜像服务。每个镜像服务都可以从中央镜像仓库中获取。

    启动Flink Session Cluster

    # 启动 jobmanager-service 服务
    kubectl create -f jobmanager-service.yaml
    # 启动 jobmanager-deployment 服务
    kubectl create -f jobmanager-deployment.yaml
    # 启动 taskmanager-deployment 服务
    kubectl create -f taskmanager-deployment.yaml
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    访问 Flink 111 页面

    http://(JobManagerHost:Port)/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy
    
    • 1

    四、Flink 运行时架构

    4.1 Flink 运行时的组件

    • JobManager : 作业管理器
    • TaskManager : 任务管理器
    • ResourceManager : 资源管理器
    • Dispacher : 分发器

    JobManager

    • 控制一个应用程序执行的主要进程,每个应用程序都会被一个不同的 JobManager 所控制执行
    • JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类库和其他资源的 JAR 包
    • JobManager 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫做执行图(Ex
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/554821
    推荐阅读
    相关标签
      

    闽ICP备14008679号