当前位置:   article > 正文

Flink学习笔记【巨详细!】(三)_pyflink学习笔记(三):

pyflink学习笔记(三):

第 5 章 DataStream API(基础篇)

  • 我们在第 2 章介绍 Flink 快速上手时,曾编写过一个简单的词频统计(WordCount)程序,相信读者已经对 Flink 的编程方式有了基本的认识。接下来,我们就将开始大量的代码练习,详细了解用于 Flink 程序开发的 API 用法。
  • Flink 有非常灵活的分层 API 设计,其中的核心层就是 DataStream/DataSet API。由于新版本已经实现了流批一体,DataSet API 将被弃用,官方推荐统一使用 DataStream API 处理流数据和批数据。由于内容较多,我们将会用几章的篇幅来做详细讲解,本章主要介绍基本的DataStream API 用法。
  • DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的Flink 代码其实就是基于这种数据类型的处理,所以这套核心 API 就以 DataStream 命名。对于批处理和流处理,我们都可以用这同一套 API 来实现。
  • DataStream 在用法上有些类似于常规的 Java 集合,但又有所不同。我们在代码中往往并不关心集合中具体的数据,而只是用 API 定义出一连串的操作来处理它们;这就叫作数据流的“转换”(transformations)。
  • 一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下部分构成,如图 5-1 所示:
  1. 获取执行环境(execution environment)
  2. 读取数据源(source)
  3. 定义基于数据的转换操作(transformations)
  4. 定义计算结果的输出位置(sink)
  5. 触发程序执行(execute)
    其中,获取环境和触发执行,都可以认为是针对执行环境的操作。所以本章我们就从执行环境、数据源(source)、转换操作(transformation)、输出(sink)四大部分,对常用的DataStream API 做基本介绍。

在这里插入图片描述

5.1 执行环境(Execution Environment)

  • Flink 程序可以在各种上下文环境中运行:我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。
  • 不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前 Flink 的运行环境,从而建立起与 Flink 框架之间的联系。只有获取了环境上下文信息,才能将具体的任务调度到不同的 TaskManager 执行。

5.1.1 创建执行环境

编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我 们 要 获 取 的 执 行 环 境 , 是StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。

  1. getExecutionEnvironment
  • 最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 1

这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。

  1. createLocalEnvironment
    这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
  • 1
  1. createRemoteEnvironment
    这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager 主机名
1234, // JobManager 进程端口号(一般情况下是6123)
"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。关于时间语义和容错机制,我们会在后续的章节介绍。

5.1.2 执行模式(Execution Mode)

  • 上节中我们获取到的执行环境,是一个 StreamExecutionEnvironment,顾名思义它应该是做流处理的。那对于批处理,又应该怎么获取执行环境呢?

  • 在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment的静态方法,返回它的对象:

// 批处理环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 1
  • 2
  • 3
  • 4
  • 基于 ExecutionEnvironment 读入数据创建的数据集合,就是 DataSet;对应的调用的一整套转换方法,就是 DataSet API。这些我们在第二章的批处理 word count 程序中已经有了基本了解。
  • 而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了。
  1. 流执行模式(STREAMING)
    这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式。
  2. 批执行模式(BATCH)
    专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。对于不会持续计算的有界数据,我们用这种模式处理会更方便。
  3. 自动模式(AUTOMATIC)
    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

1. BATCH 模式的配置方法
由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。
主要有两种方式:
(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH …

在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。 (2)通过代码配置

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  • 1
  • 2
  • 3

在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。

建议: 不要在代码中配置,而是使用命令行。这同设置并行度是类似的:在提交作业时指定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。而在代码中硬编码(hard code)的方式可扩展性比较差,一般都不推荐

2. 什么时候选择 BATCH 模式

  • 我们知道,Flink 本身持有的就是流处理的世界观,即使是批量数据,也可以看作“有界流”来进行处理。所以 STREAMING 执行模式对于有界数据和无界数据都是有效的;而 BATCH模式仅能用于有界数据。
  • 看起来 BATCH 模式似乎被 STREAMING 模式全覆盖了,那还有必要存在吗?我们能不能所有情况下都用流处理模式呢?
  • 当然是可以的,但是这样有时不够高效。
  • 我们可以仔细回忆一下 word count 程序中,批处理和流处理输出的不同:在 STREAMING模式下,每来一条数据,就会输出一次结果(即使输入数据是有界的);而 BATCH 模式下,只有数据全部处理完之后,才会一次性输出结果。最终的结果两者是一致的,但是流处理模式会将更多的中间结果输出。在本来输入有界、只希望通过批处理得到最终的结果的场景下,STREAMING 模式的逐个输出结果就没有必要了。
  • 所以总结起来,一个简单的原则就是:用 BATCH 模式处理批量数据,用 STREAMING模式处理流式数据。因为数据有界的时候,直接输出结果会更加高效;而当数据无界的时候, 我们没得选择——只有 STREAMING 模式才能处理持续的数据流。
  • 当然,在后面的示例代码中,即使是有界的数据源,我们也会统一用 STREAMING 模式处理。这是因为我们的主要目标还是构建实时处理流数据的程序,有界数据源也只是我们用来测试的手段。

5.1.3 触发程序执行

  • 有了执行环境,我们就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种转换操作,最后输出结果到外部系统。
  • 需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当 main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”(lazy execution)。
  • 所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute();
  • 1

5.2 源算子(Source)

在这里插入图片描述

  • 创建环境之后,就可以构建数据处理的业务逻辑了,如图 5-2 所示,本节将主要讲解 Flink的源算子(Source)。想要处理数据,先得有数据,所以首要任务就是把数据读进来。
  • Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
  • Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法:
DataStream<String> stream = env.addSource(...);
  • 1
  • 方法传入一个对象参数,需要实现 SourceFunction 接口;返回 DataStreamSource。这里的DataStreamSource 类继承自 SingleOutputStreamOperator 类,又进一步继承自DataStream。所以很明显,读取数据的 source 操作是一个算子,得到的是一个数据流(DataStream)。
  • 这里可能会有些麻烦:传入的参数是一个“源函数”(source function),需要实现SourceFunction 接口。这是何方神圣,又该怎么实现呢?
  • 自己去实现它显然不会是一件容易的事。好在 Flink 直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的 source function,通常情况下足以应对我们的实际需求。接下来我们就详细展开讲解。

5.2.1 准备工作

  • 为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的 urrl,用户访问 url 的时间戳),所以在这里,我们可以创建一个类 Event,将用户行为包装成它的一个对象。Event 包含了以下一些字段,如表 5-1 所示:
                            表 5-1 Event类字段设计
  • 1
字段名数据类型说明
userString用户名
urlString用户访问的 url
timestampLong用户访问 url 的时间戳

具体代码如下:

package online.liujiahao.chapter05;

import java.sql.Timestamp;

public class Event {
    public String user;
    public String url;
    public Long timeStamp;

    public Event() {
    }

    public Event(String user, String url, Long timeStamp) {
        this.user = user;
        this.url = url;
        this.timeStamp = timeStamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timeStamp=" + new Timestamp(timeStamp) +
                '}';
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

这里需要注意,我们定义的 Event,有这样几个特点:

  1. 类是公有(public)的
  2. 有一个无参的构造方法
  3. 所有属性都是公有(public)的
  4. 所有属性的类型都是可以序列化的
  • Flink 会把这样的类作为一种特殊的 POJO 数据类型来对待,方便数据的解析和序列化。另外我们在类中还重写了 toString 方法,主要是为了测试输出显示更清晰。关于 Flink 支持的数据类型,我们会在后面章节做详细说明。
  • 我们这里自定义的 Event POJO 类会在后面的代码中频繁使用,所以在后面的代码中碰到Event,把这里的 POJO 类导入就好了。

注:Java 编程比较好的实践是重写每一个类的 toString 方法,来自 Joshua Bloch 编写的《Effective Java》。

5.2.2 从集合中读取数据

最简单的读取数据的方式,就是在代码中直接创建一个 Java 集合,然后调用执行环境的fromCollection 方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

package online.liujiahao.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/*
    这部分主要是针对Source读取数据源的api的调用
 */
public class SourceTest {
    public static void main(String[] args) throws Exception{
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.从文件中直接读取数据(读取的批量的数据(有界),流式方式输出)
        //todo 最常用的读取有界流数据的方式还是方式1(从文件中读取),后面两种一般是用于测试环境中。
        DataStreamSource<String> stream1 = env.readTextFile("input/click.txt");

        //2.从集合中读取数据(.fromCollection)
        ArrayList<Integer> nums = new ArrayList<>();
        nums.add(2);
        nums.add(5);
        DataStreamSource<Integer> numStream = env.fromCollection(nums);

        ArrayList<Event> events = new ArrayList<>();
        events.add(new Event("Mary", "./home", 1000L));
        events.add(new Event("Bob", "./cart", 2000L));
        DataStreamSource<Event> stream2 = env.fromCollection(events);

        //从元素中读取数据
        DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L)
        );

//        stream1.print("1");
//        numStream.print("nums");
//        stream2.print("2");
//        stream3.print("3");
        //4. 从socket文本流中直接读取
        DataStreamSource<String> stream4 = env.socketTextStream("hadoop102", 7777);

        stream4.print("4");

        env.execute();


    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

我们也可以不构建集合,直接将元素列举出来,调用 fromElements 方法进行读取数据:

DataStreamSource<Event> stream2 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
  • 1
  • 2
  • 3
  • 4

5.2.3 从文件读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

DataStream<String> stream = env.readTextFile("clicks.txt");
  • 1

说明:

  1. 参数可以是目录,也可以是文件;
  2. 路径可以是相对路径,也可以是绝对路径;
  3. 相对路径是从系统属性 user.dir 获取路径: idea 下是 project 的根目录, standalone 模式下是集群节点根目录;
  4. 也可以从 hdfs 目录下读取, 使用路径 hdfs://…, 由于 Flink 没有提供 hadoop 相关依赖, 需要 pom 中添加相关依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
<scope>provided</scope>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

5.2.4 从 Socket 读取数据

  • 不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。这时又从哪里读取呢?
  • 一个简单的方式,就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
DataStream<String> stream = env.socketTextStream("localhost", 7777);
  • 1

5.2.5 从 Kafka 读取数据

  • 那对于真正的流数据,实际项目应该怎样读取呢?
  • Kafka 作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对,是当前处理流式数据的双子星。在如今的实时流处理应用中,由 Kafka 进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选,如图 5-3 所示。

在这里插入图片描述

kafka数据源本身传入的FlinkKafkaConsumer类继承自FlinkKafkaConsumerBase

  • 略微遗憾的是,与 Kafka 的连接比较复杂,Flink 内部并没有提供预实现的方法。所以我们只能采用通用的 addSource 方式、实现一个 SourceFunction 了。
  • 好在Kafka与Flink确实是非常契合,所以Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的SourceFunction。
  • 所以想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本只支持 0.10.0 版本以上的 Kafka,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依赖版本。这里我们需要导入的依赖如下。
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
	<version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

package online.liujiahao.chapter05;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class SourceKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStreamSource<String> stream = env.addSource(new
                FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties
        ));
        stream.print("Kafka");
        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

创建 FlinkKafkaConsumer 时需要传入三个参数:

  1. 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
  2. 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是公共接口,所以我们也可以自定义反序列化逻辑。
  3. 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。

5.2.6 自定义 Source

  • 大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,我们想要读取的数据源来自某个外部系统,而 flink 既没有预实现的方法、也没有提供连接器,又该怎么办呢?
  • 那就只好自定义实现 SourceFunction 了。

接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:run()和 cancel()。

  1. run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
  2. cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。
    代码如下:
    我们先来自定义一下数据源:
package online.liujiahao.chapter05;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    //申明一个标志位
    private Boolean running = true;
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // 模拟真实环境,随机生成数据
        Random random = new Random();
        //定义字段选取的数据集
        String[] users = {"Marry", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=200", "./prod?id=10"};


        //循环生成数据
        while (running) {
            //在0-users.length-1 之间随机选取一个索引位置作为user的值。
            String user = users[random.nextInt(users.length)];
            //url的选取
            String url = urls[random.nextInt(urls.length)];
            //获取当前系统时间的一个 毫秒级的时间戳
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            sourceContext.collect(new Event(user,url,timestamp));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

这个数据源,我们后面会频繁使用,所以在后面的代码中涉及到 ClickSource()数据源,使用上面的代码就可以了。
下面的代码我们来读取一下自定义的数据源。有了自定义的 source function,接下来只要调用 addSource()就可以了:

env.addSource(new ClickSource())
  • 1

下面是完整的代码:

package online.liujiahao.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;

public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> customStream = env.addSource(new ParallelCustomSource()).setParallelism(2);

        customStream.print();

        env.execute();
    }

    public static class ParallelCustomSource implements ParallelSourceFunction<Integer>{
        private Boolean running = true;
        private Random random = new Random();
        
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (running) {
                ctx.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1,如果数据源设置为大于 1 的并行度,则会抛出异常。如下程序所示:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class SourceThrowException {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new ClickSource()).setParallelism(2).print();
env.execute();
} }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

输出的异常如下:

Exception in thread "main" java.lang.IllegalArgumentException: The parallelism 
of non parallel operator must be 1.
  • 1
  • 2

所以如果我们想要自定义并行的数据源的话,需要使用 ParallelSourceFunction,示例程序
如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
public class ParallelSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CustomSource()).setParallelism(2).print();
        env.execute();
    }

    public static class CustomSource implements ParallelSourceFunction<Integer> {
        private boolean running = true;
        private Random random = new Random();

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            while (running) {
                sourceContext.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

输出结果如下:

2> -686169047
2> 429515397
2> -223516288
2> 1137907312
2> -380165730
2> 2082090389
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

5.2.7 Flink 支持的数据类型

我们已经了解了 Flink 怎样从不同的来源读取数据。在之前的代码中,我们的数据都是定义好的 UserBehavior 类型,而且在 5.2.1 小节中特意说明了对这个类的要求。那还有没有其他更灵活的类型可以用呢?Flink 支持的数据类型到底有哪些?

  1. Flink 的类型系统
  • 为什么会出现“不支持”的数据类型呢?因为 Flink 作为一个分布式处理框架,处理的是以数据对象作为元素的流。如果用水流来类比,那么我们要处理的数据元素就是随着水流漂动的物体。在这条流动的河里,可能漂浮着小木块,也可能行驶着内部错综复杂的大船。要分布式地处理这些数据,就不可避免地要面对数据的网络传输、状态的落盘和故障恢复等问题,这就需要对数据进行序列化和反序列化。小木块是容易序列化的;而大船想要序列化之后传输,就需要将它拆解、清晰地知道其中每一个零件的类型。
  • 为了方便地处理数据,Flink 有自己一整套类型系统。Flink 使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation 类是 Flink 中所有类型描述符的基类。
  • 它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
  1. Flink 支持的数据类型
    简单来说,对于常见的 Java 和 Scala 数据类型,Flink 都是支持的。Flink 在内部,Flink对支持不同的类型进行了划分,这些类型可以在 Types 工具类中找到:
    (1)基本类型
    所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和 BigInteger。
    (2)数组类型
    包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)
    (3)复合数据类型
  • Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是 Java API 的一部分。最多
    25 个字段,也就是从 Tuple0~Tuple25,不支持空字段
  • Scala 样例类及 Scala 元组:不支持空字段
  • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段
  • POJO:Flink 自定义的类似于 Java bean 模式的类

(4)辅助类型
Option、Either、List、Map 等
(5)泛型类型(GENERIC)
Flink 支持所有的 Java 类和 Scala 类。不过如果没有按照上面 POJO 类型的要求来定义,就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的。

在这些类型中,元组类型和 POJO 类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO 还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。

所以,在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型。
Flink 对 POJO 类型的要求如下:

⚫ 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类);
⚫ 类有一个公共的无参构造方法;
⚫ 类中的所有字段是 public 且非 final 的;或者有一个公共的 getter 和 setter 方法,这些方法需要符合 Java bean 的命名规范。

所以我们看到,之前的 UserBehavior,就是我们创建的符合 Flink POJO 定义的数据类型。

  1. 类型提示(Type Hints)
  • Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的——只告诉 Flink 当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信
    息,才能使应用程序正常工作或提高其性能。
  • 为了解决这类问题,Java API 提供了专门的“类型提示”(type hints)。
  • 回忆一下之前的 word count 流处理程序,我们在将 String 类型的每个词转换成(word,count)二元组后,就明确地用 returns 指定了返回的类型。因为对于 map 里传入的 Lambda 表达式,系统只能推断出返回的是 Tuple2 类型,而无法得到 Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
  • 1
  • 2
  • 这是一种比较简单的场景,二元组的两个元素都是基本数据类型。那如果元组中的一个元素又有泛型,该怎么处理呢?
  • Flink 专门提供了 TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的 DataStream 里元素的类型。
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
  • 1
TypeInformation相关源码

在这里插入图片描述
在java源码中,TypeInformation本身是一个抽象类,他就是flink所有数据类型的基类,也就是说所有在flink当中支持的数据类型的类型都属于TypeInformation
在这里插入图片描述
有个createSerializer抽象方法,它会为数据类型创建TypeSerializer类型的特定的序列化器

5.3.1 基本转换算子

首先我们来介绍一些基本的转换算子。

  1. 映射(map)
    map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个**“一一映射”**,消费一个元素就产出一个元素,如图 5-5 所示。

在这里插入图片描述
我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改变。

下面的代码用不同的方式,实现了提取 Event 中的 user 字段的功能。

先看map函数的源码:
在这里插入图片描述
点进map,可以看到是要传入一个MapFunction函数,再点进MapFunction:
在这里插入图片描述
发现MapFunction是一个接口,调用它就需要重写它的 map() 方法,MapFunction有两个范型,<T,O>,T 就是传入的数据类型,O就是经过Map转换后输出的数据类型。我们可以在代码里单独定义一个类去实现MapFunction接口。

接下来是第一种方法:用自定义方法去实现MapFunction接口
代码如下:

package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformMapTest {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //全局并行度设为1,方便后面的串行测试
        env.setParallelism(1);

        //从元素中读取数据
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L)
        );

        //进行转换,提取user字段
        //1.使用自定义类,实现MapFunction接口
        SingleOutputStreamOperator<String> result = stream.map(new MyMapper());

        result.print();

        env.execute();
    }
    //MapFunction是一个接口,调用它就需要重写它的 map()  方法,MapFunction有两个范型,<T,O>,T 就是传入的数据类型,O就是经过Map转换后输出的数据类型。我们可以在代码里单独定义一个类去实现MapFunction接口。
    //自定义MapFunction
    public static class MyMapper implements MapFunction<Event, String> {

        @Override
        public String map(Event value) throws Exception {
            return value.user;
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

下面用匿名内部类的形式实现一次,
代码如下:

package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformMapTest {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //全局并行度设为1,方便后面的串行测试
        env.setParallelism(1);

        //从元素中读取数据
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L)
        );

        //进行转换,提取user字段
        //1.使用自定义类,实现MapFunction接口
        SingleOutputStreamOperator<String> result1 = stream.map(new MyMapper());

        //2.使用匿名类去实现MapFunction接口
        SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return value.user;
            }
        });
        //result1.print();
        result2.print();

        env.execute();
    }
    //MapFunction是一个接口,调用它就需要重写它的 map()  方法,MapFunction有两个范型,<T,O>,T 就是传入的数据类型,O就是经过Map转换后输出的数据类型。我们可以在代码里单独定义一个类去实现MapFunction接口。
    //自定义MapFunction
    public static class MyMapper implements MapFunction<Event, String> {

        @Override
        public String map(Event value) throws Exception {
            return value.user;
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

还有没有更简单的方法呢?
其实对于只有一个方法的接口,使用lambda表达式就可以很轻松的实现。
代买如下:

package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformMapTest {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //全局并行度设为1,方便后面的串行测试
        env.setParallelism(1);

        //从元素中读取数据
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L)
        );

        //进行转换,提取user字段
        //1.使用自定义类,实现MapFunction接口
        SingleOutputStreamOperator<String> result1 = stream.map(new MyMapper());

        //2.使用匿名类去实现MapFunction接口
        SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return value.user;
            }
        });

        //3.传入lambda表达式去实现
        SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);


        //result1.print();
        //result2.print();
        result3.print();

        env.execute();
    }
    //MapFunction是一个接口,调用它就需要重写它的 map()  方法,MapFunction有两个范型,<T,O>,T 就是传入的数据类型,O就是经过Map转换后输出的数据类型。我们可以在代码里单独定义一个类去实现MapFunction接口。
    //自定义MapFunction
    public static class MyMapper implements MapFunction<Event, String> {

        @Override
        public String map(Event value) throws Exception {
            return value.user;
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

我们可以发现直接传入lanbda表达式是最为简单的一种方式,但是java中存在范型擦除的问题,到时候就需要在后面加上return来返回转换结果的数据类型了。

  • map 是一个用户可以自定义的转换(transformation)算子,它作用于一条数据流上,转换处理的结果是一个确定的输出类型。当然,SingleOutputStreamOperator 类本身也继承自 DataStream 类,所以说 map 是将一个 DataStream 转换成另一个 DataStream 是完全正确的。

注意:
DataStreamSource继承自SingleOutputStreamOperator
SingleOutputStreamOperator又继承自DataStream
所以我们得到的其实还是一个DataStream
MapFunction有两个范型,一个Event,一个String
相当于是从Event转换成了String,实际上就是两个DataStream之间的转换

  1. 过滤(filter)
  • filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉,如图 5-6 所示。

在这里插入图片描述

进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参数需要实现 FilterFunction 接口,而 FilterFunction 内要实现 filter()方法,就相当于一个返回布尔类型的条件表达式。
下面的代码会将数据流中用户 Mary 的浏览行为过滤出来 。

package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TramsformFilterTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素中读取数据

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L)
        );

        //1.传入一个实现了FileterFunction的类的对象
        SingleOutputStreamOperator<Event> result1 = stream.filter(new MyFilter());

        //2.传入一个匿名类实现FilterFunction接口
        SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.user.equals("Bob");
            }
        });

        //3. 传入lambda表达式
        //这里filter不用考虑范型擦除,因为它输出的数据类型就是输入的数据类型
        //这里就可以直接打印了。
        stream.filter(data -> data.user.equals("Alice")).print("lambda: Alice click");
        result1.print();
        result2.print();

        env.execute();


    }

    // 实现一个自定义的FilterFunction
    public static class MyFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.user.equals("Mary");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

输出结果如下:
在这里插入图片描述

  1. 扁平映射(flatMap)
    flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理,如图 5-7 所示。我们此前 WordCount 程序的第一步分词操作,就用到了flatMap。
    在这里插入图片描述
  • 同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

  • flatMap 操作会应用在每一个输入事件上面,FlatMapFunction 接口中定义了 flatMap 方法,用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回 0 个、1 个或多个结果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调用,也可以不调用。所以 flatMap 方法也可以实现 map 方法和 filter 方法的功能,当返回结果是 0 个的时候,就相当于对数据进行了过滤,当返回结果是 1 个的时候,相当于对数据进行了简单的转换操作。

  • flatMap 的使用非常灵活,可以对结果进行任意输出,下面就是一个例子:

package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformFlatMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素中读取数据

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L)
        );

        //1. 实现一个自定义的FlatMapFunction
        stream.flatMap(new MyFlatMap()).print("1");

        //2. 传入lambda表达式
        stream.flatMap((Event value,Collector<String> out) -> {
            //对于"Mary",就是进行了一个Map操作
            if (value.user.equals("Mary")) {
                out.collect(value.url);
                //对于"Bob",就是进行了一个flat操作,将数据打散成3条后输出
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
                out.collect(value.timeStamp.toString());
            }
            //对于"Alice",就是进行了一个filter操作。被过滤掉了。
            //todo 这里定义了lambda表达式后,用到了Collector去进行结果输出(Collector是有范型的),但是jvm不知道转换之后具体的数据类型是什么。
            //所以这里要调用returns指明返回的数据类型。
        }).returns(new TypeHint<String>() {})
                .print("2");

        env.execute();

    }

    public static class MyFlatMap implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event event, Collector<String> out) throws Exception {
            out.collect(event.user);
            out.collect(event.url);
            //时间戳是Long类型的数据,Collector输出的范型是String,所以这里做toString转换。
            out.collect(event.timeStamp.toString());
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

在这里插入图片描述
FlatMap的参数和Map有些许不同,T 还是输入的数据类型,后面那个Collector 用来输出转换后的数据。
因为Map是一对一,一次只返回一个数据,但是如果想实现一对多的返回,就需要先将数据都收集起来,然后一起返回。
这里的输出方式就是使用到了Collector :收集器。(调用一次,就输出一次。)
这个收集器里面有个collect方法,就是将数据收集起来,传递到下游去。
在这里插入图片描述

输出结果如下,可以看到,本来是三条数据,现在被按字段打散之后,变成9条数据输出。
也就是一个扁平化映射的过程。
在这里插入图片描述

加入Lambda表达式后输出的结果如下:
在这里插入图片描述

5.3.2 聚合算子(Aggregation)

  • 直观上看,基本转换算子确实是在“转换”——因为它们都是基于当前数据,去做了处理和输出。而在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的信息。比如之前 word count 程序中,要对每个词出现的频次进行叠加统计。这种操作,计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。
  1. 按键分区(keyBy)
  • 对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。

  • keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。

  • 基于不同的 key,流中的数据将被分配到不同的分区中去,如图 5-8 所示;这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了。
    在这里插入图片描述

  • 在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。

  • keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。

相同Key的数据,一定被分配到同一个分区,不同Key的数据,有可能被分配到同一个分区,也可能被分配到不同的分区。

先简单看看keyBy()的源码:
在这里插入图片描述
它需要传入的参数是一个KeySelector
点进KeySelector:
在这里插入图片描述
发现它是一个interface,有一个需要重写的方法,和两个范型
IN :就是当前传入的数据类型
KEY:就是我们指定需要提取出的那个Key的类型
在这里插入图片描述
看重写的方法,参数列表是IN,就是要传入的数据,返回的是KEY,也就是我们要提取的Key

我们再看keyBy()的返回值
在这里插入图片描述
它的返回值就不是之前map和flatmap返回的SingleOutputStreamOperator这样的数据类型了,而是一个KeyedStream,我们可以把它看作是一个指定了Key的一个数据流,也叫 按键分区流
它本质上是对DataStream追加了一个key的信息,做了一个针对于key的逻辑上的分区。
所以keyBy不能将他作为一个算子,因为它本质上只是在原来的数据流上做了一个追加key的信息。

KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce);**而且它可以将当前算子任务的状态(state)也按照 key 进行划分、限定为仅对当前 key 有效。**关于状态的相关知识我们会在后面章节继续讨论。

  • 在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。
  • keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。

我们可以以 id 作为 key 做一个分区操作,代码实现如下:

package online.liujiahao.chapter05;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformSimpleAggTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素中读取数据
        //拓展一下Bob得数据,方便之后做分组聚合操作
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L),
                new Event("Bob", "./home", 2500L),
                new Event("Alice","./prod?id=1",3300L),
                new Event("Alice","./home",3500L),
                new Event("Bob", "./prod?id=10", 2000L),
                new Event("Alice","./prod?id=2",3800L),
                new Event("Alice","./prod?id=3",4500L)
        );


        //按键分组之后进行聚合,提取当前用户最近 一次访问数据
        //keyBy得范型和KeySelector得范型是一样的,按user分组,key的类型就是string。
        stream.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event event) throws Exception {
                return event.user;
            }
            //最近一次访问,max取最大的时间戳
        }).max("timeStamp")
                .print("max: ");

        stream.keyBy(data -> data.user)
                .maxBy("timeStamp")
                .print("maxBy: ");

        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

这里区别一下max()和maxBy()两种方法。
在这里插入图片描述
此处max()有两个方法,一个传Int型的索引号,一个传String类型的字段名称,此前wordcount案例用的是第一种,这时候POJO类型的数据用索引号会报错,而且直接传字段名称显然更方便一些。

在这里插入图片描述

注意输出结果:
Bob的第二条数据来了之后,max的输出结果是,将timeStamp的最大值取出来做了输出,而这条Event数据的其他字段还是沿用的与第一条一样的,所以这里可以看出,max是对特定字段做聚合(后面再来Bob的数据的时候,它也是只跟新时间戳,而不跟新其他的字段)。
maxBy的输出结果是,找到最大timeStamp所在的那条字段,将其完整的输出。
因此我们可以看到,max和maxBy的适用场景是不同的。

  • 需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为KeyedStream。KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的
    类型。
  • KeyedStream 也继承自 DataStream,所以基于它的操作也都归属于 DataStream API。但它跟之前的转换操作得到的 SingleOutputStreamOperator 不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce);而且它可以将当前算子任务的状态(state)也按照 key 进行划分、限定为仅对当前 key 有效。关于状态的相关知识我们会在后面章节继续讨论。
  1. 简单聚合
    有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种:
    • sum():在输入流上,对指定的字段做叠加求和的操作。
    • min():在输入流上,对指定的字段求最小值。
    • max():在输入流上,对指定的字段求最大值。
    • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据。
    • maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。
  • 简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
  • 对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以 f0、f1、f2、…来命名的。
    例如,下面就是对元组数据流进行聚合的测试:
public class TransTupleAggreationTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
                Tuple2.of("a", 1),
                Tuple2.of("a", 3),
                Tuple2.of("b", 3),
                Tuple2.of("b", 4)
        );
        stream.keyBy(r -> r.f0).sum(1).print();
        stream.keyBy(r -> r.f0).sum("f1").print();
        stream.keyBy(r -> r.f0).max(1).print();
        stream.keyBy(r -> r.f0).max("f1").print();
        stream.keyBy(r -> r.f0).min(1).print();
        stream.keyBy(r -> r.f0).min("f1").print();
        stream.keyBy(r -> r.f0).maxBy(1).print();
        stream.keyBy(r -> r.f0).maxBy("f1").print();
        stream.keyBy(r -> r.f0).minBy(1).print();
        stream.keyBy(r -> r.f0).minBy("f1").print();
        env.execute();
    } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

而如果数据流的类型是 POJO 类,那么就只能通过字段名称来指定,不能通过位置来指定
了。

 import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransPojoAggregationTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        stream.keyBy(e -> e.user).max("timestamp").print(); // 指定字段名称
        env.execute();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 简单聚合算子返回的,同样是一个 SingleOutputStreamOperator,也就是从 KeyedStream 又转换成了常规的 DataStream。所以可以这样理解:keyBy 和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。
  • 一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作**“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子**。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个 key 的数据流上。
  1. 归约聚合(reduce)
  • 如果说简单聚合是对一些特定统计需求的实现,那么 reduce 算子就是一个一般化的聚合统计操作了。从大名鼎鼎的 MapReduce 开始,我们对 reduce 操作就不陌生:它可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
  • 与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
  • 调用 KeyedStream 的 reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。接
    口在源码中的定义如下:
    在这里插入图片描述
  • ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据,这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
  • 其实,reduce 的语义是针对列表进行规约操作,运算规则由 ReduceFunction 中的 reduce方法来定义,而在 ReduceFunction 内部会维护一个初始值为空的累加器,注意累加器的类型和输入元素的类型相同,当第一条元素到来时,累加器的值更新为第一条元素的值,当新的元素到来时,新元素会和累加器进行累加操作,这里的累加操作就是 reduce 函数定义的运算规则。然后将更新以后的累加器的值向下游输出。
  • 我们可以单独定义一个函数类实现 ReduceFunction 接口,也可以直接传入一个匿名类。当然,同样也可以通过传入 Lambda 表达式实现类似的功能。
  • 与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStrema。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
  • 下面我们来看一个稍复杂的例子。
  • 我们将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。
package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TransformReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // todo 需求:统计每个用户的访问总量,找出访问量最大的用户
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L),
                new Event("Bob", "./home", 2500L),
                new Event("Alice","./prod?id=1",3300L),
                new Event("Alice","./home",3500L),
                new Event("Bob", "./prod?id=10", 2000L),
                new Event("Alice","./prod?id=2",3800L),
                new Event("Alice","./prod?id=3",4500L)
        );

        //1. 像wordcount一样,先用map做一个转换,统计每个用户的访问频次
        SingleOutputStreamOperator<Tuple2<String, Long>> clickByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event event) throws Exception {
                return Tuple2.of(event.user, 1L);
            }
            //我们要提取的是二元组里的第一个元素,所以用传索引的方式
        }).keyBy(data -> data.f0)
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> stringLongTuple2, Tuple2<String, Long> t1) throws Exception {
                        return Tuple2.of(stringLongTuple2.f0, stringLongTuple2.f1 + t1.f1);
                    }
                });

        //2. 找出访问量最大的用户
        //问题来了:因为API规定必须基于kededStream的数据才能调用聚合的方法。所以对于clickByUser这样一个数据流(他返回的是SingleOutputStreamOperator这样的数据类型),想要做聚合操作的话,得先按键分区。
        //现在已经无法按user分区了,因为要统计的是所有用户里访问量最大的。所以就可以将所有用户都放到一个组里面去。
        SingleOutputStreamOperator<Tuple2<String, Long>> result = clickByUser.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                return value1.f1 > value2.f1 ? value1 : value2;
            }
        });

        result.print();

        env.execute();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

reduce 同简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以我们需要将 reduce 算子作用在一个有限 key 的流上。

5.3.3 用户自定义函数(UDF)

  • 在前面的介绍我们可以发现,Flink 的 DataStream API 编程风格其实是一致的:基本上都是基于 DataStream 调用一个方法,表示要做一个转换操作;方法需要传入一个参数,这个参数都是需要实现一个接口。我们还可以扩展到 5.2 节讲到的 Source 算子,其实也是需要自定义类实现一个 SourceFunction 接口。我们能否从中总结出一些规律呢?
  • 很容易发现,这些接口有一个共同特点:全部都以算子操作名称 + Function 命名,例如源算子需要实现 SourceFunction 接口,map 算子需要实现 MapFunction 接口,reduce 算子需要实现 ReduceFunction 接口。而且查看源码会发现,它们都继承自 Function 接口;这个接口是空的,主要就是为了方便扩展为单一抽象方法(Single Abstract Method,SAM)接口,这就是我们所说的“函数接口”——比如 MapFunction 中需要实现一个 map()方法,ReductionFunction中需要实现一个 reduce()方法,它们都是 SAM 接口。我们知道,Java 8 新增的 Lambda 表达式就可以实现 SAM 接口;所以这样的好处就是,我们不仅可以通过自定义函数类或者匿名类来实现接口,也可以直接传入 Lambda 表达式。这就是所谓的用户自定义函数(user-defined function,UDF)。

接下来我们就对这几种编程方式做一个梳理总结。

  1. 函数类(Function Classes)
  • 对于大部分操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口,来完成处理逻辑的定义。Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction、FilterFunction、ReduceFunction 等。
  • 所以最简单直接的方式,就是自定义一个函数类,实现对应的接口。之前我们对于 API的练习,主要就是基于这种方式。
    下面例子实现了 FilterFunction 接口,用来筛选 url 中包含“home”的事件:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransFunctionUDFTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        DataStream<Event> stream = clicks.filter(new FlinkFilter());
        stream.print();
        env.execute();
    }

    public static class FlinkFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.url.contains("home");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

当然还可以通过匿名类来实现 FilterFunction 接口:

 DataStream<String> stream=clicks.filter(new FilterFunction<Event>(){@Overridepublic boolean filter(Event value)throws Exception{        return value.url.contains("home");    }});
  • 1

为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。

DataStream<Event> stream = clicks.filter(new KeyWordFilter("home"));
public static class KeyWordFilter implements FilterFunction<Event> {
    private String keyWord;
    KeyWordFilter(String keyWord) { 
        this.keyWord = keyWord; 
    }
    @Override
    public boolean filter(Event value) throws Exception {
        return value.url.contains(this.keyWord);
    } 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 匿名函数(Lambda)
  • 匿名函数(Lambda 表达式)是 Java 8 引入的新特性,方便我们更加快速清晰地写代码。Lambda 表达式允许以简洁的方式实现函数,以及将函数作为参数来进行传递,而不必声明额外的(匿名)类。
  • Flink 的所有算子都可以使用 Lambda 表达式的方式来进行编码,但是,当 Lambda 表达式使用 Java 的泛型时,我们需要显式的声明类型信息。
  • 下例演示了如何使用 Lambda 表达式来实现一个简单的 map() 函数,我们使用 Lambda 表达式来计算输入的平方。在这里,我们不需要声明 map() 函数的输入 i 和输出参数的数据类型,因为 Java 编译器会对它们做出类型推断。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransFunctionLambdaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
//map 函数使用 Lambda 表达式,返回简单类型,不需要进行类型声明
        DataStream<String> stream1 = clicks.map(event -> event.url);
        stream1.print();
        env.execute();
    } 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 由于 OUT 是 String 类型而不是泛型,所以 Flink 可以从函数签名 OUT map(IN value) 的实现中自动提取出结果的类型信息。
  • 但是对于像 flatMap() 这样的函数,它的函数签名 void flatMap(IN value, Collector out) 被 Java 编译器编译成了 void flatMap(IN value, Collector out),也就是说将 Collector 的泛型信息擦除掉了。这样 Flink 就无法自动推断输出的类型信息了。
    我们来看一段代码:
// flatMap 使用 Lambda 表达式,抛出异常
DataStream<String> stream2 = clicks.flatMap((event, out) -> {
	out.collect(event.url);
});
stream2.print();
  • 1
  • 2
  • 3
  • 4
  • 5

如果执行程序,Flink 会抛出如下异常:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
Otherwise the type has to be specified explicitly using type information.
  • 1
  • 2
  • 3
  • 4

在这种情况下,我们需要显式地指定类型信息,否则输出将被视为 Object 类型,这会导致低效的序列化。

// flatMap 使用 Lambda 表达式,必须通过 returns 明确声明返回类型
DataStream<String> stream2 = clicks.flatMap((Event event, Collector<String> 
out) -> {
        out.collect(event.url);
        }).returns(Types.STRING);
        stream2.print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

当使用 map() 函数返回 Flink 自定义的元组类型时也会发生类似的问题。下例中的函数签名 Tuple2<String, Long> map(Event value) 被类型擦除为 Tuple2 map(Event value)。

//使用 map 函数也会出现类似问题,以下代码会报错
DataStream<Tuple2<String, Long>> stream3 = clicks
        .map( event -> Tuple2.of(event.user, 1L) );
        stream3.print();
  • 1
  • 2
  • 3
  • 4

一般来说,这个问题可以通过多种方式解决:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReturnTypeResolve {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
// 想要转换成二元组类型,需要进行以下处理
// 1) 使用显式的 ".returns(...)"
        DataStream<Tuple2<String, Long>> stream3 = clicks
                .map( event -> Tuple2.of(event.user, 1L) )
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        stream3.print();
// 2) 使用类来替代 Lambda 表达式
        clicks.map(new MyTuple2Mapper())
                .print();
// 3) 使用匿名类来代替 Lambda 表达式
        clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                return Tuple2.of(value.user, 1L);
            }
        }).print();
        env.execute();
    }
    // 自定义 MapFunction 的实现类
    public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String,
            Long>>{
        @Override
        public Tuple2<String, Long> map(Event value) throws Exception {
            return Tuple2.of(value.user, 1L);
        } 
    } 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

这些方法对于其它泛型擦除的场景同样适用。

  1. 富函数类(Rich Function Classes)
  • “富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。
  • 既然“富”,那么它一定会比常规的函数类提供更多、更丰富的功能。与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
  • 注:生命周期的概念在编程中其实非常重要,到处都有体现。例如:对于C 语言来说,我们需要手动管理内存的分配和回收,也就是手动管理内存的生命周期。分配内存而不回收,会造成内存泄漏,回收没有分配过的内存,会造成空指针异常。而在 JVM 中,虚拟机会自动帮助我们管理对象的生命周期。对于前端来说,一个页面也会有生命周期。数据库连接、网络连接以及文件描述符的创建和关闭,也都形成了生命周期。所以生命周期的概念在编程中是无处不在的,需要我们多加注意。
  • Rich Function 有生命周期的概念。典型的生命周期方法有:
    1. open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成。
    2. close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。
  • 需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的map(),在每条数据到来后都会触发一次调用。
    来看一个例子:
package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformRichFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3100L),
                new Event("Bob", "./prod?id=2", 3600L),
                new Event("Bob", "./prod?id=3", 3800L),
                new Event("Bob", "./prod?id=0", 4200L));

        stream.map(new MyRichMapper()).setParallelism(2).print();
        env.execute();

    }

    // 实现一个自定义的富函数类
    public static class MyRichMapper extends RichMapFunction<Event,Integer>{

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open生命周期被调用" + getRuntimeContext().getIndexOfThisSubtask()+"号任务启动");
        }

        @Override
        public Integer map(Event event) throws Exception {
            return event.url.length();
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close生命周期被调用" + getRuntimeContext().getIndexOfThisSubtask()+"号任务结束");
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

输出结果是:
在这里插入图片描述
这里可以看到:

  • open和close都只被调用一次。
  • 只有一个0号任务,因为当前并行度是 1

这里我们将并行度改为2
执行后结果为:
在这里插入图片描述
每一个子任务都会调用一次open和close。

一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在 map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在 open()中建立连接,在 map()中读写数据,而在 close()中关闭连接。所以我们推荐的最佳实践如下:

public class MyFlatMap extends RichFlatMapFunction<IN, OUT>> {
@Override
public void open(Configuration configuration) {
// 做一些初始化工作
// 例如建立一个和 MySQL 的连接
        }
        @Override
        public void flatMap(IN in, Collector<OUT out) {
// 对数据库进行读写
        }
        @Override
        public void close() {
// 清理工作,关闭和 MySQL 数据库的连接。
        } 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

另外,富函数类提供了 getRuntimeContext()方法(我们在本节的第一个例子中使用了一下),可以获取到运行时上下文的一些信息,例如程序执行的并行度,任务名称,以及状态(state)。这使得我们可以大大扩展程序的功能,特别是对于状态的操作,使得 Flink 中的算子具备了处理复杂业务的能力。关于 Flink 中的状态管理和状态编程,我们会在后续章节逐渐展开。

5.3.4 物理分区(Physical Partitioning)

  • 本节的最后,我们再来深入了解一下分区操作。
  • 顾名思义,“分区”(partitioning)操作就是要将数据进行重新分布,传递到不同的流分区去进行下一步处理。其实我们对分区操作并不陌生,前面介绍聚合算子时,已经提到了 keyBy,它就是一种按照键的哈希值来进行重新分区的操作。只不过这种分区操作只能保证把数据按key“分开”,至于分得均不均匀、每个 key 的数据具体会分到哪一区去,这些是完全无从控制的——所以我们有时也说,keyBy 是一种逻辑分区(logical partitioning)操作。
  • 如果说 keyBy 这种逻辑分区是一种“软分区”,那真正硬核的分区就应该是所谓的“物理分区”(physical partitioning)。**也就是我们要真正控制分区策略,精准地调配数据,告诉每个数据到底去哪里。**其实这种分区方式在一些情况下已经在发生了:例如我们编写的程序可能对多个处理任务设置了不同的并行度,那么当数据执行的上下游任务并行度变化时,数据就不应该还在当前分区以直通(forward)方式传输了——因为如果并行度变小,当前分区可能没有下游任务了;而如果并行度变大,所有数据还在原先的分区处理就会导致资源的浪费。所以这种情况下,系统会自动地将数据均匀地发往下游所有的并行任务,保证各个分区的负载均衡。有些时候,我们还需要手动控制数据分区分配策略。比如当发生数据倾斜的时候,系统无法自动调整,这时就需要我们重新进行负载均衡,将数据流较为平均地发送到下游任务操作分区中去。Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作接口,能够帮我们实现数据流的手动重分区。为了同 keyBy 相区别,我们把这些操作统称为“物理分区”操作。物理分区与 keyBy 另一大区别在于,keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式。
  • 常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast),下边我们分别来做了解。
  1. 随机分区(shuffle)
    最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的.shuffle()方法,将数据随
    机地分配到下游算子的并行任务中去。
    随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地
    传递到下游任务分区,如图 5-9 所示。因为是完全随机的,所以对于同样的输入数据, 每次执
    行得到的结果也不会相同。
    在这里插入图片描述
    经过随机分区之后,得到的依然是一个 DataStream。
    我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为 4,
    中间经历一次 shuffle。执行多次,观察结果是否相同。
package online.liujiahao.chapter05;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class TransformPartitionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // todo 需求:统计每个用户的访问总量,找出访问量最大的用户
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L),
                new Event("Bob", "./home", 2500L),
                new Event("Alice","./prod?id=1",3300L),
                new Event("Alice","./home",3500L),
                new Event("Bob", "./prod?id=10", 2000L),
                new Event("Alice","./prod?id=2",3800L),
                new Event("Alice","./prod?id=3",4500L)
        );

        //1.随机分区
        //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。
        stream.shuffle().print().setParallelism(4);
        
        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

可以得到如下形式的输出结果:
在这里插入图片描述
2. 轮询分区(Round-Robin)
轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,如图 5-10 所示。通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

注:Round-Robin 算法用在了很多地方,例如 Kafka 和 Nginx。
在这里插入图片描述
我们同样可以在代码中进行测试:

package online.liujiahao.chapter05;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class TransformPartitionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // todo 需求:统计每个用户的访问总量,找出访问量最大的用户
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L),
                new Event("Bob", "./home", 2500L),
                new Event("Alice","./prod?id=1",3300L),
                new Event("Alice","./home",3500L),
                new Event("Bob", "./prod?id=10", 2000L),
                new Event("Alice","./prod?id=2",3800L),
                new Event("Alice","./prod?id=3",4500L)
        );


        //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种)
        stream.rebalance().print().setParallelism(4);


        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

得到的结果:
在这里插入图片描述
如果用发牌来比喻的话,相当于像下游的分区轮询发牌。

  1. 重缩放分区(rescale)
    重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图 5-11 所示。也就是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

在这里插入图片描述

  • 当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时,rescale的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。
  • 由于 rebalance 是所有分区数据的“重新平衡”,当 TaskManager 数据量较多时,这种跨节点的网络传输必然影响效率;而如果我们配置的 task slot 数量合适,用 rescale 的方式进行“局部重缩放”,就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配,从而避免了网络传输带来的损耗。
  • 从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。

代码实现如下:

package online.liujiahao.chapter05;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class TransformPartitionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // todo 需求:统计每个用户的访问总量,找出访问量最大的用户
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L),
                new Event("Bob", "./home", 2500L),
                new Event("Alice","./prod?id=1",3300L),
                new Event("Alice","./home",3500L),
                new Event("Bob", "./prod?id=10", 2000L),
                new Event("Alice","./prod?id=2",3800L),
                new Event("Alice","./prod?id=3",4500L)
        );

        //1.随机分区
        //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。
        //stream.shuffle().print().setParallelism(4);

        //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种)
        //stream.rebalance().print().setParallelism(4);

        //3. rescale重缩放分区(分区轮询)
        env.addSource(new RichParallelSourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; i <= 8; i++) {
                    //将奇偶数分别发送到0号和1号并行分区
                    if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                        ctx.collect(i);
                    }
                }
            }

            @Override
            public void cancel() {

            }
        }).setParallelism(2)
                .rescale()
                .print()
                .setParallelism(4);

        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

输出结果如下:
奇数都是由3 4 两个线程输出。偶数由1 2 两个线程输出
在这里插入图片描述
可以将 rescale 方法换成 rebalance 方法,来体会一下这两种方法的区别。

  1. 广播(broadcast)
    这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
    具体代码测试如下:
package online.liujiahao.chapter05;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class TransformPartitionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // todo 需求:统计每个用户的访问总量,找出访问量最大的用户
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L),
                new Event("Bob", "./home", 2500L),
                new Event("Alice","./prod?id=1",3300L),
                new Event("Alice","./home",3500L),
                new Event("Bob", "./prod?id=10", 2000L),
                new Event("Alice","./prod?id=2",3800L),
                new Event("Alice","./prod?id=3",4500L)
        );

        //1.随机分区
        //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。
        //stream.shuffle().print().setParallelism(4);

        //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种)
        //stream.rebalance().print().setParallelism(4);

        //3. rescale重缩放分区(分区轮询)
        env.addSource(new RichParallelSourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; i <= 8; i++) {
                    //将奇偶数分别发送到0号和1号并行分区
                    if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                        ctx.collect(i);
                    }
                }
            }

            @Override
            public void cancel() {

            }
        }).setParallelism(2)
//                .rescale()
//                .print()
                .setParallelism(4);
		//4. 广播
        stream.broadcast().print().setParallelism(4);
        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

结果如下:
相当于每条数据都被输出了4次。
在这里插入图片描述
5. 全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

package online.liujiahao.chapter05;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class TransformPartitionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // todo 需求:统计每个用户的访问总量,找出访问量最大的用户
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L),
                new Event("Bob", "./home", 2500L),
                new Event("Alice","./prod?id=1",3300L),
                new Event("Alice","./home",3500L),
                new Event("Bob", "./prod?id=10", 2000L),
                new Event("Alice","./prod?id=2",3800L),
                new Event("Alice","./prod?id=3",4500L)
        );

        //1.随机分区
        //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。
        //stream.shuffle().print().setParallelism(4);

        //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种)
        //stream.rebalance().print().setParallelism(4);

        //3. rescale重缩放分区(分区轮询)
        env.addSource(new RichParallelSourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; i <= 8; i++) {
                    //将奇偶数分别发送到0号和1号并行分区
                    if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                        ctx.collect(i);
                    }
                }
            }

            @Override
            public void cancel() {

            }
        }).setParallelism(2)
//                .rescale()
//                .print()
                .setParallelism(4);

        //4. 广播
        //stream.broadcast().print().setParallelism(4);

        //5.全局分区
        stream.global().print().setParallelism(4);
        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

结果如下:
在这里插入图片描述
6. 自定义分区(Custom)

  • 当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略。
  • 在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector。
  • 例如,我们可以对一组自然数按照奇偶性进行重分区。代码如下:
package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;


import javax.xml.crypto.dsig.keyinfo.KeyInfo;

public class TransformPartitionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // todo 需求:统计每个用户的访问总量,找出访问量最大的用户
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100",3000L),
                new Event("Bob", "./home", 2500L),
                new Event("Alice","./prod?id=1",3300L),
                new Event("Alice","./home",3500L),
                new Event("Bob", "./prod?id=10", 2000L),
                new Event("Alice","./prod?id=2",3800L),
                new Event("Alice","./prod?id=3",4500L)
        );

        //1.随机分区
        //shuffle是一个平均分区的算子,就是当上下游数据并行度不一致时,他就将数据平均分配到下游的不同分区上面去。
        //stream.shuffle().print().setParallelism(4);

        //2.轮询分区(如果上下游并行度不同,Flink默认调用的就是这种)
        //stream.rebalance().print().setParallelism(4);

        //3. rescale重缩放分区(分区轮询)
        env.addSource(new RichParallelSourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; i <= 8; i++) {
                    //将奇偶数分别发送到0号和1号并行分区
                    if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                        ctx.collect(i);
                    }
                }
            }

            @Override
            public void cancel() {

            }
        }).setParallelism(2)
//                .rescale()
//                .print()
                .setParallelism(4);

        //4. 广播
        //stream.broadcast().print().setParallelism(4);

        //5.全局分区
        //stream.global().print().setParallelism(4);

        //6. 自定义分区
        env.fromElements(1,2,3,4,5,6,7,8)
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        return key % 2;
                    }
                }, new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        }).print().setParallelism(4);

        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

在这里插入图片描述
输入如下:
虽然并行度是4,但是我们是对2取余,所以只会有两个线程有输出结果,取余的结果分别是0和1,0就对应1号线程,1就对应2号线程。
在这里插入图片描述

5.4 输出算子(Sink)

Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持,如图 5-12 所示,本节将主要讲解 Flink 中的 Sink 操作。我们已经了解了 Flink 程序如何对数据进行读取、转换等操作,最后一步当然就应该将结果数据保存或输出到外部系统了。

在这里插入图片描述

5.4.1 连接到外部系统

  • 在 Flink 中,如果我们希望将数据写入外部系统,其实并不是一件难事。我们知道所有算子都可以通过实现函数类来自定义处理逻辑,所以只要有读写客户端,与外部系统的交互在任何一个处理算子中都可以实现。例如在 MapFunction 中,我们完全可以构建一个到 Redis 的连接,然后将当前处理的结果保存到 Redis 中。如果考虑到只需建立一次连接,我们也可以利用RichMapFunction,在 open() 生命周期中做连接操作。
  • 这样看起来很方便,却会带来很多问题。Flink 作为一个快速的分布式实时流处理系统,对稳定性和容错性要求极高。一旦出现故障,我们应该有能力恢复之前的状态,保障处理结果的正确性。这种性质一般被称作“状态一致性”。Flink 内部提供了**一致性检查点(checkpoint)**来保障我们可以回滚到正确的状态;但如果我们在处理过程中任意读写外部系统,发生故障后就很难回退到从前了。
  • 为了避免这样的问题,Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算 子完成的。
  • Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。
  • 之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink。
public DataStreamSink<T> print(String sinkIdentifier) {
	PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
	return addSink(printFunction).name("Print to Std. Out");
}
  • 1
  • 2
  • 3
  • 4

与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

stream.addSink(new SinkFunction());
  • 1

addSource 的参数需要实现一个 SourceFunction 接口;类似地,addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:

default void invoke(IN value, Context context) throws Exception
  • 1

当然,SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。如图 5-13 所示,列出了 Flink 官方目前支持的第三方系统连接器:

在这里插入图片描述

我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source/sink 两端都能连接,可读可写;而对于 Elasticsearch、文件系统(FileSystem)、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。
除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器,如图 5-14 所示。
在这里插入图片描述
除此以外,就需要用户自定义实现 sink 连接器了。
接下来,我们就选取一些常见的外部系统进行展开讲解。

5.4.2 输出到文件

  • 最简单的输出方式,当然就是写入文件了。对应着读取文件作为输入数据源,Flink 本来也有一些非常简单粗暴的输出到文件的预实现方法:如 writeAsText()、writeAsCsv(),可以直接将输出结果保存到文本文件或 Csv 文件。但我们知道,这种方式是不支持同时写入一份文件的;所以我们往往会将最后的 Sink 操作并行度设为 1,这就大大拖慢了系统效率;而且对于故障恢复后的状态一致性,也没有任何保证。所以目前这些简单的方法已经要被弃用。
  • Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。
  • StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。
  • 它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。
  • StreamingFileSink 支持**行编码(Row-encoded)批量编码(Bulk-encoded,比如 Parquet)**格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:
    1. 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
    1. 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

下面我们就以行编码为例,将一些测试数据直接写入文件:

package online.liujiahao.chapter05;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

public class SinkToFilesTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3100L),
                new Event("Bob", "./prod?id=2", 3600L),
                new Event("Bob", "./prod?id=3", 3800L),
                new Event("Bob", "./prod?id=0", 4200L));

        StreamingFileSink<String> streamingFileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .build()
                )
                .build();

        stream.map(data -> data.toString())
                .addSink(streamingFileSink);

        env.execute();


    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:

  • 至少包含 15 分钟的数据
  • 最近 5 分钟没有收到新的数据
  • 文件大小已达到 1 GB
    输出结果如下:
    会在参数指定的路径生产对应的文件夹,并且此时程序的并行度是多少,就会将数据均匀写入到对应个数的文件中。
    在这里插入图片描述

5.4.3 输出到 Kafka

  • Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。关于这部分内容,我们会在后续章节做更详细的讲解。

  • 现在我们要将数据输出到 Kafka,整个数据处理的闭环已经形成,所以可以完整测试如下:

    1. 添加 Kafka 连接器依赖
      由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
    1. 启动 Kafka 集群
    1. 编写输出到 Kafka 的示例代码
      我们可以直接将用户行为数据保存为文件 clicks.csv,读取后不做转换直接写入 Kafka,主题(topic)命名为“clicks”。

    代码如下

package online.liujiahao.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class SinkToKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.从kafka中读取数
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","hadoop102:9092");

        DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));

        //2.用flink进行数处理
        SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {

            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                //trim() 用来去除前后的空白字段
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        //结果写入kafka
        result.addSink(new FlinkKafkaProducer<String>("hadoop102:9092","events",new SimpleStringSchema()));

        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

我们可以看到消费者可以正常消费数据,证明向 Kafka 写入数据成功。另外,我们也可以读取 5.2 节中介绍过的任意数据源,进行更多的完整测试。比较有趣的一个实验是,我们可以同时将 Kafka 作为 Flink 程序的数据源和写入结果的外部系统。只要将输入和输出的数据设置为不同的 topic,就可以看到整个系统运行的路径:Flink 从 Kakfa 的一个 topic 读取消费数据,然后进行处理转换,最终将结果数据写入 Kafka 的另一个 topic——数据从 Kafka 流入、经 Flink处理后又流回到 Kafka 去,这就是所谓的“数据管道”应用。

5.4.4 输出到 Redis

  • Redis 是一个开源的内存式的数据存储,提供了像字符串(string)、哈希表(hash)、列表(list)、集合(set)、排序集合(sorted set)、位图(bitmap)、地理索引和流(stream)等一系列常用的数据结构。因为它运行速度快、支持的数据类型丰富,在实际项目中已经成为了架构优化必不可少的一员,一般用作数据库、缓存,也可以作为消息代理。
  • Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具。但版本升级略显滞后,目前连接器版本为 1.0,支持的Scala 版本最新到 2.11。由于我们的测试不涉及到 Scala 的相关版本变化,所以并不影响使用。
  • 在实际项目应用中,应该以匹配的组件版本运行。
    具体测试步骤如下:
    (1)导入的 Redis 连接器依赖
<dependency>
	<groupId>org.apache.bahir</groupId>
	<artifactId>flink-connector-redis_2.11</artifactId>
	<version>1.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

(2)启动 Redis 集群
这里我们为方便测试,只启动了单节点 Redis。
(3)编写输出到 Redis 的示例代码
连接器为我们提供了一个 RedisSink,它继承了抽象类 RichSinkFunction,这就是已经实现好的向 Redis 写入数据的 SinkFunction。我们可以直接将 Event 数据输出到 Redis:

package online.liujiahao.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkToRedisTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        //创建一个jedis连接配置
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .build();
        //写入到redis
        stream.addSink(new RedisSink<>(config,new MyRedisMapper()));
        env.execute();
    }

    //自定义类实现RedisMapper接口
    public static class MyRedisMapper implements RedisMapper<Event>{

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET,"clicks");
        }

        @Override
        public String getKeyFromData(Event data) {
            return data.user;
        }

        @Override
        public String getValueFromData(Event data) {
            return data.url;
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

这里 RedisSink 的构造方法需要传入两个参数:

  • JFlinkJedisConfigBase:Jedis 的连接配置
  • RedisMapper:Redis 映射类接口,说明怎样将数据转换成可以写入 Redis 的类型
    接下来主要就是定义一个 Redis 的映射类,实现 RedisMapper 接口。
    在这里我们可以看到,保存到 Redis 时调用的命令是 HSET,所以是保存为哈希表(hash),表名为“clicks”;保存的数据以 user 为 key,以 url 为 value,每来一条数据就会做一次转换。
    (4)运行代码,Redis 查看是否收到数据。

在这里插入图片描述
我们会发现,这里的数据是覆盖写入的!

5.4.5 输出到 Elasticsearch

ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch有着简洁的 REST 风格的 API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。
Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器,Flink 1.13 支持当前最新版本的ElasticSearch。
写入数据的 ElasticSearch 的测试步骤如下。
(1)添加 Elasticsearch 连接器依赖

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

(2)启动 Elasticsearch 集群
(3)编写输出到 Elasticsearch 的示例代码

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
        org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
        ;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
public class SinkToEsTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200, "http"));
// 创建一个 ElasticsearchSinkFunction
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new
                ElasticsearchSinkFunction<Event>() {
                    @Override
                    public void process(Event element, RuntimeContext ctx, RequestIndexer
                            indexer) {
                        HashMap<String, String> data = new HashMap<>();
                        data.put(element.user, element.url);
                        IndexRequest request = Requests.indexRequest()
                                .index("clicks")
                                .type("type") // Es 6 必须定义 type
                                .source(data);
                        indexer.add(request);
                    }
                };
        stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts,
                elasticsearchSinkFunction).build());
        stream.addSink(esBuilder.build());
        env.execute();
    } 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。 而 Builder 的构造方法中又有两个参数:
    1. httpHosts:连接到的 Elasticsearch 集群主机列表
    1. elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求。
      (4)运行代码,访问 Elasticsearch 查看是否收到数据,查询结果如下所示。
{
	"took" : 5,
	"timed_out" : false,
	"_shards" : {
		"total" : 1,
		"successful" : 1,
		"skipped" : 0,
		"failed" : 0
		},
	"hits" : 
	"total" : {
		"value" : 9,
		"relation" : "eq"
		},
	"max_score" : 1.0,
	"hits" : [
		{
		"_index" : "clicks",
		"_type" : "_doc",
		"_id" : "dAxBYHoB7eAyu-y5suyU",
		"_score" : 1.0,
		"_source" : {
			"Mary" : "./home"
			} }
...
] } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

5.4.6 输出到 MySQL(JDBC)

关系型数据库有着非常好的结构化数据设计、方便的 SQL 查询,是很多企业中业务数据存储的主要形式。MySQL 就是其中的典型代表。尽管在大数据处理中直接与 MySQL 交互的场景不多,但最终处理的计算结果是要给外部应用消费使用的,而外部应用读取的数据存储往往就是 MySQL。所以我们也需要知道如何将数据输出到 MySQL 这样的传统数据库。
写入数据的 MySQL 的测试步骤如下。
(1)添加依赖

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
</dependency>
<dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

(2)启动 MySQL,在 database 库下建表 clicks

mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);

(3)编写输出到 MySQL 的示例代码

package online.liujiahao.chapter05;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkToMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3100L),
                new Event("Bob", "./prod?id=2", 3600L),
                new Event("Bob", "./prod?id=3", 3800L),
                new Event("Bob", "./prod?id=0", 4200L));

        //JDBC连接,对应的就用JDBCSink,并且JdbcSink没有直接提供SinkFunction,他是提供了一个sink()方法,它的返回结果是SinkFunction。
        stream.addSink(JdbcSink.sink(
                //第一个参数 sql
                "INSERT INTO clicks (user,url) VALUES (? , ?)",
                //第二个参数 JdbcStatementBuilder 点进去它本身并没有方法,他的父接口有一个accept方法。具体我们实现的也是他的accept方法。
                ((statement, evnet) -> {
                    statement.setString(1, evnet.user);
                    statement.setString(2, evnet.url);
                }),
                //第三个参数 JDBC的连接参数
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        //指定Mysql的Url
                        .withUrl("jdbc:mysql://hadoop102:3306/test")
                        //指定Mysql驱动
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        env.execute();


    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

(4)运行代码,用客户端连接 MySQL,查看是否成功写入数据。
在这里插入图片描述
成功写入!

5.4.7 自定义 Sink 输出

  • 如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,又该怎么办呢?
  • 与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。之前与外部系统的连接,其实都是连接器帮我们实现了 SinkFunction,现在既然没有现成的,我们就只好自力更生了。例如,Flink 并没有提供 HBase 的连接器,所以需要我们自己写。
  • 在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
  • 我们这里使用了 SinkFunction 的富函数版本,因为这里我们又使用到了生命周期的概念,创建 HBase 的连接以及关闭 HBase 的连接需要分别放在 open()方法和 close()方法中。

(1)导入依赖

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

(2)编写输出到 HBase 的示例代码

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.nio.charset.StandardCharsets;

public class SinkCustomtoHBase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.fromElements("hello", "world")
                .addSink(
                        new RichSinkFunction<String>() {
                            public org.apache.hadoop.conf.Configuration configuration; // 管理 Hbase 的配置信息,这里因为 Configuration 的重名问题,将类以完整路径导入
                            public Connection connection; // 管理 Hbase 连接

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                configuration = HBaseConfiguration.create();
                                configuration.set("hbase.zookeeper.quorum", "hadoop102:2181");
                                connection = ConnectionFactory.createConnection(configuration);
                            }

                            @Override
                            public void invoke(String value, Context context) throws Exception {
                                Table table = connection.getTable(TableName.valueOf("test")); // 表名为 test
                                Put put = new Put("rowkey".getBytes(StandardCharsets.UTF_8)); // 指定 rowkey
                                put.addColumn("info".getBytes(StandardCharsets.UTF_8) // 指定列名
                                        , value.getBytes(StandardCharsets.UTF_8) // 写入的数据
                                        , "1".getBytes(StandardCharsets.UTF_8)); // 写入的数据
                                table.put(put); // 执行 put 操作
                                table.close(); // 将表关闭
                            }

                            @Override
                            public void close() throws Exception {
                                super.close();
                                connection.close(); // 关闭连接
                            }
                        }
                );
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

(3)可以在 HBase 查看插入的数据。

5.5 本章总结

  • 本章从编写 Flink 程序的基本流程入手,依次讲解了执行环境的创建、数据源的读取、数据流的转换操作,和最终结果数据的输出,对各种常见的转换操作 API 和外部系统的连接都做了详细介绍,并在其中穿插阐述了 Flink 中支持的数据类型和 UDF 的用法。我们可以自信地说,到目前为止已经充分掌握了 DataStream API 的基本用法,熟悉了 Flink 的编程习惯,应该说已经真正跨进了 Flink 流处理的大门。
  • 当然,本章对于转换算子只是一个简单介绍,Flink 中的操作远远不止这些,还有窗口(Window)、多流转换、底层的处理函数(Process Function)以及状态编程等更加高级的用法。
  • 另外本章中由于涉及读写外部系统,我们不只一次地提到了“精确一次(exactly once)”的状态一致性,这也是 Flink 的高级特性之一。关于这些内容,我们将在后续章节逐一展开。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/789313
推荐阅读
相关标签
  

闽ICP备14008679号