当前位置:   article > 正文

Flink 1. 13(二)时间和窗口_flink 时间窗口

flink 时间窗口

一.DataStream

1.概述

DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的Flink 代码其实就是基于这种数据类型的处理,所以这套核心 API 就以 DataStream 命名。对于批处理和流处理,我们都可以用这同一套 API 来实现

一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成

  • 获取执行环境(execution environment)
  • 读取数据源(source)
  • 定义基于数据的转换操作(transformations)
  • 定义计算结果的输出位置(sink)
  • 触发程序执行(execute)

在这里插入图片描述

2.执行环境

编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我们要 获 取 的 执 行 环 境 , 是StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种

1.getExecutionEnvironment

直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 1

2. createLocalEnvironment

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
  • 1

3. createRemoteEnvironment

这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包

在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等

三种处理模式

  • 流执行模式(STREAMING)
    这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式,Flink 本身持有的就是流处理的世界观,即使是批量数据,也可以看作“有界流”来进行处理。所以 STREAMING 执行模式对于有界数据和无界数据都是有效的

  • 批执行模式(BATCH)
    专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架对于不会持续计算的有界数据,我们用这种模式处理会更方便

  • 自动模式(AUTOMATIC)
    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式

如何设置BATCH模式?

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...
  • 1

在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH

(2)通过代码配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  • 1
  • 2
4.源算子

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端

Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法:

DataStream<String> stream = env.addSource(...);
  • 1

方法传入一个对象参数,需要实现 SourceFunction 接口;返回 DataStreamSource,读取数据的 source 操作是一个算子,得到的是一个数据流(DataStream)

普通可直接使用的源算子

public class SourceTest {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // 方式一.从文件读取数据
        DataStreamSource<String> source = env.readTextFile("./input/clicks.txt");
        source.print();

        // 方式二.从集合读取数据
        ArrayList<Integer> list = new ArrayList<>();
        list.add(10);
        list.add(20);
        DataStreamSource<Integer> source1 = env.fromCollection(list);
        source1.print();

        // 方式三.读取Socket文本流
        DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 7777);

        env.execute();


    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

Kafka源

引入依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
public class KafkaSource {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.配置
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

		// 参数一是主题 参数二是反序列化类型 
        DataStreamSource<String> kafkaSource =
                env.addSource(new FlinkKafkaConsumer<String>("click", new SimpleStringSchema(), properties));

        kafkaSource.print();

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

自定义源

大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,我们想要读取的数据源来自某个外部系统,而 flink 既没有预实现的方法、也没有提供连接器,又该怎么办呢?
那就只好自定义实现 SourceFunction 了

接下来我们创建一个自定义的数据源,实现 SourceFunction 接口
主要重写两个关键方法:

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果

自定义pojo Event

public class Event {
    public String user;  // 用户名
    public String url; // 用户访问的url
    public Long timestamp; // 访问时间戳
    // 省略构造方法  get set toString
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

重写两个方法每隔1秒生成随机的模拟数据

public class ClickSource implements SourceFunction<Event> {
    // 声明一个布尔变量,作为控制数据生成的标识位
    private Boolean running = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();    // 在指定的数据集中随机选取数据
        String[] users = {"Mary", "Alice", "Bob", "Cary","Sun","Joe"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2","./like"};

        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // 隔1秒生成一个点击事件,方便观测
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1,所以如果我们想要自定义并行的数据源的话,需要实现ParallelSourceFunction

5.转换算子
5.1 基本转换算子

map

map 主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素

在这里插入图片描述

public interface MapFunction<T, O> extends Function, Serializable {
    O map(T var1) throws Exception;
}
  • 1
  • 2
  • 3

filter

filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤
条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉
在这里插入图片描述

public interface FilterFunction<T> extends Function, Serializable {
    boolean filter(T var1) throws Exception;
}
  • 1
  • 2
  • 3

flatMap

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合
在这里插入图片描述

public interface FlatMapFunction<T, O> extends Function, Serializable {
    void flatMap(T var1, Collector<O> var2) throws Exception;
}

  • 1
  • 2
  • 3
  • 4
5.2 聚合算子

按键分区keyBy后才能使用聚合算子

对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的

基于不同的 key,流中的数据将被分配到不同的分区中去,这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了,这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)

在这里插入图片描述

public interface KeySelector<IN, KEY> extends Function, Serializable {
    KEY getKey(IN var1) throws Exception
}
  • 1
  • 2
  • 3

简单聚合

有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种

  • sum():在输入流上,对指定的字段做叠加求和的操作
  • min():在输入流上,对指定的字段求最小值
  • max():在输入流上,对指定的字段求最大值
  • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据
  • maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与
    min()/minBy()完全一致

归约聚合reduce 两个输入和返回值类型都一样的

public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T var1, T var2) throws Exception;
}
  • 1
  • 2
  • 3
6.输出算子

Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算 子完成的

Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”

之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink

在这里插入图片描述

8.1 输出到文件

StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统

它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据

StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)
格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用
StreamingFileSink 的静态方法:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)
  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)

在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)

public class SinkToFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        
        DataStreamSource<Event> source = env.addSource(new ClickSource());

        // 泛型String是要写入的数据类型
        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),
                        new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .build()
                )
                .build();

        source.map(Event::toString).addSink(fileSink);

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
8.2 输出到Kafka
public class SinkToKafka {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.配置
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

		// 3.从Kafka读取数据
        DataStreamSource<String> kafkaSource =
                env.addSource(new FlinkKafkaConsumer<String>("click", new SimpleStringSchema(), properties));
        
        SingleOutputStreamOperator<String> map = kafkaSource.map(data -> {
            String[] strs = data.split(" ");
            return new Event(strs[0], strs[1], Long.valueOf(strs[2]), Long.valueOf(strs[3])).toString();
        });
		// 4.写入到Kafka
        map.addSink(new FlinkKafkaProducer<String>("hadoop102:9092","events",new SimpleStringSchema()));
        
        
        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

这里我们可以看到,addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性

8.3 输出到redis

Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具

具体测试步骤如下

(1)导入的 Redis 连接器依赖

<dependency>
	<groupId>org.apache.bahir</groupId>
	<artifactId>flink-connector-redis_2.11</artifactId>
	<version>1.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

(2)启动 Redis 集群

这里我们为方便测试,只启动了单节点 Redis

(3)编写输出到 Redis 的示例代码

public class SinkToRedis {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.配置
        FlinkJedisPoolConfig conf = new
                FlinkJedisPoolConfig.Builder()
                .setHost("175.178.154.194")
                .setPassword("zks0.0")
                .setDatabase(0)
                .build();

        DataStreamSource<Event> source = env.addSource(new ClickSource());

        source.addSink(new RedisSink<>(conf, new RedisMapper<Event>() {
            @Override // 调用哪个redis数据结构
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }
            // key
            @Override
            public String getKeyFromData(Event event) {
                return event.getUser();
            }
            // value
            @Override
            public String getValueFromData(Event event) {
                return event.toString();
            }
        }));

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
8.4 输出到Elasticsearch

Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器,Flink 1.13 支持当前最新版本的
ElasticSearch

写入数据的 ElasticSearch 的测试步骤如下
(1)添加 Elasticsearch 连接器依赖

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
	<version>${flink.version}</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

(2)启动 Elasticsearch 集群

(3)编写输出到 Elasticsearch 的示例代码

与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction—ElasticsearchSink区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。 而 Builder 的构造方法中又有两个参数:

  • httpHosts:连接到的 Elasticsearch 集群主机列表
  • elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数

具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求

public class SinkToES {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> source = env.addSource(new ClickSource());

        // 2.定义host的列表
        List<HttpHost> hostList = new ArrayList<>();
        hostList.add(new HttpHost("175.178.154.194",9200));

        // 3.定义ES Sink Function
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            // requestIndexer的add方法发送请求  代替了客户端
            @Override
            public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {

                for (int i = 1; i <= 5 ; i++) {
                    IndexRequest request = new IndexRequest("clicks").id(String.valueOf(i));

                    request.source(JSONUtils.objectToJson(event), XContentType.JSON);

                    requestIndexer.add(request);
                }
            }
        };

        source.addSink(new ElasticsearchSink.Builder<>(hostList,elasticsearchSinkFunction).build());



        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
8.5 JDBCSink — 输出到MySQL

写入数据的 MySQL 的测试步骤如下
(1)添加依赖

<dependency>
	 <groupId>org.apache.flink</groupId>
	 <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
	 <version>${flink.version}</version>
</dependency>
<dependency>
	 <groupId>mysql</groupId>
	 <artifactId>mysql-connector-java</artifactId>
	 <version>8.0.28</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

(2)启动 MySQL,在 database 库下建表 clicks

mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);

public class SinkToMySQL {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Event> source = env.addSource(new ClickSource());

        source.addSink(JdbcSink.sink(
                "insert into clicks(user,url) values(?,?)",
                ((preparedStatement, event) -> {
                    preparedStatement.setString(1,event.user);
                    preparedStatement.setString(2,event.url);
                }),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://175.178.154.194:3306/flink?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("xxxx")
                        .build()

        ));

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
8.6 JDBCSink — 输出到ClickHouse
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
	 <artifactId>clickhouse-jdbc</artifactId>
	 <version>0.2.4</version>
	 <exclusions>
	     <exclusion>
	         <groupId>com.fasterxml.jackson.core</groupId>
	         <artifactId>jackson-databind</artifactId>
	     </exclusion>
	     <exclusion>
	         <groupId>com.fasterxml.jackson.core</groupId>
	         <artifactId>jackson-core</artifactId>
	     </exclusion>
	 </exclusions>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

和MySQL基本一致,只不过连接的时候没有用户名和密码

new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUrl("jdbc:clickhouse://hadoop102:8123/default")
                        .build());
  • 1
  • 2
  • 3
  • 4
  • 5
8.7 自定义Sink 输出到Hbase
public class DimHbaseSinkFunction extends RichSinkFunction<JSONObject> {
    private Connection connection;


    // open 并行子任务都会执行一次open、close 常用来执行初始化操作
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            String url = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
            Properties props = new Properties();
            props.put("phoenix.schema.isNamespaceMappingEnabled","true");
            connection = DriverManager.getConnection(url,props);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    // 自定义输出 value 数据格式  sinkTable(hbase),database(mysql),tableName(mysql),before,after(数据),type
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        //获取要插入的数据 {id:1,name:kun}
        JSONObject after = value.getJSONObject("after");
        // 获取key 作为hbase的列
        Set<String> keySet = after.keySet();
        // 获取values 赋值
        Collection<Object> values = after.values();

        // 获取SQL语句
        String sql = "upsert into RTDW_HBASE" +"."+ value.getString("sinkTable")+
                "("+ StringUtils.join(keySet,",")+")" +
                "values('" + StringUtils.join(values,"','")
                + "')";
        System.out.println(sql);
        PreparedStatement ps = null;
        try{
            connection.setAutoCommit(false);
            // 预编译SQL
            ps = connection.prepareStatement(sql);
            // 执行插入修改操作
            ps.executeUpdate();
            connection.commit();
            ps.close();
        }catch (SQLException e){
            e.printStackTrace();
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
7.富函数

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等

既然“富”,那么它一定会比常规的函数类提供更多、更丰富的功能。与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成

  • close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作

1.注意富函数是抽象类,不是接口,并且类中的方法不止一个,所以不能用Lambda表达式,只能自己写一个类

2.每一个并行子任务都会执行一次open、close

3.富函数和状态编程有着息息相关的关系

public class RichFunctionTest {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        // 2.从自定义数据源读取
        DataStreamSource<Event> source = env.addSource(new ClickSource());

        source.map(new MyRich()).setParallelism(2).print();

        env.execute();


    }
    // 对于open和close 一个并行度会被调用一次
    public static class MyRich extends RichMapFunction<Event,String>{
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open生命周期被调用 " + getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close生命周期被调用 " + getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public String map(Event event) throws Exception {
            return event.getUrl();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
8.物理分区

有些时候,我们还需要手动控制数据分区分配策略。比如当发生数据倾斜的时候,系统无法自动调整,这时就需要我们重新进行负载均衡,将数据流较为平均地发送到下游任务操作分区中去。Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作接口,能够帮我们实现数据流的手动重分区。为了同 keyBy 相区别,我们把这些操作统称为“物理分区”操作。物理分区与 keyBy 另一大区别在于,keyBy 之后得到的是一个KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式

常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast),下边我们分别来做了解

1.随机分区(shuffle)

最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的.shuffle()方法,将数据随
机地分配到下游算子的并行任务中去

在这里插入图片描述

source.map(Event::getUser).shuffle().print().setParallelism(3);
  • 1

map算子并行度1,经过shuffle后,随机的将数据分到3个并行的print算子中

2. 轮询分区(Round-Robin)

轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去

在这里插入图片描述

source.map(Event::getUser).rebalance().print().setParallelism(3);
  • 1

map算子并行度1,经过rebalance后,轮询将数据分到3个并行的print算子中
在这里插入图片描述

3.重缩放分区

重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图 5-11 所示。也就是说,“发牌人”如果有多个,那么rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌

在这里插入图片描述
4. 广播(broadcast)

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去

5. 全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力

自定义分区

当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略

在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector

public class CustomPartition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        DataStreamSource<Event> source = env.addSource(new ClickSource());

        source.partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String name, int i) {
                if(name.length() == 3) return 0;
                return 1;
            }
        }, new KeySelector<Event, String>() {
            @Override
            public String getKey(Event event) throws Exception {
                return event.getUser();  // 根据名字分区
            }
        }).print().setParallelism(2);

        env.execute();

    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

二.时间和窗口

1.时间语义

1.处理时间Processing Time

处理时间的概念非常简单,就是指执行处理操作的机器目前所处的系统时间

2. 事件时间(Event Time) (核心、重要)

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)

2.水位线

在这里插入图片描述
我们想统计8点-9点的数据,那这个窗口的时间以谁为标准呢?如果以系统时间为准,即到了9点这个窗口就关闭,显然是不合理的,因为由于延迟,可能有些数据没来得及在9点前进入窗口,但它是9点前发生的事件

一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了

在这里插入图片描述

有序流周期性插入水位线(理解即可,实际中保证有序很难)

实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图,这时的水位线,其实就是有序流中的一个周期性出现的时间标记

在这里插入图片描述

无序流水位线

这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,在消息队列中由于某种原因导致数据乱序了
在这里插入图片描述
解决思路也很简单:我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线

在这里插入图片描述
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线

在这里插入图片描述

水位线设置延迟(重要)

没有延迟会导致什么情况

上图,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了9 秒;如果有一个窗口结束时间就是 9 秒(比如,要统计 0~9 秒的所有数据),那么这时窗口就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有时间戳为 7 秒、8 秒的数据在 9 秒的数据之后才到来,这就是“迟到数据”(late data)。它们本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭,于是这些数据就被遗漏了,这会导致统计结果不正确

如何设置延迟

为了让窗口能够正确收集到迟到的数据,我们也可以等上一会(不需要很大的时间,一般是毫秒,最大也就几秒,比如2秒);也就是用当前已有数据的最大时间戳减去 2 秒,就是水位线的时间戳,这样的话,9 秒的数据到来之后,事件时钟不会直接推进到 9 秒,而是进展到了 7 秒;必须等到11 秒的数据到来之后,事件时钟才会进展到 9 秒,这时迟到数据也都已收集齐,0~9 秒的窗口就可以正确计算结果了
在这里插入图片描述

水位线的特性

现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要

我们可以总结一下水位线的特性:

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

代码设置水位线

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 法.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy)
  • 1

数据里已经有时间戳了吗,为什么这里还要“分配”呢?这是因为原始的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据,Flink 是无法知道数据真正产生的时间的。当然,有些时候数据源本身就提供了时间戳信息,比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配

.assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是 所 谓 的 “ 水 位 线 生 成 策 略 ” 。 WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator

  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础
  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()
@Public
public interface WatermarkGenerator<T> {
    void onEvent(T var1, long var2, WatermarkOutput var4);

    void onPeriodicEmit(WatermarkOutput var1);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
  • onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms
public class WaterMarkTest {
    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100); // 100毫秒生成一次水位线

        
        env.addSource(new ClickSource())
                // 乱序流的WaterMark生成
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 延迟2秒保证数据正确
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override // 时间戳的提取器
                                    public long extractTimestamp(Event event, long l) {
                                        return event.getTimestamp();
                                    }
                                })
                        );

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

总结

为了保证数据正确,我们设置了水位线,水位线其实就是一个时间戳,为了保证数据都在窗口范围内,我们设置了延迟时间,水位线的真正时间戳就是数据中的时间戳 - 延迟时间 - 1 ,单位是毫秒

窗口定义是前闭后开的,假如我们需要8点到9点的数据,实际上是包括8点整的数据,不包括9点整的数据,假设延迟时间为5秒,这样8点0分5秒1ms的数据来了后,Flink水位线是8点,就开始统计了,9点整的数据对应的水位线其实是8点59分55秒999ms,这时候不会关闭窗口,因为要等待延迟数据,当Flink水位线是8点59分59秒999毫秒时,关闭窗口

水位线的传递

如果一个任务收到了来自上游并行任务的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的
所有数据,而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢?当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间,那就表示“7 秒前的数据都已经处理完”,这显然不是事实——第一个上游分区才处理到 5 秒,5~7 秒的数据还会不停地发来;而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了,因为确实所有上游分区都已经处理完,不会再发 5 秒前的数据了。这让我们想到“木桶原理”:所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位

在这里插入图片描述

3.窗口

定义

下图,我们设置了水位线延迟以后,在水位线为10时,11 12 秒的数据也进来了,但它们不属于0 - 10 秒

在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下,我们应该把窗口理解成一个“桶”,在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理

在这里插入图片描述
窗口分类

1. 按照驱动类型分类

时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”

计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。这相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小

在这里插入图片描述
2. 按照窗口分配数据的规则分类

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口

滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计
在这里插入图片描述
滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率

在这里插入图片描述

我们可以看到,与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的,而且会留有至少为 size 的间隔(session gap)。在一些类似保持会话的场景下,往往可以使用会话窗口来进行数据的处理统计,两个数据之间间隔大于gap,即开启新的窗口

在这里插入图片描述
还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)

在这里插入图片描述

4.窗口API(重点)
4.1 窗口分类

(1)按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算

stream.keyBy(...)
.window(...)
  • 1
  • 2

(2)非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。在代码中,直接基于 DataStream 调用.windowAll()定义窗口

stream.windowAll(...)
  • 1

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作

4.2 窗口组成

窗口 = 窗口分配器 + 窗口函数

窗口分配器(Window Assigners)定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口

streamOperator.keyBy(Event::getUser)
        .window(TumblingEventTimeWindows.of(Time.hours(1)));// 滚动时间窗口


streamOperator.keyBy(Event::getUser)
        .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10)));// 滑动时间窗口

streamOperator.keyBy(Event::getUser)
        .window(EventTimeSessionWindows.withGap(Time.minutes(1)));// 会话时间窗口

streamOperator.keyBy(Event::getUser)
        .window(GlobalWindows.create());// 全局窗口


streamOperator.keyBy(Event::getUser)
        .countWindow(100);   // 滚动计数窗口

streamOperator.keyBy(Event::getUser)
        .countWindow(100,10); // 滑动计数窗口
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

窗口函数

窗口分配器只说明了数据分到哪个窗口,我们还需要窗口函数才能实现对数据的处理

在这里插入图片描述

4.3 窗口函数 - 增量聚合函数

典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction

(1)归约函数(ReduceFunction)

中间聚合的状态和输出的结果,都和输入的数据类型是一样的

public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T var1, T var2) throws Exception;
}
  • 1
  • 2
  • 3

归约函数使用

stream.redcue(new MyRedcueFunction())
  • 1

(2)聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次

  • add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法

  • getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用

  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
 ACC createAccumulator();
 ACC add(IN value, ACC accumulator);
 OUT getResult(ACC accumulator);
 ACC merge(ACC a, ACC b);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

聚合函数使用

stream.aggregate(new MyAggregateFunction())
  • 1
4.4 窗口函数 - 全窗口函数

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程,其实属于处理函数的一类

如果使用了keyBy

ProcessWindowFunction<IN, OUT, KEY, W extends Window>
  • 1

如果没有key,用ProcessAllWindowFunction

ProcessAllWindowFunction<IN, OUT, W extends Window>
  • 1

IN 输入类型
OUT 输出类型
KEY 分组字段的类型
W 窗口类型 一般是TimeWindow

4.5 增量函数 + 全窗口函数(常用)

我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction(一般不用) 或者 ProcessWindowFunction(功能更全)

此时ProcessWindowFunction的输入为增量聚合函数的输出

5.其他窗口API
5.1 触发器

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程

基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)

stream.keyBy(...)
 .window(...)
 .trigger(new MyTrigger())
 .process(new xxx)
  • 1
  • 2
  • 3
  • 4

Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法
  • onEventTime():当注册的事件时间定时器触发时,将调用这个方法
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态

上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
5.2 移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器

stream.keyBy(...)
 .window(...)
 .evictor(new MyEvictor())
  • 1
  • 2
  • 3

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的

6.延迟数据处理 (三重保障)

第一重保障 - 水位线延迟

第二重保障 - 窗口延迟不关闭

在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线(watermark)并不一定能保证时间戳更早的所有数据不会再来。当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃

不过在多数情况下,直接丢弃数据也会导致统计结果不准确。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口

基于 WindowedStream 调用.allowedLateness()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据

stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.mintues(1)))
 .allowedLateness(Time.minutes(1))
  • 1
  • 2
  • 3

比如上面的代码中,我们定义了 1 小时的滚动窗口,并设置了允许 1 分钟的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点的窗口,本来应该是水位线到达 9 点整就触发计算并关闭窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果,并不会关窗。后续到达的数据,只要属于 8 点~9 点窗口,依然可以在之前统计的基础上继续叠加,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分,这时就真正清空状态、关闭窗口,之后再来的迟到数据就会被丢弃了

第三重保障 - 侧输出流

我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。如果不想丢弃任何一个数据,又该怎么做呢?

Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同

具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文的.output()方法就可以了

这里 output()方法需要传入两个参数,第一个是一个“输出标签”OutputTag,用来标识侧输出流,一般会在外部统一声明;第二个就是要输出的数据

我们可以在外部先将 OutputTag 声明出来:

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
  • 1

如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput()方法,传入对应的 OutputTag,这个方式与窗口 API 中获取侧输出流是完全一样的

DataStream<String> stringStream = longStream.getSideOutput(outputTag);
  • 1

案例(重要,理解三种保障)

窗口计算触发、窗口关闭都是由Flink水位线决定的,而计算的数据是真实的符合时间条件的数据,比如我设置水位线延迟2秒,那么0-22秒是收集第一个窗口数据的,因为有的数据在0-20秒内可能会进不来,所以多等2秒,确保事件时间为0-20秒的都进了窗口,但是多等2秒可能21秒、21.6秒的数据可能也已经进来了,没关系,Flink的窗口是分桶的,我们计算的时候多余的数据时不会被计算的,依旧计算的是0-20秒的数据!

下面这段代码,作用就是统计一个窗口内不同的页面被访问了多少次,主要关心三种保障是怎么发挥作用的即可

我们设置了一个20秒滚动窗口,并且水位线延迟了2秒,并且允许窗口延迟5秒关闭,而且还设置了侧输出流来处理延迟了太久的数据

读到这里,我们应该知道

1.哪些数据会被认为是第一个窗口的数据?(以下是自1970年开始毫秒单位)

时间戳在0-20000内的数据会是第一个窗口,包括0,但不包括20000,因为窗口的定义是前闭后开的!!!

同理,时间戳在20000 - 40000的数据会在第二个20秒的窗口

40000 - 60000会在第三个20秒的窗口

但是只有22000的数据到来后,第一个窗口才会进行计算,因为22000对应水位线为20秒

2.窗口会持续多长时间?

27秒的数据来了以后,第一个窗口就关闭了,因为此时Flink的水位线是25秒 = 窗口本身的20秒 + 允许延迟5秒

也就是说,只要27秒的数据及其以后的数据不来,且数据时间戳在0-20000的数据都会被视为第一个窗口,并且持续计算,如果27秒的数据来了,那么第一个窗口就关闭了,要是再来0-20000的数据就会进入侧输出流

同理,47秒的数据来了后,第二个窗口也就关闭了

public class LateDataTest {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100); // 100毫秒生成一次水位线

        SingleOutputStreamOperator<Event> streamOperator = env.socketTextStream("175.178.154.194", 7777)
                .map(str -> {
                    String[] strs = str.split(" ");
                    return new Event(strs[0], strs[1], Long.valueOf(strs[2]), Long.valueOf(strs[3]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 第一重保证 延迟2秒保证数据正确
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override // 时间戳的提取器
                            public long extractTimestamp(Event event, long l) {
                                return event.getTimestamp();
                            }
                        })
                );

        streamOperator.print("input: ");

        // 定义一个输出标签
        OutputTag<Event> late = new OutputTag<Event>("late"){};

        SingleOutputStreamOperator<String> res = streamOperator.keyBy(Event::getUrl)
                .window(TumblingEventTimeWindows.of(Time.seconds(20)))
                .allowedLateness(Time.seconds(5)) // 第二重保证  窗口延迟5秒
                .sideOutputLateData(late) // 第三重保证 侧输出流
                .aggregate(new UrlCount.MyAgg(), new UrlCount.MyProcessFunction());

        res.print("result: ");
        res.getSideOutput(late).print("late: ");


        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

在这里插入图片描述

首先第一个窗口的数据时0-20秒(不包括20秒),可以看到符合条件的只有前三条数据,并且只有水位线到达20秒(也就是事件时间为22秒的数据到来后),触发了计算,但此时窗口没有关闭

在这里插入图片描述
当27秒的数据到达后,也就是水位线时间为25秒时,第一个窗口关闭,此时再进来属于第一个窗口的数据,就会进入侧输入流

7.关于窗口、水位线延迟、窗口延迟、侧输出流总结

1.我们计算的数据肯定是在我们规定的时间范围内的,比如滚动窗口设置1小时,那么8点-9点的数据肯定都在这个窗口内,而且这个窗口内也不会有其他时间的数据,因为Flink的窗口是分桶的

2.为了保证延迟数据的到来,我们可以延迟水位线,多等一会来确保8-9点的数据都来了

水位线时间戳 = 实际事件时间 - 水位线延迟时间 - 1 (单位是毫秒)。窗口触发计算、窗口关闭时间都是以水位线为驱动

3.为了更加确保延迟数据的到来,我们允许窗口延迟关闭,即可以再设置个窗口关闭时间

窗口关闭时间 = 窗口正常时间 + 窗口延迟关闭时间,和水位线延迟时间没有关系

4.只要达到窗口计算触发时间并且窗口没关闭,来数据立即触发计算

5.窗口关闭后,再来属于关闭窗口的数据,只能进入侧输出流

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/544109
推荐阅读
相关标签
  

闽ICP备14008679号