当前位置:   article > 正文

Flink入门编程_flink 开发

flink 开发

环境准备

Flink 底层是以 Java 编写,但给开发提供了 Java 和 Scala API

使用 IntelliJ IDEA 作为开发工具,用 Maven 作为包管理工具

  • 基于 Flink 1.13.0
  • 基于 JDK 1.8

GitHub 代码托管 : https://github.com/CPU-Code/Hadoop

创建项目

创建工程

打开 IntelliJ IDEA ,创建一个 Maven 工程

image-20220418154914540

image-20220418154949749

将这个 Maven 工程命名为 FlinkDemo , 选定这个 Maven 工程所在存储路径,并点击 Finish

image-20220418155123570

添加项目依赖

在项目的 pom 文件中,增加 <properties> 标签设置属性,然后增加 <denpendencies> 标签引入需要的依赖

Flink的架构中使用了 Akka 来实现底层的分布式通信,而 Akka 是用 Scala 开发

    <properties>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!-- 引入 Flink 相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 客户端,可省略 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

image-20220418155359718

配置日志管理

在目录 src/main/resources 下添加文件: log4j.properties

image-20220418155440291

内容配置 :

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
  • 1
  • 2
  • 3
  • 4

image-20220418155453325

编写代码

需求 : 统计一段文字中,每个单词出现的频次

源码位于 src/main/java 目录下。首先新建一个包,命名为 com.cpucode.wc

image-20220418155622221

批处理

批处理 : 输入是收集好的数据集。将要统计的文字,写入一个文本文档,然后读取这个文件处理数据

在目录 src/main/resources 下新建一个 input 文件夹,并在下面创建文本文件 words.txt

words.txt 中输入一些文字 :

hello world
hello flink
hello java
  • 1
  • 2
  • 3

image-20220418155750866

创建建 Java 类 BatchWordCount,在静态 main 方法中编写测试代码

image-20220418160100656

进行单词频次统计的基本思路是:

  • 先逐行读入文件数据,然后将每一行文字拆分成单词
  • 接着按照单词分组,统计每组数据的个数,就是对应单词的频次

具体代码实现 :

package com.cpucode.wc;

// 在引入包时,有 Java 和 Scala ,注意选用 Java 的包
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;

/**
 * @author : cpucode
 * @date : 2022/4/18 16:00
 * @github : https://github.com/CPU-Code
 * @csdn : https://blog.csdn.net/qq_44226094
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境 , 获取执行环境对象,也就是运行时上下文环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
        DataSource<String> lineDS = env.readTextFile("FlinkDemo/src/main/resources/input/words.txt");

        // 3. 转换数据格式 flatmap 方法可以对一行文字进行分词转换
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 每一行文字拆分成单词
            String[] words = line.split(" ");
            for (String word : words) {
                // 转换成(word,count)形式的二元组
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //当 Lambda 表达式使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息


        // 4. 按照 word 进行分组 , 采用位置索引或属性名称进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计 , 指定聚合字段的位置索引或属性名称
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

运行程序

将文档中的所有单词的频次,全部统计出来,以二元组的形式在控制台打印输出

image-20220419210759646

上面的实现方式,是基于 DataSet API ( 软弃用 ) , 从 Flink 1.12 开始,推荐使用 DataStream API,流批统一的处理架构,在提交任务时将执行模式设为 BATCH 来进行批处理 :

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
  • 1

流处理

  • 用 DataSet API 实现批处理
  • 用 DataStream API 实现流处理

Flink : 流才是整个处理逻辑的底层核心,所以DataStream API实现了流批统一,可以直接处理批处理流处理的所有场景

读取文件

读取文档 words.txt 中的数据,并统计每个单词出现的频次

com.cpucode.wc 包下新建 Java 类 BoundedStreamWordCount ,在静态 main 方法中编写测试代码

具体代码实现 :

package com.cpucode.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * @author : cpucode
 * @date : 2022/4/19 21:18
 * @github : https://github.com/CPU-Code
 * @csdn : https://blog.csdn.net/qq_44226094
 */
public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境 , 流处理
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文件
        DataStreamSource<String> lineDSS = env.readTextFile("FlinkDemo/src/main/resources/input/words.txt");

        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS.flatMap((String line, Collector<String> word) -> {
            Arrays.stream(line.split(" ")).forEach(word::collect);
        }).returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 分组 , 匿名函数作为键选择器
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);

        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);

        // 6. 打印
        result.print();

        // 7. 开始执行任务
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

运行程序

image-20220419213250289

与批处理的结果是完全不同

  • 批处理针对每个单词,只会输出一个最终的统计个数
  • 而流处理的打印结果中,单词每出现一次,都会有一个频次统计数据输出。

这就是流处理的特点,数据逐个处理,每来一条数据就会处理输出一次

显示的编号为 1~16,是 CPU 是 16 核,所以默认模拟的并行线程是 16 个。不同的运行环境,得到的结果会是不同的

读取文本流

在实际的生产环境中,真正的数据流是无界的,只有开始没有结束

为了模拟这种场景,就不读取文件来获取数据了,而是监听数据发送端主机的指定端口,统计发送来的文本数据中出现过的单词的个数

新建一个 Java 类 StreamWordCount, 将 BoundedStreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取 socket 文本流的方法 socketTextStream

具体代码实现 :

package com.cpucode.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.lang.reflect.Array;
import java.util.Arrays;

/**
 * @author : cpucode
 * @date : 2022/4/19 21:37
 * @github : https://github.com/CPU-Code
 * @csdn : https://blog.csdn.net/qq_44226094
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文本流 , 发送端主机名和端口号
        DataStreamSource<String> lineDSS = env.socketTextStream("cpu101", 7777);

        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS.flatMap((String line, Collector<String> words) -> {
            Arrays.stream(line.split(" ")).forEach(words::collect);
        }).returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);

        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);

        // 6. 打印
        result.print();

        // 7. 执行
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

在 Linux 环境的主机 cpu101 上,发送数据进行测试

安装 netcat 工具

yum install -y nc
  • 1
nc -lk 7777
  • 1

运行程序

image-20220419214645798

程序启动后没有任何输出也不会退出。 Flink 的流处理是事件驱动的,程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结

从 cpu101 发送数据:

image-20220419214703128

看到控制台输出结果 :

image-20220419214713534

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/751899
推荐阅读
相关标签
  

闽ICP备14008679号