当前位置:   article > 正文

2021-03-08~09~10~11~12 大数据课程笔记 day47day48day49day50day51_flink取出dataframe最后一个值

flink取出dataframe最后一个值

时间煮雨
@R星校长

大数据技术之Flink

第一章 初识Flink

  在当前数据量激增的时代,各种业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如何进行有效的处理,成为当下大多数公司所面临的问题。目前比较流行的大数据处理引擎Apache Spark,基本上已经取代了MapReduce成为当前大数据处理的标准。但对实时数据处理来说,Apache Spark的Spark-Streaming还有性能改进的空间。对于Spark-Streaming的流计算本质上还是批(微批)计算,Apache Flink就是近年来在开源社区不断发展的技术中的能够同时支持高吞吐、低延迟、高性能的纯实时的分布式处理框架。

1. Flink是什么?

  1) Flink 的发展历史

  在2010年至2014年间,由柏林工业大学、柏林洪堡大学和哈索普拉特纳研究所联合发起名为“Stratosphere:Information Management on the Cloud”研究项目,该项目在当时的社区逐渐具有了一定的社区知名度。2014年4月,Stratosphere代码被贡献给Apache软件基金会,成为Apache基金会孵化器项目。初期参与该项目的核心成员均是Stratosphere曾经的核心成员,之后团队的大部分创始成员离开学校,共同创办了一家名叫Data Artisans的公司,其主要业务便是将Stratosphere,也就是之后的Flink实现商业化。在项目孵化期间,项目Stratosphere改名为Flink。Flink在德语中是快速和灵敏的意思,用来体现流式数据处理器速度快和灵活性强等特点,同时使用棕红色松鼠图案作为Flink项目的Logo,也是为了突出松鼠灵活快速的特点,由此,Flink正式进入社区开发者的视线。在这里插入图片描述
  2014年12月,该项目成为Apache软件基金会顶级项目,从2015年9月发布第一个稳定版本0.9,到目前为止已经发布到1.11的版本,更多的社区开发成员逐步加入,现在Flink在全球范围内拥有350多位开发人员,不断有新的特性发布。同时在全球范围内,越来越多的公司开始使用Flink,在国内比较出名的互联网公司如阿里巴巴、美团、滴滴等,都在大规模使用Flink作为企业的分布式大数据处理引擎。

  2) Flink的定义

  Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
  Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

  3) 有界流和无界流

  任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
  无界流: 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  有界流: 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。在这里插入图片描述
  Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

  4) 有状态的计算架构

  数据产生的本质,其实是一条条真实存在的事件按照时间顺序源源不断的产生,我们很难在数据产生的过程中进行计算并直接产生统计结果,因为这不仅对系统有非常高的要求,还必须要满足高性能、高吞吐、低延时等众多目标。而有状态流计算架构(如图所示)的提出,从一定程度上满足了企业的这种需求,企业基于实时的流式数据,维护所有计算过程的状态,所谓状态就是计算过程中产生的中间计算结果,每次计算新的数据进入到流式系统中都是基于中间状态结果的基础上进行运算,最终产生正确的统计结果。基于有状态计算的方式最大的优势是不需要将原始数据重新从外部存储中拿出来,从而进行全量计算,因为这种计算方式的代价可能是非常高的。从另一个角度讲,用户无须通过调度和协调各种批量计算工具,从数据仓库中获取数据统计结果,然后再落地存储,这些操作全部都可以基于流式计算完成,可以极大地减轻系统对其他框架的依赖,减少数据计算过程中的时间损耗以及硬件存储。在这里插入图片描述

2. 为什么要使用 Flink

  可以看出有状态流计算将会逐步成为企业作为构建数据平台的架构模式,而目前从社区来看,能够满足的只有Apache Flink。Flink通过实现Google Dataflow流式计算模型实现了高吞吐、低延迟、高性能兼具实时流式计算框架。同时Flink支持高度容错的状态管理,防止状态在计算过程中因为系统异常而出现丢失,Flink周期性地通过分布式快照技术Checkpoints实现状态的持久化维护,使得即使在系统停机或者异常的情况下都能计算出正确的结果。
  自 2019 年 1 月起,阿里巴巴逐步将内部维护的 Blink 回馈给 Flink 开源社区,目前贡献代码数量已超过 100 万行。国内包括腾讯、百度、字节跳动等公司,国外包括 Uber、Lyft、Netflix 等公司都是 Flink 的使用者。在这里插入图片描述

3. Flink 的应用场景

  在实际生产的过程中,大量数据在不断地产生,例如金融交易数据、互联网订单数据、GPS定位数据、传感器信号、移动终端产生的数据、通信信号数据等,以及我们熟悉的网络流量监控、服务器产生的日志数据,这些数据最大的共同点就是实时从不同的数据源中产生,然后再传输到下游的分析系统。针对这些数据类型主要包括实时智能推荐、复杂事件处理、实时欺诈检测、实时数仓与ETL类型、流数据分析类型、实时报表类型等实时业务场景,而Flink对于这些类型的场景都有着非常好的支持。

  (一)实时智能推荐

  智能推荐会根据用户历史的购买行为,通过推荐算法训练模型,预测用户未来可能会购买的物品。对个人来说,推荐系统起着信息过滤的作用,对Web/App服务端来说,推荐系统起着满足用户个性化需求,提升用户满意度的作用。推荐系统本身也在飞速发展,除了算法越来越完善,对时延的要求也越来越苛刻和实时化。利用Flink流计算帮助用户构建更加实时的智能推荐系统,对用户行为指标进行实时计算,对模型进行实时更新,对用户指标进行实时预测,并将预测的信息推送给Wep/App端,帮助用户获取想要的商品信息,另一方面也帮助企业提升销售额,创造更大的商业价值。

  (二)复杂事件处理

  对于复杂事件处理,比较常见的案例主要集中于工业领域,例如对车载传感器、机械设备等实时故障检测,这些业务类型通常数据量都非常大,且对数据处理的时效性要求非常高。通过利用Flink提供的CEP(复杂事件处理)进行事件模式的抽取,同时应用Flink的Sql进行事件数据的转换,在流式系统中构建实时规则引擎,一旦事件触发报警规则,便立即将告警结果传输至下游通知系统,从而实现对设备故障快速预警监测,车辆状态监控等目的。

  (三)实时欺诈检测

  在金融领域的业务中,常常出现各种类型的欺诈行为,例如信用卡欺诈、信贷申请欺诈等,而如何保证用户和公司的资金安全,是来近年来许多金融公司及银行共同面对的挑战。随着不法分子欺诈手段的不断升级,传统的反欺诈手段已经不足以解决目前所面临的问题。以往可能需要几个小时才能通过交易数据计算出用户的行为指标,然后通过规则判别出具有欺诈行为嫌疑的用户,再进行案件调查处理,在这种情况下资金可能早已被不法分子转移,从而给企业和用户造成大量的经济损失。而运用Flink流式计算技术能够在毫秒内就完成对欺诈判断行为指标的计算,然后实时对交易流水进行规则判断或者模型预测,这样一旦检测出交易中存在欺诈嫌疑,则直接对交易进行实时拦截,避免因为处理不及时而导致的经济损失。

  (四)实时数仓与ETL

  结合离线数仓,通过利用流计算诸多优势和SQL灵活的加工能力,对流式数据进行实时清洗、归并、结构化处理,为离线数仓进行补充和优化。另一方面结合实时数据ETL处理能力,利用有状态流式计算技术,可以尽可能降低企业由于在离线数据计算过程中调度逻辑的复杂度,高效快速地处理企业需要的统计结果,帮助企业更好地应用实时数据所分析出来的结果。

  (五)流数据分析

  实时计算各类数据指标,并利用实时结果及时调整在线系统相关策略,在各类内容投放、无线智能推送领域有大量的应用。流式计算技术将数据分析场景实时化,帮助企业做到实时化分析Web应用或者App应用的各项指标,包括App版本分布情况、Crash检测和分布等,同时提供多维度用户行为分析,支持日志自主分析,助力开发者实现基于大数据技术的精细化运营、提升产品质量和体验、增强用户黏性。

  (六)实时报表分析

  实时报表分析是近年来很多公司采用的报表统计方案之一,其中最主要的应用便是实时大屏展示。利用流式计算实时得出的结果直接被推送到前端应用,实时显示出重要指标的变换情况。最典型的案例便是淘宝的双十一活动,每年双十一购物节,除疯狂购物外,最引人注目的就是天猫双十一大屏不停跳跃的成交总额。在整个计算链路中包括从天猫交易下单购买到数据采集、数据计算、数据校验,最终落到双十一大屏上展现的全链路时间压缩在5秒以内,顶峰计算性能高达数三十万笔订单/秒,通过多条链路流计算备份确保万无一失。而在其他行业,企业也在构建自己的实时报表系统,让企业能够依托于自身的业务数据,快速提取出更多的数据价值,从而更好地服务于企业运行过程中。

4. Flink 的特点和优势

1) Flink 的具体优势和特点有以下几点

 (一)同时支持高吞吐、低延迟、高性能

  Flink是目前开源社区中唯一一套集高吞吐、低延迟、高性能三者于一身的分布式流式数据处理框架。像Apache Spark也只能兼顾高吞吐和高性能特性,主要因为在Spark Streaming流式计算中无法做到低延迟保障;而流式计算框架Apache Storm只能支持低延迟和高性能特性,但是无法满足高吞吐的要求。而满足高吞吐、低延迟、高性能这三个目标对分布式流式计算框架来说是非常重要的。

 (二)支持事件时间(Event Time)概念

  在流式计算领域中,窗口计算的地位举足轻重,但目前大多数框架窗口计算采用的都是系统时间(Process Time),也是事件传输到计算框架处理时,系统主机的当前时间。Flink能够支持基于事件时间(Event Time)语义进行窗口计算,也就是使用事件产生的时间,这种基于事件驱动的机制使得事件即使乱序到达,流系统也能够计算出精确的结果,保持了事件原本产生时的时序性,尽可能避免网络传输或硬件系统的影响。

 (三)支持有状态计算

  Flink在1.4版本中实现了状态管理,所谓状态就是在流式计算过程中将算子的中间结果数据保存在内存或者文件系统中,等下一个事件进入算子后可以从之前的状态中获取中间结果中计算当前的结果,从而无须每次都基于全部的原始数据来统计结果,这种方式极大地提升了系统的性能,并降低了数据计算过程的资源消耗。对于数据量大且运算逻辑非常复杂的流式计算场景,有状态计算发挥了非常重要的作用。

 (四)支持高度灵活的窗口(Window)操作

  在流处理应用中,数据是连续不断的,需要通过窗口的方式对流数据进行一定范围的聚合计算,例如统计在过去的1分钟内有多少用户点击某一网页,在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行再计算。Flink将窗口划分为基于Time、Count、Session,以及Data-driven等类型的窗口操作,窗口可以用灵活的触发条件定制化来达到对复杂的流传输模式的支持,用户可以定义不同的窗口触发机制来满足不同的需求。

 (五)基于轻量级分布式快照(CheckPoint)实现的容错

  Flink能够分布式运行在上千个节点上,将一个大型计算任务的流程拆解成小的计算过程,然后将tesk分布到并行节点上进行处理。在任务执行过程中,能够自动发现事件处理过程中的错误而导致数据不一致的问题,比如:节点宕机、网路传输问题,或是由于用户因为升级或修复问题而导致计算服务重启等。在这些情况下,通过基于分布式快照技术的Checkpoints,将执行过程中的状态信息进行持久化存储,一旦任务出现异常停止,Flink就能够从Checkpoints中进行任务的自动恢复,以确保数据在处理过程中的一致性(Exactly-Once)。

 (六)基于JVM实现独立的内存管理

  内存管理是所有计算框架需要重点考虑的部分,尤其对于计算量比较大的计算场景,数据在内存中该如何进行管理显得至关重要。针对内存管理,Flink实现了自身管理内存的机制,尽可能减少JVM GC对系统的影响。另外,Flink通过序列化/反序列化方法将所有的数据对象转换成二进制在内存中存储,降低数据存储的大小的同时,能够更加有效地对内存空间进行利用,降低GC带来的性能下降或任务异常的风险,因此Flink较其他分布式处理的框架会显得更加稳定,不会因为JVM GC等问题而影响整个应用的运行。

 (七)Save Points(保存点)

  对于7*24小时运行的流式应用,数据源源不断地接入,在一段时间内应用的终止有可能导致数据的丢失或者计算结果的不准确,例如进行集群版本的升级、停机运维操作等操作。值得一提的是,Flink通过Save Points技术将任务执行的快照保存在存储介质上,当任务重启的时候可以直接从事先保存的Save Points恢复原有的计算状态,使得任务继续按照停机之前的状态运行,Save Points技术可以让用户更好地管理和运维实时流式应用。

2) 流式计算框架的对比

  Storm是比较早的流式计算框架,后来又出现了Spark Streaming和Trident,现在又出现了Flink这种优秀的实时计算框架,那么这几种计算框架到底有什么区别呢?

产品 模型 API 保证次数 容错机制 状态管理 延时 吞吐量
Strom Native(数据进入立即处理) 组合式(基础API) At-least-once(至少一次) ACK机制
Trident Mico-Batching(划分小批次处理) 组合式 Exactly-once(仅一次) ACK机制 基于每次操作都有一个状态 中等 中等
SparkStreaming Mico-Batching(划分小批次处理) 声明式(有封装好的高级API) Exactly-once 基于RDD做checkpoint 基于DStream 中等
Flink Native 声明式(有封装好的高级API) Exactly-once Flink checkpoint 基于操作
  • 模型:Storm和Flink是真正的一条一条处理数据;而Trident(Storm的封装框架)和Spark Streaming其实都是小批处理,一次处理一批数据(小批量)。
  • API:Storm和Trident都使用基础API进行开发,比如实现一个简单的sum求和操作;而Spark Streaming和Flink中都提供封装后的高阶函数,可以直接来使用,非常方便。
  • 保证次数:在数据处理方面,Storm可以实现至少处理一次,但不能保证仅处理一次,这样就会导致数据重复处理问题,所以针对计数类的需求,可能会产生一些误差;Trident通过事务可以保证对数据实现仅一次的处理,Spark Streaming和Flink也是如此。
  • 容错机制:Storm和Trident可以通过ACK机制实现数据的容错机制,而Spark Streaming和Flink可以通过CheckPoint机制实现容错机制。
  • 状态管理:Storm中没有实现状态管理,Spark Streaming实现了基于DStream的状态管理,而Trident和Flink实现了基于操作的状态管理。
  • 延时:表示数据处理的延时情况,因此Storm和Flink接收到一条数据就处理一条数据,其数据处理的延时性是很低的;而Trident和Spark Streaming都是小型批处理,它们数据处理的延时性相对会偏高。
  • 吞吐量:Storm的吞吐量其实也不低,只是相对于其他几个框架而言较低;Trident属于中等;而Spark Streaming和Flink的吞吐量是比较高的。
    在这里插入图片描述

第二章 Flink快速入门

1. Flink的开发环境

 Flink课程选择的是Apache Flink 1.9.1 版本,是目前最新的稳定版本,并且兼容性比较好。下载地址:
https://flink.apache.org/zh/downloads.html

1) 开发工具

  先说明一下开发工具的问题。官方建议使用IntelliJ IDEA,因为它默认集成了Scala和Maven环境,使用更加方便,当然使用Eclipse也是可以的。我们这门课使用IDEA。开发Flink程序时,可以使用Java、Python或者Scala语言,本课程全部使用Scala,因为使用Scala实现函数式编程会比较简洁。学生可以在课后自己补充JAVA代码。

2) 配置依赖

  开发 Flink 应用程序需要最低限度的 API 依赖。最低的依赖库包括:flink-scala和flink-streaming-scala。大多数应用需要依赖特定的连接器或其他类库,例如 Kafka的连接器、TableAPI、CEP库等。这些不是 Flink 核心依赖的一部分,因此必须作为依赖项手动添加到应用程序中。

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>

    <build>
        <plugins>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

2. 第一个 Flink 流处理(Streaming)案例

  创建项目,并且修改源代码目录为scala在这里插入图片描述在这里插入图片描述
案例需求:采用Netcat 数据源发送数据,使用Flink统计每个单词的数量。
注意: Flink流式处理数据时,需要导入隐式转换:org.apache.flink.streaming.api.scala._

package com.bjsxt.flink

import org.apache.flink.streaming.api.scala.{
   DataStream, StreamExecutionEnvironment}

//基于流计算的WordCount案例
object StreamWordCount {
   

  def main(args: Array[String]): Unit = {
   

    //初始化Flink的Streaming(流计算)上下文执行环境
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._

    //读取数据
    val stream: DataStream[String] = streamEnv.socketTextStream("mynode5",8888)

    //转换计算
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    //打印结果到控制台
    result.print()

    //启动流式处理,如果没有该行代码上面的程序不会运行
    streamEnv.execute("wordcount")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

 在Linux系统中使用nc命令发送数据测试

 nc  -lk  8888
  • 1

在这里插入图片描述

3. 第一个Flink批处理(Batch)案例

需求:读取本地数据文件,统计文件中每个单词出现的次数。
根据需求,很明显是有界流(批计算),所以采用另外一个上下文环境:ExecutionEnvironment

package com.bjsxt.flink

import java.net.URL

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchWordCount {
   

  def main(args: Array[String]): Unit = {
   
    //初始化flink的环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.api.scala._

    //读取数据
    val dataURL = getClass.getResource("/wc.txt")//wc.txt文件在main目录下的resources中
    val data: DataSet[String] = env.readTextFile(dataURL.getPath)

    //计算
    val result: AggregateDataSet[(String, Int)] = data.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)  //其中0代表元组中的下标,“0”下标代表:单词
      .sum(1)      //其中1代表元组中的下标,“1”下标代表:单词出现的次数

    //打印结果
    result.print()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

第三章 Flink 的安装和部署

  Flink的安装和部署主要分为本地(单机)模式和集群模式,其中本地模式只需直接解压就可以使用,不以修改任何参数,一般在做一些简单测试的时候使用。本地模式在我们的课程里面不再赘述。集群模式包含:

  • Standalone。
  • Flink on Yarn。
  • Mesos。
  • Docker。
  • Kubernetes。
  • AWS。
  • Goole Compute Engine。

目前在企业中使用最多的是Flink on Yarn模式。我们的课程中讲Standalone和Flink on Yarn这两种模式。

1. 集群基本架构

 Flink整个系统主要由两个组件组成,分别为JobManager和TaskManager,Flink架构也遵循Master-Slave架构设计原则,JobManager为Master节点,TaskManager为Worker(Slave)节点。所有组件之间的通信都是借助于Akka Framework,包括任务的状态以及Checkpoint触发等信息。在这里插入图片描述
1) Client客户端

  客户端负责将任务提交到集群,与JobManager构建Akka连接,然后将任务提交到JobManager,通过和JobManager之间进行交互获取任务执行状态。客户端提交任务可以采用CLI方式或者通过使用Flink WebUI提交,也可以在应用程序中指定JobManager的RPC网络端口构建ExecutionEnvironment提交Flink应用。

2) JobManager

  JobManager负责整个Flink集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlots资源并命令TaskManger启动从客户端中获取的应用。JobManager相当于整个集群的Master节点,且整个集群中有且仅有一个活跃的JobManager,负责整个集群的任务管理和资源管理。JobManager和TaskManager之间通过Actor System进行通信,获取任务执行的情况并通过Actor System将应用的任务执行情况发送给客户端。同时在任务执行过程中,Flink JobManager会触发Checkpoints操作,每个TaskManager节点收到Checkpoint触发指令后,完成Checkpoint操作,所有的Checkpoint协调过程都是在Flink JobManager中完成。当任务完成后,Flink会将任务执行的信息反馈给客户端,并且释放掉TaskManager中的资源以供下一次提交任务使用。

3) TaskManager

  TaskManager相当于整个集群的Slave节点,负责具体的任务执行和对应任务在每个节点上的资源申请与管理。客户端通过将编写好的Flink应用编译打包,提交到JobManager,然后JobManager会根据已经注册在JobManager中TaskManager的资源情况,将任务分配给有资源的TaskManager节点,然后启动并运行任务。TaskManager从JobManager接收需要部署的任务,然后使用Slot资源启动Task,建立数据接入的网络连接,接收数据并开始数据处理。同时TaskManager之间的数据交互都是通过数据流的方式进行的。
  可以看出,Flink的任务运行其实是采用多线程的方式,这和MapReduce多JVM进程的方式有很大的区别Fink能够极大提高CPU使用效率,在多个任务和Task之间通过TaskSlot方式共享系统资源,每个TaskManager中通过管理多个TaskSlot资源池进行对资源进行有效管理。

2. Standalone集群安装和部署

  Standalone是Flink的独立部署模式,它不依赖其他平台。在使用这种模式搭建Flink集群之前,需要先规划集群机器信息。在这里为了搭建一个标准的Flink集群,这里准备3台Linux机器,如图下所示。在这里插入图片描述
1) 解压 Flink 的压缩包在这里插入图片描述
2) 修改配置文件

  ① 进入到conf目录下,编辑flink-conf.yaml配置文件:在这里插入图片描述
  其中:taskmanager.numberOfTaskSlot 参数默认值为1,修改成3。表示数每一个TaskManager上有3个Slot。

  ② 编辑conf/slaves配置文件在这里插入图片描述
3) 分发给另外两台服务器在这里插入图片描述
4) 启动 Flink 集群服务在这里插入图片描述

5) 访问 WebUI在这里插入图片描述
6) 通过命令提交 job 到集群

  ① 把上一章节中第一个 Flink 流处理案例代码打包,并上传在这里插入图片描述
  ② 执行命令: 在执行命令之前先确保 nc -lk 8888 是否启动在这里插入图片描述
其中-d选项表示提交job之后,客户端结束并退出。之后输入测试数据在这里插入图片描述
③ 查看job执行结果在这里插入图片描述
在这里插入图片描述
然后去 hadoop101 的 TaskManager 上查看最后的结果:在这里插入图片描述
7) 通过 WebUI 提交 job 到集群在这里插入图片描述
注意:通过webui上传的jar包会默认放在web.tmpdir目录下,这个目录在/tmp/flink-web-UUID组成,可以在jobManager的webui中查看,每次集群重启后这个目录会被删除重建,可以修改这个目录保存之前上传的jar包。

8) 配置文件参数说明

 下面针对 flink-conf.yaml 文件中的几个重要参数进行分析:

  • jobmanager.heap.size:JobManager节点可用的内存大小。
  • taskmanager.heap.size:TaskManager节点可用的内存大小。
  • taskmanager.numberOfTaskSlots:每台机器可用的Slot数量。
  • parallelism.default:默认情况下Flink任务的并行度。

 上面参数中所说的 Slot 和 parallelism 的区别:

  • Slot是静态的概念,是指TaskManager具有的并发执行能力。
  • parallelism是动态的概念,是指程序运行时实际使用的并发能力。
  • 设置合适的parallelism能提高运算效率。

3. Flink提交到Yarn

 Flink on Yarn模式的原理是依靠YARN来调度Flink任务,目前在企业中使用较多。这种模式的好处是可以充分利用集群资源,提高集群机器的利用率,并且只需要1套Hadoop集群,就可以执行MapReduce和Spark任务,还可以执行Flink任务等,操作非常方便,不需要维护多套集群,运维方面也很轻松。Flink on Yarn模式需要依赖Hadoop集群,并且Hadoop的版本需要是2.2及以上。我们的课程里面选择的Hadoop版本是2.7.5。

 Flink On Yarn的内部实现原理:在这里插入图片描述

  • 当启动一个新的Flink YARN Client会话时,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传Flink配置和JAR文件到HDFS。
  • 客户端的下一步是请求一个YARN容器启动ApplicationMaster。JobManager和ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM就能够知道JobManager的地址,它会为TaskManager生成一个新的Flink配置文件(这样它才能连上JobManager),该文件也同样会被上传到HDFS。另外,AM容器还提供了Flink的Web界面服务。Flink用来提供服务的端口是由用户和应用程序ID作为偏移配置的,这使得用户能够并行执行多个YARN会话。
  • 之后,AM开始为Flink的TaskManager分配容器(Container),从HDFS下载JAR文件和修改过的配置文件。一旦这些步骤完成了,Flink就安装完成并准备接受任务了。

 Flink on Yarn模式在使用的时候又可以分为两种:

  • 第1种模式(Session-Cluster):是在YARN中提前初始化一个Flink集群(称为Flink yarn-session),开辟指定的资源,以后的Flink任务都提交到这里。这个Flink集群会常驻在YARN集群中,除非手工停止(yarn application -kill id),当手动停止yarn application对应的id时,运行在当前application上的所有flink任务都会被kill。这种方式创建的Flink集群会独占资源,不管有没有Flink任务在执行,YARN上面的其他任务都无法使用这些资源。

在这里插入图片描述

  • 第2种模式(Per-Job-Cluster):每次提交Flink任务都会创建一个新的Flink集群,每个Flink任务之间相互独立、互不影响,管理方便。任务执行完成之后创建的Flink集群也会消失,不会额外占用资源,按需使用,这使资源利用率达到最大,在工作中推荐使用这种模式。当杀掉一个当前yarn flink任务时,不会影响其他flink任务执行。

在这里插入图片描述
注意:Flink on Yarn 还需要以下先决条件:

  • 配置Hadoop的环境变量
  • 关闭yarn的虚拟内存检查。

在每台 nodemanager 节点上 $HADOOP_HOME/etc/hadoop/yarn-site.xml中配置如下配置:

<property>
 <name>yarn.nodemanager.vmem-check-enabled</name>
   <value>false</value>
 </property>
  • 1
  • 2
  • 3
  • 4
  • 下载Flink提交到Hadoop的连接器(jar包),并把jar拷贝到Flink的lib目录下
    注意:可以将这个jar包拷贝到所有的flink节点上lib目录下,也可以只是拷贝到对应的提交任务的flink节点上。

在这里插入图片描述

1) Session-Cluster 模式(yarn-session)

 ① 先启动Hadoop集群,然后通过命令启动一个Flink的yarn-session集群:

  bin/yarn-session.sh  -n 3 -s 3 -nm bjsxt  -d
  • 1

在这里插入图片描述
其中yarn-session.sh后面支持多个参数。下面针对一些常见的参数进行讲解:

  • -n,–container <arg> 表示分配容器的数量(也就是TaskManager的数量)。目前版本yarn-session.sh提交任务中,经过测试发现,-n参数是不起作用的。
  • -D <arg> 动态属性。
  • -d,–detached在后台独立运行。
  • -jm,–jobManagerMemory <arg>:设置JobManager的内存,单位是MB。
  • -nm,–name:在YARN上为一个自定义的应用设置一个名字。
  • -q,–query:显示YARN中可用的资源(内存、cpu核数)。
  • -qu,–queue <arg>:指定YARN队列。
  • -s,–slots <arg>:每个TaskManager使用的Slot数量。
  • -tm,–taskManagerMemory <arg>:每个TaskManager的内存,单位是MB。
  • -z,–zookeeperNamespace <arg>:针对HA模式在ZooKeeper上创建NameSpace。
  • -id,–applicationId <yarnAppId>:指定YARN集群上的任务ID,附着到一个后台独立运行的yarn session中。

在这里插入图片描述

 ② 查看WebUI: 由于还没有提交Flink job,所以都是0。在这里插入图片描述
这个时候注意查看本地文件系统中有一个临时文件。有了这个文件可以提交 job 到 Yarn在这里插入图片描述
 ③ 提交Job : 由于有了之前的配置,所以自动会提交到Yarn中。

  bin/flink run -c com.bjsxt.flink.StreamWordCount /home/Flink-Demo-1.0-SNAPSHOT.jar
  • 1

在这里插入图片描述在这里插入图片描述
 注意:如果删除目录/tmp/.yarn-properties-root文件,那么再按照以上命令提交任务,将会是寻找Standalone模式中的jobManager节点提交,如果想要重新提交到当前yarn-session中可以使用-yid命令指定对应的yarn application的id,命令如下:

./flink run -yid  application_1598346048136_0002  -c com.lw.scala.myflink.streaming.example.FlinkReadSocketData  /root/test/MyFlink-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 1

 至此第一种模式全部完成。

2) Pre-Job-Cluster 模式(yarn-cluster

 这种模式下不需要先启动yarn-session。所以我们可以把前面启动的yarn-session集群先停止,停止的命令是:

yarn application -kill application_1576832892572_0002
//其中 application_1576832892572_0002 是ID
  • 1
  • 2

 确保Hadoop集群是健康的情况下直接提交Job命令:

bin/flink run -m yarn-cluster -yn 3 -ys 3 -ynm bjsxt02 -c com.bjsxt.flink.StreamWordCount /home/Flink-Demo-1.0-SNAPSHOT.jar
  • 1

 可以看到一个全新的yarn-session在这里插入图片描述
任务提交参数讲解:相对于Yarn-Session参数而言,只是前面加了y。

  • -yn,–container <arg> 表示分配容器的数量,也就是TaskManager的数量。目前版本yarn-cluster提交任务中,经过测试发现,-yn参数是不起作用的。
  • -d,–detached:设置在后台运行。
  • -yjm,–jobManagerMemory<arg>:设置JobManager的内存,单位是MB。
  • -ytm,–taskManagerMemory<arg>:设置每个TaskManager的内存,单位是MB。
  • -ynm,–name:给当前Flink application在Yarn上指定名称。
  • -yq,–query:显示yarn中可用的资源(内存、cpu核数)
  • -yqu,–queue<arg> :指定yarn资源队列
  • -ys,–slots<arg> :每个TaskManager使用的Slot数量。
  • -yz,–zookeeperNamespace<arg>:针对HA模式在Zookeeper上创建NameSpace
  • -yid,–applicationID<yarnAppId> : 指定Yarn集群上的任务ID,附着到一个后台独立运行的Yarn Session中。

4. Flink的HA

 默认情况下,每个Flink集群只有一个JobManager,这将导致单点故障(SPOF),如果这个JobManager挂了,则不能提交新的任务,并且运行中的程序也会失败。使用JobManager HA,集群可以从JobManager故障中恢复,从而避免单点故障。用户可以在Standalone或Flink on Yarn集群模式下配置Flink集群HA(高可用性)。
 Standalone模式下,JobManager的高可用性的基本思想是,任何时候都有一个Alive JobManager和多个Standby JobManager。Standby JobManager可以在Alive JobManager挂掉的情况下接管集群成为Alive JobManager,这样避免了单点故障,一旦某一个Standby JobManager接管集群,程序就可以继续运行。Standby JobManagers和Alive JobManager实例之间没有明确区别,每个JobManager都可以成为Alive或Standby。在这里插入图片描述
1) Flink Standalone集群的HA安装和配置(目前测试1.9版本有bug)

  实现HA还需要依赖ZooKeeper和HDFS,因此要有一个ZooKeeper集群和Hadoop集群,首先启动Zookeeper集群和HDFS集群。我们的课程中分配3台JobManager,如下表:

hadoop101 hadoop102 hadoop103
JobManager JobManager JobManager
TaskManager TaskManager TaskManager

  ① 修改配置文件conf/masters在这里插入图片描述
  ② 修改配置文件conf/flink-conf.yaml

#要启用高可用,设置修改为zookeeper
high-availability: zookeeper
#Zookeeper的主机名和端口信息,多个参数之间用逗号隔开
high-availability.zookeeper.quorum: hadoop103:2181,hadoop101:2181,hadoop102:2181
# 建议指定HDFS的全路径。如果某个Flink节点没有配置HDFS的话,不指定HDFS的全路径则无法识到,storageDir存储了恢复一个JobManager所需的所有元数据。这里如果指定hdfs路径需要在每台节点上配置hadoop的依赖包flink-shaded-hadoop-2-uber-2.7.5-10.0.jar。
high-availability.storageDir: hdfs://mycluster/flink/ha
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

  ③ 把修改的配置文件拷贝其他服务器中

[root@hadoop101 ]# scp -r ./flink-xxx root@hadoop102:`pwd`
[root@hadoop101 ]# scp -r ./flink-xxx root@hadoop103:`pwd`
  • 1
  • 2

  ④ 启动集群在这里插入图片描述
版本问题:目前使用Flink1.7.1版本测试没有问题,使用Flink1.9版本存在HA界面不能自动跳转到对应的Alive JobManager的现象。

2) Flink On Yarn HA 安装和配置

  正常基于Yarn提交Flink程序,无论是使用yarn-session模式还是yarn-cluster模式,基于yarn运行后的application 只要kill 掉对应的Flink 集群进程“YarnSessionClusterEntrypoint”后,基于Yarn的Flink任务就失败了,不会自动进行重试,所以基于Yarn运行Flink任务,也有必要搭建HA,这里同样还是需要借助zookeeper来完成,步骤如下:

  ① 修改所有Hadoop节点的yarn-site.xml

  将所有Hadoop节点的yarn-site.xml中的提交应用程序最大尝试次数调大,这里默认是2次,也可以不调。

#在每台hadoop节点yarn-site.xml中设置提交应用程序的最大尝试次数,建议不低于4,这里重试指的是ApplicationMaster
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5

  ② 启动Hadoop集群
  启动zookeeper,启动Hadoop集群。
  ③ 修改Flink对应flink-conf.yaml配置
  配置对应的conf下的flink-conf.yaml,配置内容如下:

#配置依赖zookeeper模式进行HA搭建
high-availability: zookeeper
#配置JobManager原数据存储路径
high-availability.storageDir: hdfs://mycluster/flink/yarnha/
#配置zookeeper集群节点
high-availability.zookeeper.quorum: hadoop101:2181,hadoop102:2181,hadoop103:2181
#向yarn提交一个application重试的次数,也可以不设置。
yarn.application-attempts: 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

  ④ 启动yarn-session.sh 测试HA: yarn-session.sh -n 2 ,也可以直接提交Job
启动之后,可以登录yarn中对应的flink webui,如下图示:在这里插入图片描述
  点击对应的Tracking UI,进入Flink 集群UI:在这里插入图片描述
  查看对应的JobManager在哪台节点上启动:在这里插入图片描述
进入对应的节点,kill掉对应的“YarnSessionClusterEntrypoint”进程。然后进入到Yarn中观察“applicationxxxx_0001”job信息:在这里插入图片描述
点击job ID,发现会有对应的重试信息:在这里插入图片描述
点击对应的“Tracking UI”进入到Flink 集群UI,查看新的JobManager节点由原来的hadoop103变成了hadoop101,说明HA起作用。在这里插入图片描述

5. Flink并行度和Slot

  Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程(Solt)上执行一个或多个subtask。Flink的每个TaskManager为集群提供Solt。Solt的数量通常与每个TaskManager节点的可用CPU内核数成比例,一般情况下Slot的数量就是每个节点的CPU的核数。
  Slot的数量由集群中flink-conf.yaml配置文件中设置taskmanager.numberOfTaskSlots的值为3,这个值的大小建议和节点CPU的数量保持一致。在这里插入图片描述
一个任务的并行度设置可以从4个层面指定:

  • Operator Level(算子层面)。
  • Execution Environment Level(执行环境层面)。
  • Client Level(客户端层面)。
  • System Level(系统层面)。

这些并行度的优先级为Operator Level>Execution Environment Level>Client Level>System Level。

1) 并行度设置之 Operator Level

 Operator、Source和Sink目的地的并行度可以通过调用setParallelism()方法来指定在这里插入图片描述
2) 行度设置之 Execution Environment Level

  任务的默认并行度可以通过调用setParallelism()方法指定。为了以并行度3来执行所有的Operator、Source和Sink,可以通过如下方式设置执行环境的并行度在这里插入图片描述
3) 并行度设置之 Client Level

  并行度还可以在客户端提交Job到Flink时设定。对于CLI客户端,可以通过-p参数指定并行度。在这里插入图片描述
4) 并行度设置之 System Level

  在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度。在这里插入图片描述
5) 并行度案例分析

Flink集群中有3个TaskManager节点,每个TaskManager的Slot数量为3在这里插入图片描述在这里插入图片描述

第四章 Flink 常用 API 详解

  Flink 根据抽象程度分层,提供了三种不同的 API和库。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。在这里插入图片描述

  • ProcessFunction 是 Flink 所提供最底层接口。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数。因此,你可以利用 ProcessFunction 实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑。
  • DataStream API 为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和 Scala 语言,预先定义了例如map()、reduce()、aggregate() 等函数。你可以通过扩展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。
  • SQL & Table API:Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。
    另外Flink 具有数个适用于常见数据处理应用场景的扩展库。
  • 复杂事件处理(CEP):模式检测是事件流处理中的一个非常常见的用例。Flink 的 CEP 库提供了 API,使用户能够以例如正则表达式或状态机的方式指定事件模式。CEP 库与 Flink 的 DataStream API 集成,以便在 DataStream 上评估模式。CEP 库的应用包括网络入侵检测,业务流程监控和欺诈检测。
  • DataSet API:DataSet API 是 Flink 用于批处理应用程序的核心 API。DataSet API 所提供的基础算子包括map、reduce、(outer) join、co-group、iterate等。所有算子都有相应的算法和数据结构支持,对内存中的序列化数据进行操作。如果数据大小超过预留内存,则过量数据将存储到磁盘。Flink 的 DataSet API 的数据处理算法借鉴了传统数据库算法的实现,例如混合散列连接(hybrid hash-join)和外部归并排序(external merge-sort)。
  • Gelly: Gelly 是一个可扩展的图形处理和分析库。Gelly 是在 DataSet API 之上实现的,并与 DataSet API 集成。因此,它能够受益于其可扩展且健壮的操作符。Gelly 提供了内置算法,如 label propagation、triangle enumeration 和 page rank 算法,也提供了一个简化自定义图算法实现的 Graph API。

1. DataStream 的编程模型

DataStream 的编程模型包括四个部分:Environment,DataSource,Transformation,Sink。在这里插入图片描述

2. Flink 的 DataSource 数据源

  1) 基于文件的 Source

  读取本地文件系统的数据,前面的案例已经讲过了。本课程主要讲基于HDFS文件系统的Source。首先需要配置Hadoop的依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.5</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

 读取HDFS上的文件:

object FileSource {
   

  def main(args: Array[String]): Unit = {
   
    //初始化Flink的Streaming(流计算)上下文执行环境
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    streamEnv.setParallelism(1)

    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._

    //读取数据

    val stream = streamEnv.readTextFile("hdfs://mycluster/wc.txt")

    //转换计算
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    //打印结果到控制台
    result.print()

    //启动流式处理,如果没有该行代码上面的程序不会运行
    streamEnv.execute("wordcount")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

2) 基于集合的 Source

/**
 * 通信基站日志数据
 *
 * @param sid 基站ID
 * @param callOut 主叫号码
 * @param callIn 被叫号码
 * @param callType 通话类型eg:呼叫失败(fail),占线(busy),拒接(barring),接通(success):
 * @param callTime 呼叫时间戳,精确到毫秒
 * @Param duration 通话时长 单位:秒
 */
case class StationLog(sid:String,callOut:String,callIn:String,callType:String,callTime:Long,duration:Long)

object CollectionSource {
   

  def main(args: Array[String]): Unit = {
   
    //初始化Flink的Streaming(流计算)上下文执行环境
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    streamEnv.setParallelism(1)

    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._

    //读取数据
    var dataStream =streamEnv.fromCollection(Array(
      new StationLog("001","186","189","busy",1577071519462L,0),
      new StationLog("002","186","188","busy",1577071520462L,0),
      new StationLog("003","183","188","busy",1577071521462L,0),
      new StationLog("004","186","188","success",1577071522462L,32)
    ))

    dataStream.print()

    streamEnv.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

3) 基于 KafkaSource
  首先需要配置Kafka连接器的依赖,另外更多的连接器可以查看官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/connectors/

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-版本_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

  ① 第一种:读取Kafka中的普通数据(String)

object KafkaSourceWithoutKey {
   
  def main(args: Array[String]): Unit = {
   
    import org.apache.flink.streaming.api.scala._
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //组织配置项
    val props = new Properties()
  props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","ft1_group")
//    props.setProperty("auto.offset.reset","latest")//也可以不设置,默认是 flinkKafkaConsumer.setStartFromGroupOffsets(),设置了也不会起作用
    //读取Kafka中的数据
    val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("ft1",new SimpleStringSchema(),props))
    lines.print()

    //触发执行
    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

  ② 第二种:读取Kafka中的KeyValue数据

object KafkaSourceWithKey {
   
  def main(args: Array[String]): Unit = {
   
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.serializer",classOf[StringDeserializer].getName)
    props.setProperty("value.serializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","ft2_group")
//    props.setProperty("auto.offset.reset","latest") 设置不设置无所谓,因为可以对FlinkKafkaConsumer设置 从什么位置读取数据

   val flinkKafkaConsumer =  new FlinkKafkaConsumer[(Int, String)]("ft2", new KafkaDeserializationSchema[(Int, String)] {
   
      override def isEndOfStream(t: (Int, String)): Boolean = false //是否流结束

      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (Int, String) = {
   
        var key = "0"
        var value = "null"
        if(consumerRecord.key() != null){
   
          key = new String(consumerRecord.key(), "UTF-8")
        }
        if(consumerRecord.value() != null){
   
          value = new String(consumerRecord.value, "UTF-8")
        }
        (key.toInt, value)
      }
     
      //设置返回的二元组类型 ,createTuple2TypeInformation 需要导入隐式转换
      override def getProducedType: TypeInformation[(Int, String)] = {
   
        createTuple2TypeInformation(createTypeInformation[Int], createTypeInformation[String])
      }
    }, props)

    //设置读取Kafka中的数据从最后开始,默认设置为 setStartFromGroupOffsets
    val infos: DataStream[(Int, String)] = env.addSource(flinkKafkaConsumer.setStartFromLatest())

    //打印结果
    infos.print()

    //触发执行
    env.execute()

  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

注意:

  • 默认在kafka consoler中向topic中写入数据时,数据是没有key的,如果想要在console中写入的数据有key,可以使用下面命令:
./kafka-console-producer.sh --broker-list mynode1:9092,mynode2:9092,mynode3:9092 --topic ft2 --property  parse.key=true --property key.separator='|'
parse.key=true: 生产数据带有key
key.separator=’|’:指定分隔符,默认分隔符是\t
  • 1
  • 2
  • 3

console中消费数据也可以打印出key,命令如下:

./kafka-console-consumer.sh --bootstrap-server mynode1:9092,mynode2:9092,mynode3:9092 --topic ft2 --property print.key=true
print.key=true:打印key
  • 1
  • 2
  • Flink读取Kafka数据确定开始位置有以下几种设置方式:
    flinkKafkaConsumer.setStartFromEarliest()
    从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
    flinkKafkaConsumer.setStartFromLatest()
    从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
    flinkKafkaConsumer.setStartFromTimestamp(…)
    从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。
    flinkKafkaConsumer.setStartFromGroupOffsets()
    默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。
  • 关于Flink消费Kafka中数据offset问题
     Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前)的配置。注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况。
     配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。
      关闭checkpoint:
      如果禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。
      开启checkpoint:
      如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。

4) 自定义 Source

 当然也可以自定义数据源,有两种方式实现:

  • 通过实现SourceFunction接口来自定义无并行度(也就是并行度只能为1)的Source。
  • 通过实现ParallelSourceFunction 接口或者继承RichParallelSourceFunction 来自定义有并行度的数据源。

 实现SourceFunction 接口实现无并行度的自定义Source:

class MyDefinedSource extends SourceFunction[StationLog]{
   
  //是否生成数据的标志
  var flag = true

  /**
    * 主要方法:启动一个Source,大部分情况下都需要在run方法中实现一个循环产生数据
    * 这里计划每次产生10条基站数据
    */
  override def run(ctx: SourceFunction.SourceContext[StationLog]): Unit = {
   
    val random = new Random()
    val callType = Array[String]("fail","success","busy","barring")
    while(flag){
   
      1.to(10).map(i=>{
   
        StationLog("sid_"+random.nextInt(10),"1811234%04d".format(random.nextInt(8)),
          "1915678%04d".format(random.nextInt(8)),callType(random.nextInt(4)),System.currentTimeMillis(),random.nextInt(50))
      }).foreach(sl =>{
   
        ctx.collect(sl)
      })
      //每次生成10条数据就休息 5s
      Thread.sleep(5000)
    }

  }

  //当取消对应的Flink任务时被调用
  override def cancel(): Unit = flag = false
}

object DefinedNoParalleSource {
   
  def main(args: Array[String]): Unit = {
   
    import org.apache.flink.streaming.api.scala._
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stationLogInfos: DataStream[StationLog] = env.addSource(new MyDefinedSource())
    stationLogInfos.print()
    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

 实现 ParallelSourceFunction 接口实现有并行度的自定义Source:

class MyDefinedParallelSouce extends ParallelSourceFunction[StationLog]{
   
  //是否生成数据的标志
  var flag = true

  /**
    * 主要方法:启动一个Source,大部分情况下都需要在run方法中实现一个循环产生数据
    * 这里计划每次产生10条基站数据
    */
  override def run(ctx: SourceFunction.SourceContext[StationLog]): Unit = {
   
    val random = new Random()
    val callType = Array[String]("fail","success","busy","barring")
    while(flag){
   
      1.to(10).map(i=>{
   
        StationLog("sid_"+random.nextInt(10)+"_"+Thread.currentThread().getName,"181%04d".format(random.nextInt(8)),
          "191%04d".format(random.nextInt(8)),callType(random.nextInt(4)),System.currentTimeMillis(),random.nextInt(50))
      }).foreach(sl =>{
   
        ctx.collect(sl)
      })
      //每次生成10条数据就休息 5s
      Thread.sleep(5000)
    }

  }

  //当取消对应的Flink任务时被调用
  override def cancel(): Unit = flag = false
}

object DefinedParalleSource {
   
  def main(args: Array[String]): Unit = {
   
    import org.apache.flink.streaming.api.scala._
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stationInfos: DataStream[StationLog] = env.addSource(new MyDefinedParallelSouce())
    stationInfos.print()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/560294
推荐阅读
相关标签
  

闽ICP备14008679号