当前位置:   article > 正文

Flink入门实战详解

Flink入门实战详解

Flink入门实战

Flink项目构建

1)基于Maven+Idea创建项目:

使用maven进行项目构建,如图1所示。

图-34 构建maven项目

输入项目中的maven的坐标和存储坐标,如图2所示。

图2 maven坐标和存储位置

2)Maven依赖:

  1.     <properties>
  2.     <maven.compiler.source>1.8</maven.compiler.source>
  3.     <maven.compiler.target>1.8</maven.compiler.target>
  4.     <encoding>UTF-8</encoding>
  5.     <scala.version>2.11.12</scala.version>
  6.     <scala.compat.version>2.11</scala.compat.version>
  7.     <hadoop.version>2.6.0</hadoop.version>
  8.     <flink.version>1.9.1</flink.version>
  9. </properties>
  10. <dependencies>
  11.     <dependency>
  12.         <groupId>org.apache.flink</groupId>
  13.         <artifactId>flink-java</artifactId>
  14.         <version>${flink.version}</version>
  15.     </dependency>
  16.     <dependency>
  17.         <groupId>org.apache.flink</groupId>
  18.         <artifactId>flink-streaming-java_2.11</artifactId>
  19.         <version>${flink.version}</version>
  20.     </dependency>
  21.     <dependency>
  22.         <groupId>org.apache.flink</groupId>
  23.         <artifactId>flink-scala_2.11</artifactId>
  24.         <version>${flink.version}</version>
  25.     </dependency>
  26.     <dependency>
  27.         <groupId>org.apache.flink</groupId>
  28.         <artifactId>flink-streaming-scala_2.11</artifactId>
  29.         <version>${flink.version}</version>
  30.     </dependency>
  31.     <dependency>
  32.         <groupId>org.apache.flink</groupId>
  33.         <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
  34.         <version>${flink.version}</version>
  35.     </dependency>
  36.     <dependency>
  37.         <groupId>mysql</groupId>
  38.         <artifactId>mysql-connector-java</artifactId>
  39.         <version>8.0.23</version>
  40.     </dependency>
  41.     <!-- flink-2-hadoop-->
  42.     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
  43.     <dependency>
  44.         <groupId>org.apache.flink</groupId>
  45.         <artifactId>flink-shaded-hadoop-2</artifactId>
  46.         <version>2.7.5-9.0</version>
  47.     </dependency>
  48.     <!-- lombok -->
  49.     <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
  50.     <dependency>
  51.         <groupId>org.projectlombok</groupId>
  52.         <artifactId>lombok</artifactId>
  53.         <version>1.18.12</version>
  54.     </dependency>
  55.     <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
  56.     <dependency>
  57.         <groupId>org.apache.bahir</groupId>
  58.         <artifactId>flink-connector-redis_2.11</artifactId>
  59.         <version>1.0</version>
  60.     </dependency>
  61.     <dependency>
  62.         <groupId>org.apache.flink</groupId>
  63.         <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  64.         <version>${flink.version}</version>
  65.     </dependency>
  66.     <dependency>
  67.         <groupId>org.apache.flink</groupId>
  68.         <artifactId>flink-table</artifactId>
  69.         <type>pom</type>
  70.         <version>${flink.version}</version>
  71.     </dependency>
  72.     <dependency>
  73.         <groupId>org.apache.flink</groupId>
  74.         <artifactId>flink-table-planner_2.11</artifactId>
  75.         <version>${flink.version}</version>
  76.     </dependency>
  77.     <dependency>
  78.         <groupId>org.apache.flink</groupId>
  79.         <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  80.         <version>${flink.version}</version>
  81.     </dependency>
  82.     <dependency>
  83.         <groupId>org.apache.flink</groupId>
  84.         <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  85.         <version>${flink.version}</version>
  86.     </dependency>
  87.     <dependency>
  88.         <groupId>org.apache.flink</groupId>
  89.         <artifactId>flink-table-common</artifactId>
  90.         <version>${flink.version}</version>
  91.     </dependency>
  92.     <dependency>
  93.         <groupId>org.apache.flink</groupId>
  94.         <artifactId>flink-cep-scala_2.11</artifactId>
  95.         <version>${flink.version}</version>
  96.     </dependency>
  97.     <dependency>
  98.         <groupId>com.alibaba</groupId>
  99.         <artifactId>fastjson</artifactId>
  100.         <version>1.2.58</version>
  101.     </dependency>
  102.     <dependency>
  103.         <groupId>ru.yandex.clickhouse</groupId>
  104.         <artifactId>clickhouse-jdbc</artifactId>
  105.         <version>0.2.4</version>
  106.         <exclusions>
  107.             <exclusion>
  108.                 <groupId>com.fasterxml.jackson.core</groupId>
  109.                 <artifactId>jackson-databind</artifactId>
  110.             </exclusion>
  111.         </exclusions>
  112.     </dependency>
  113. </dependencies>
  114. <build>
  115.     <sourceDirectory>src/main/scala</sourceDirectory>
  116.     <testSourceDirectory>src/test/scala</testSourceDirectory>
  117.     <plugins>
  118.         <plugin>
  119.             <groupId>org.apache.maven.plugins</groupId>
  120.             <artifactId>maven-compiler-plugin</artifactId>
  121.             <version>2.5.1</version>
  122.             <configuration>
  123.                 <source>${maven.compiler.source}</source>
  124.                 <target>${maven.compiler.target}</target>
  125.                 <!--<encoding>${project.build.sourceEncoding}</encoding>-->
  126.             </configuration>
  127.         </plugin>
  128.         <plugin>
  129.             <groupId>net.alchim31.maven</groupId>
  130.             <artifactId>scala-maven-plugin</artifactId>
  131.             <version>3.2.0</version>
  132.             <executions>
  133.                 <execution>
  134.                     <goals>
  135.                         <goal>compile</goal>
  136.                         <goal>testCompile</goal>
  137.                     </goals>
  138.                     <configuration>
  139.                         <args>
  140.                             <!--<arg>-make:transitive</arg>-->
  141.                             <arg>-dependencyfile</arg>
  142.                             <arg>${project.build.directory}/.scala_dependencies</arg>
  143.                         </args>
  144.                     </configuration>
  145.                 </execution>
  146.             </executions>
  147.         </plugin>
  148.         <plugin>
  149.             <groupId>org.apache.maven.plugins</groupId>
  150.             <artifactId>maven-surefire-plugin</artifactId>
  151.             <version>2.18.1</version>
  152.             <configuration>
  153.                 <useFile>false</useFile>
  154.                 <disableXmlReport>true</disableXmlReport>
  155.                 <includes>
  156.                     <include>**/*Test.*</include>
  157.                     <include>**/*Suite.*</include>
  158.                 </includes>
  159.             </configuration>
  160.         </plugin>
  161.         <plugin>
  162.             <groupId>org.apache.maven.plugins</groupId>
  163.             <artifactId>maven-shade-plugin</artifactId>
  164.             <version>2.3</version>
  165.             <executions>
  166.                 <execution>
  167.                     <phase>package</phase>
  168.                     <goals>
  169.                         <goal>shade</goal>
  170.                     </goals>
  171.                     <configuration>
  172.                         <filters>
  173.                             <filter>
  174.                                 <artifact>*:*</artifact>
  175.                                 <excludes>
  176.                                     <!--
  177.                                     zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
  178.                                     -->
  179.                                     <exclude>META-INF/*.SF</exclude>
  180.                                     <exclude>META-INF/*.DSA</exclude>
  181.                                     <exclude>META-INF/*.RSA</exclude>
  182.                                 </excludes>
  183.                             </filter>
  184.                         </filters>
  185.                         <transformers>
  186.                             <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  187.                                 <mainClass>chapter1.BatchWordCount</mainClass>
  188.                             </transformer>
  189.                         </transformers>
  190.                     </configuration>
  191.                 </execution>
  192.             </executions>
  193.         </plugin>
  194.     </plugins>
  195. </build>

Flink基础API概念

Flink编程是在分布式集合的基础的规律的编程模型(比如,执行filtering,mapping,updating,state,joining,grouping,defining,windows,aggregating)。这些集合可以通过外部数据源(比如从文件,kafka的topics、本地或者内存的集合)。通过下沉算子返回结果,比如将数据写入到一个分布式的文件中,或者控制台。Flink程序可以基于各种context、stanalone或者嵌入其他程序进行运行。可以在本地的jvm或者在多台机器间分布式运行。

基于外部的数据源,比如有界或者无界的数据源,我们可能会选择使用批处理的DataSet API或者流处理的DataStream API来处理。

需要注意的是,在DataStream和DataSet中的绝大多数的API是一致的,只需要替换对应的ExecutionEnvironment或者StreamExecutionEnvironment即可。

Flink在编程的过程中使用特定类——DataSet和DataStream来体现数据,类似Spark中的RDD。可以将其认为是一个可以拥有重复的不可变的集合。其中DataSet表示的是一个有界的数据集,DataStream则表示的是无界的集合。

这些集合在一些关键的地方和Java中的普通集合不同。首先,DataSet和DataStream是不可变的,这就意味着一旦被创建,便不能进行add或者remove的操作。同样也不能简单的查看集合内部的元素。

Flink可以通过外部的数据源来创建DataSet或者DataStream,也可以通过在一个已知的集合上面执行一系列的Transformation操作来转换产生新的集合。

Flink程序看起来就是一个普通的程序,进行数据的转换,每一个程序包含如下相同的集合基础概念,通用编程步骤如下:

1)创建一个执行环境ExecutionEnvironment。

2)加载或者创建初始化数据——DataSet或者DataStream。

3)在此数据基础之上进行特定的转化操作。

4)将计算的结果输出到特定的目的地。

5)触发作业的执行。

Flink编程的入口,便是ExecutionEnvironment,不同之处在于,DataSet和DataStream使用的ExecutionEnvironment不同。DataSet使用ExecutionEnviroment,而DataStream使用StreamExectionEnvironment。

获得ExecutionEnvironment可以通过ExecutionEnvironment的如下方法:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常情况下,我们只需要使用getExecutionEnvironment()即可,因为这种方式会自动选择正确的context。如果我们在IDE中执行,则会创建一个Local的Context,如果打包到集群中执行,会返回一个Cluster的Context。

加载数据源的方式有多种。可以一行一个的读入,比如CSV文件,或者自定义格式。如果只是从一个文本文件中按顺序读取行数据。只需要如下操作即可。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path")

创建了一个DataStream或者DataSet,接下来便可以执行各种transformation转换操作。比如执行一个map操作。

创建一个新的DataStream,类型为Integer的集合。

DataStream中包含了最终的结果,我们可以将结果通过创建一个sink操作,写入外部系统中,比如:writeAsText(path)

一旦我们完成整个程序,我们需要通过调用StreamExecutionEnvironment的execute()方法来触发作业的执行。基于ExecutionEnvironment会在本地或者集群中执行。

execute()方法返回值为JobExecutionResult,包含本次执行时间或者累加器结果信息。

与Spark中的Transformation操作相同,Flink中的Transformation操作是Lazy懒加载的,需要execute()去触发。基于此,我们可以创建并添加程序的执行计划。进行任务调度和数据分离,执行更加高效。

目前Flink支持7种数据类型,分别为:

1)Java Tuples和Scala Case Classes。

2)Java POJOS(一种数据结构类型)。

3)Primitive Types(Java的基本数据类型)。

4)Regular Classes(普通类)。

5)Values。

6)Hadoop Writables。

7)SpecialTypes。

DataSet批处理API

Flink中的DataSet程序是实现数据集转换的常规程序(例如,Filter,映射,连接, 分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

  1. import org.apache.flink.api.scala._
  2. object WordCountOps {
  3. def main(args: Array[String]): Unit = {
  4. val env = ExecutionEnvironment.getExecutionEnvironment
  5. val text = env.fromElements("Who's there?",
  6. "I think I hear them. Stand, ho! Who's there?"
  7. )
  8. val wordCounts:DataSet[(String, Int)] = text
  9. .flatMap(line => line.split("\\s+")).map((_, 1))
  10. .groupBy(0)
  11. .sum(1)
  12. wordCounts.print()
  13. }
  14. }

Streaming流式处理API

Flink中的DataStream程序是实现数据流转换的常规程序(例如 filtering, updating state, defining windows, aggregating)。最初从各种源(例如, message queues, socket streams, files)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
object StreamDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val file = env.socketTextStream("localhost", 9999)
        val spliFile: DataStream[String] = file.flatMap(_.split(" "))
        val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
        val keyed = wordAndOne.keyBy(data=>data._1)
        val wordAndCount: DataStream[(String, Int)] = keyed.sum(1)
        wordAndCount.print()
        env.execute()
    }
}

要运行示例程序,首先从终端使用netcat启动输入流:

nc -lk 9999

只需键入一些单词就可以返回一个新单词。这些将是字数统计程序的输入。

Flink程序提交到集群

1)Web提交方式:

图3 web提交方式

1)脚本方式:

  1. #!/bin/sh
  2. FLINK_HOME=/home/bigdata/apps/flink
  3. $FLINK_HOME/bin/flink run \
  4. -c BatchDemo \
  5. /root/wc.jar \
  6. hdfs://hadoop101:8020/wordcount/words.txt \
  7. hdfs://hadoop101:8020/wordcount/output3

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/764782
推荐阅读
相关标签
  

闽ICP备14008679号