赞
踩
DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的Flink 代码其实就是基于这种数据类型的处理,所以这套核心 API 就以 DataStream 命名。对于批处理和流处理,我们都可以用这同一套 API 来实现
一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成
编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我们要 获 取 的 执 行 环 境 , 是StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种
1.getExecutionEnvironment
直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2. createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
3. createRemoteEnvironment
这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包
在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等
三种处理模式
流执行模式(STREAMING)
这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式,Flink 本身持有的就是流处理的世界观,即使是批量数据,也可以看作“有界流”来进行处理。所以 STREAMING 执行模式对于有界数据和无界数据都是有效的
批执行模式(BATCH)
专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架对于不会持续计算的有界数据,我们用这种模式处理会更方便
自动模式(AUTOMATIC)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式
如何设置BATCH模式?
(1)通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH
(2)通过代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端
Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法:
DataStream<String> stream = env.addSource(...);
方法传入一个对象参数,需要实现 SourceFunction 接口;返回 DataStreamSource,读取数据的 source 操作是一个算子,得到的是一个数据流(DataStream)
、
普通可直接使用的源算子
public class SourceTest { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 方式一.从文件读取数据 DataStreamSource<String> source = env.readTextFile("./input/clicks.txt"); source.print(); // 方式二.从集合读取数据 ArrayList<Integer> list = new ArrayList<>(); list.add(10); list.add(20); DataStreamSource<Integer> source1 = env.fromCollection(list); source1.print(); // 方式三.读取Socket文本流 DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 7777); env.execute(); } }
Kafka源
引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
public class KafkaSource { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.配置 Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // 参数一是主题 参数二是反序列化类型 DataStreamSource<String> kafkaSource = env.addSource(new FlinkKafkaConsumer<String>("click", new SimpleStringSchema(), properties)); kafkaSource.print(); env.execute(); } }
自定义源
大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,我们想要读取的数据源来自某个外部系统,而 flink 既没有预实现的方法、也没有提供连接器,又该怎么办呢?
那就只好自定义实现 SourceFunction 了
接下来我们创建一个自定义的数据源,实现 SourceFunction 接口
主要重写两个关键方法:
自定义pojo Event
public class Event {
public String user; // 用户名
public String url; // 用户访问的url
public Long timestamp; // 访问时间戳
// 省略构造方法 get set toString
}
重写两个方法每隔1秒生成随机的模拟数据
public class ClickSource implements SourceFunction<Event> { // 声明一个布尔变量,作为控制数据生成的标识位 private Boolean running = true; @Override public void run(SourceContext<Event> ctx) throws Exception { Random random = new Random(); // 在指定的数据集中随机选取数据 String[] users = {"Mary", "Alice", "Bob", "Cary","Sun","Joe"}; String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2","./like"}; while (running) { ctx.collect(new Event( users[random.nextInt(users.length)], urls[random.nextInt(urls.length)], Calendar.getInstance().getTimeInMillis() )); // 隔1秒生成一个点击事件,方便观测 Thread.sleep(1000); } } @Override public void cancel() { running = false; } }
这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1,所以如果我们想要自定义并行的数据源的话,需要实现ParallelSourceFunction
map
map 主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素
public interface MapFunction<T, O> extends Function, Serializable {
O map(T var1) throws Exception;
}
filter
filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤
条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉
public interface FilterFunction<T> extends Function, Serializable {
boolean filter(T var1) throws Exception;
}
flatMap
flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T var1, Collector<O> var2) throws Exception;
}
按键分区keyBy后才能使用聚合算子
对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的
基于不同的 key,流中的数据将被分配到不同的分区中去,这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了,这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN var1) throws Exception
}
简单聚合
有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种
归约聚合reduce 两个输入和返回值类型都一样的
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T var1, T var2) throws Exception;
}
Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算 子完成的
Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”
之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink
StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统
它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据
StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)
格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用
StreamingFileSink 的静态方法:
在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)
public class SinkToFile { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); DataStreamSource<Event> source = env.addSource(new ClickSource()); // 泛型String是要写入的数据类型 StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withMaxPartSize(1024 * 1024 * 1024) .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .build() ) .build(); source.map(Event::toString).addSink(fileSink); env.execute(); } }
public class SinkToKafka { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.配置 Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // 3.从Kafka读取数据 DataStreamSource<String> kafkaSource = env.addSource(new FlinkKafkaConsumer<String>("click", new SimpleStringSchema(), properties)); SingleOutputStreamOperator<String> map = kafkaSource.map(data -> { String[] strs = data.split(" "); return new Event(strs[0], strs[1], Long.valueOf(strs[2]), Long.valueOf(strs[3])).toString(); }); // 4.写入到Kafka map.addSink(new FlinkKafkaProducer<String>("hadoop102:9092","events",new SimpleStringSchema())); env.execute(); } }
这里我们可以看到,addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性
Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具
具体测试步骤如下
(1)导入的 Redis 连接器依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
(2)启动 Redis 集群
这里我们为方便测试,只启动了单节点 Redis
(3)编写输出到 Redis 的示例代码
public class SinkToRedis { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.配置 FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() .setHost("175.178.154.194") .setPassword("zks0.0") .setDatabase(0) .build(); DataStreamSource<Event> source = env.addSource(new ClickSource()); source.addSink(new RedisSink<>(conf, new RedisMapper<Event>() { @Override // 调用哪个redis数据结构 public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.SET); } // key @Override public String getKeyFromData(Event event) { return event.getUser(); } // value @Override public String getValueFromData(Event event) { return event.toString(); } })); env.execute(); } }
Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器,Flink 1.13 支持当前最新版本的
ElasticSearch
写入数据的 ElasticSearch 的测试步骤如下
(1)添加 Elasticsearch 连接器依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
(2)启动 Elasticsearch 集群
(3)编写输出到 Elasticsearch 的示例代码
与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction—ElasticsearchSink区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。 而 Builder 的构造方法中又有两个参数:
具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求
public class SinkToES { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Event> source = env.addSource(new ClickSource()); // 2.定义host的列表 List<HttpHost> hostList = new ArrayList<>(); hostList.add(new HttpHost("175.178.154.194",9200)); // 3.定义ES Sink Function ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() { // requestIndexer的add方法发送请求 代替了客户端 @Override public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { for (int i = 1; i <= 5 ; i++) { IndexRequest request = new IndexRequest("clicks").id(String.valueOf(i)); request.source(JSONUtils.objectToJson(event), XContentType.JSON); requestIndexer.add(request); } } }; source.addSink(new ElasticsearchSink.Builder<>(hostList,elasticsearchSinkFunction).build()); env.execute(); } }
写入数据的 MySQL 的测试步骤如下
(1)添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
(2)启动 MySQL,在 database 库下建表 clicks
mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);
public class SinkToMySQL { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Event> source = env.addSource(new ClickSource()); source.addSink(JdbcSink.sink( "insert into clicks(user,url) values(?,?)", ((preparedStatement, event) -> { preparedStatement.setString(1,event.user); preparedStatement.setString(2,event.url); }), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://175.178.154.194:3306/flink?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8") .withDriverName("com.mysql.cj.jdbc.Driver") .withUsername("root") .withPassword("xxxx") .build() )); env.execute(); } }
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
和MySQL基本一致,只不过连接的时候没有用户名和密码
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://hadoop102:8123/default")
.build());
public class DimHbaseSinkFunction extends RichSinkFunction<JSONObject> { private Connection connection; // open 并行子任务都会执行一次open、close 常用来执行初始化操作 @Override public void open(Configuration parameters) throws Exception { try { String url = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181"; Properties props = new Properties(); props.put("phoenix.schema.isNamespaceMappingEnabled","true"); connection = DriverManager.getConnection(url,props); } catch (SQLException e) { throw new RuntimeException(e); } } // 自定义输出 value 数据格式 sinkTable(hbase),database(mysql),tableName(mysql),before,after(数据),type @Override public void invoke(JSONObject value, Context context) throws Exception { //获取要插入的数据 {id:1,name:kun} JSONObject after = value.getJSONObject("after"); // 获取key 作为hbase的列 Set<String> keySet = after.keySet(); // 获取values 赋值 Collection<Object> values = after.values(); // 获取SQL语句 String sql = "upsert into RTDW_HBASE" +"."+ value.getString("sinkTable")+ "("+ StringUtils.join(keySet,",")+")" + "values('" + StringUtils.join(values,"','") + "')"; System.out.println(sql); PreparedStatement ps = null; try{ connection.setAutoCommit(false); // 预编译SQL ps = connection.prepareStatement(sql); // 执行插入修改操作 ps.executeUpdate(); connection.commit(); ps.close(); }catch (SQLException e){ e.printStackTrace(); } } }
“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等
既然“富”,那么它一定会比常规的函数类提供更多、更丰富的功能。与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能
Rich Function 有生命周期的概念。典型的生命周期方法有:
open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成
close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作
1.注意富函数是抽象类,不是接口,并且类中的方法不止一个,所以不能用Lambda表达式,只能自己写一个类
2.每一个并行子任务都会执行一次open、close
3.富函数和状态编程有着息息相关的关系
public class RichFunctionTest { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.从自定义数据源读取 DataStreamSource<Event> source = env.addSource(new ClickSource()); source.map(new MyRich()).setParallelism(2).print(); env.execute(); } // 对于open和close 一个并行度会被调用一次 public static class MyRich extends RichMapFunction<Event,String>{ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); System.out.println("open生命周期被调用 " + getRuntimeContext().getIndexOfThisSubtask()); } @Override public void close() throws Exception { super.close(); System.out.println("close生命周期被调用 " + getRuntimeContext().getIndexOfThisSubtask()); } @Override public String map(Event event) throws Exception { return event.getUrl(); } } }
有些时候,我们还需要手动控制数据分区分配策略。比如当发生数据倾斜的时候,系统无法自动调整,这时就需要我们重新进行负载均衡,将数据流较为平均地发送到下游任务操作分区中去。Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作接口,能够帮我们实现数据流的手动重分区。为了同 keyBy 相区别,我们把这些操作统称为“物理分区”操作。物理分区与 keyBy 另一大区别在于,keyBy 之后得到的是一个KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式
常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast),下边我们分别来做了解
1.随机分区(shuffle)
最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的.shuffle()方法,将数据随
机地分配到下游算子的并行任务中去
source.map(Event::getUser).shuffle().print().setParallelism(3);
map算子并行度1,经过shuffle后,随机的将数据分到3个并行的print算子中
2. 轮询分区(Round-Robin)
轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去
source.map(Event::getUser).rebalance().print().setParallelism(3);
map算子并行度1,经过rebalance后,轮询将数据分到3个并行的print算子中
3.重缩放分区
重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图 5-11 所示。也就是说,“发牌人”如果有多个,那么rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌
4. 广播(broadcast)
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去
5. 全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力
自定义分区
当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略
在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector
public class CustomPartition { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> source = env.addSource(new ClickSource()); source.partitionCustom(new Partitioner<String>() { @Override public int partition(String name, int i) { if(name.length() == 3) return 0; return 1; } }, new KeySelector<Event, String>() { @Override public String getKey(Event event) throws Exception { return event.getUser(); // 根据名字分区 } }).print().setParallelism(2); env.execute(); } }
1.处理时间Processing Time
处理时间的概念非常简单,就是指执行处理操作的机器目前所处的系统时间
2. 事件时间(Event Time) (核心、重要)
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)
我们想统计8点-9点的数据,那这个窗口的时间以谁为标准呢?如果以系统时间为准,即到了9点这个窗口就关闭,显然是不合理的,因为由于延迟,可能有些数据没来得及在9点前进入窗口,但它是9点前发生的事件
一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了
有序流周期性插入水位线(理解即可,实际中保证有序很难)
实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图,这时的水位线,其实就是有序流中的一个周期性出现的时间标记
无序流水位线
这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,在消息队列中由于某种原因导致数据乱序了
解决思路也很简单:我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线
水位线设置延迟(重要)
没有延迟会导致什么情况
上图,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了9 秒;如果有一个窗口结束时间就是 9 秒(比如,要统计 0~9 秒的所有数据),那么这时窗口就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有时间戳为 7 秒、8 秒的数据在 9 秒的数据之后才到来,这就是“迟到数据”(late data)。它们本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭,于是这些数据就被遗漏了,这会导致统计结果不正确
如何设置延迟
为了让窗口能够正确收集到迟到的数据,我们也可以等上一会(不需要很大的时间,一般是毫秒,最大也就几秒,比如2秒);也就是用当前已有数据的最大时间戳减去 2 秒,就是水位线的时间戳,这样的话,9 秒的数据到来之后,事件时钟不会直接推进到 9 秒,而是进展到了 7 秒;必须等到11 秒的数据到来之后,事件时钟才会进展到 9 秒,这时迟到数据也都已收集齐,0~9 秒的窗口就可以正确计算结果了
水位线的特性
现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要
我们可以总结一下水位线的特性:
代码设置水位线
在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 法.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy)
数据里已经有时间戳了吗,为什么这里还要“分配”呢?这是因为原始的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据,Flink 是无法知道数据真正产生的时间的。当然,有些时候数据源本身就提供了时间戳信息,比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配
了
.assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是 所 谓 的 “ 水 位 线 生 成 策 略 ” 。 WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator
@Public
public interface WatermarkGenerator<T> {
void onEvent(T var1, long var2, WatermarkOutput var4);
void onPeriodicEmit(WatermarkOutput var1);
}
public class WaterMarkTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.getConfig().setAutoWatermarkInterval(100); // 100毫秒生成一次水位线 env.addSource(new ClickSource()) // 乱序流的WaterMark生成 .assignTimestampsAndWatermarks(WatermarkStrategy .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 延迟2秒保证数据正确 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override // 时间戳的提取器 public long extractTimestamp(Event event, long l) { return event.getTimestamp(); } }) ); env.execute(); } }
总结
为了保证数据正确,我们设置了水位线,水位线其实就是一个时间戳,为了保证数据都在窗口范围内,我们设置了延迟时间,水位线的真正时间戳就是数据中的时间戳 - 延迟时间 - 1 ,单位是毫秒
窗口定义是前闭后开的,假如我们需要8点到9点的数据,实际上是包括8点整的数据,不包括9点整的数据,假设延迟时间为5秒,这样8点0分5秒1ms的数据来了后,Flink水位线是8点,就开始统计了,9点整的数据对应的水位线其实是8点59分55秒999ms,这时候不会关闭窗口,因为要等待延迟数据,当Flink水位线是8点59分59秒999毫秒时,关闭窗口
水位线的传递
如果一个任务收到了来自上游并行任务的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的
所有数据,而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢?当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间,那就表示“7 秒前的数据都已经处理完”,这显然不是事实——第一个上游分区才处理到 5 秒,5~7 秒的数据还会不停地发来;而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了,因为确实所有上游分区都已经处理完,不会再发 5 秒前的数据了。这让我们想到“木桶原理”:所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位
定义
下图,我们设置了水位线延迟以后,在水位线为10时,11 12 秒的数据也进来了,但它们不属于0 - 10 秒
在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下,我们应该把窗口理解成一个“桶”,在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理
窗口分类
1. 按照驱动类型分类
时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”
计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。这相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小
2. 按照窗口分配数据的规则分类
滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计
滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率
我们可以看到,与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的,而且会留有至少为 size 的间隔(session gap)。在一些类似保持会话的场景下,往往可以使用会话窗口来进行数据的处理统计,两个数据之间间隔大于gap,即开启新的窗口
还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)
(1)按键分区窗口(Keyed Windows)
经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算
stream.keyBy(...)
.window(...)
(2)非按键分区(Non-Keyed Windows)
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。在代码中,直接基于 DataStream 调用.windowAll()定义窗口
stream.windowAll(...)
这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作
窗口 = 窗口分配器 + 窗口函数
窗口分配器(Window Assigners)定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口
streamOperator.keyBy(Event::getUser) .window(TumblingEventTimeWindows.of(Time.hours(1)));// 滚动时间窗口 streamOperator.keyBy(Event::getUser) .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10)));// 滑动时间窗口 streamOperator.keyBy(Event::getUser) .window(EventTimeSessionWindows.withGap(Time.minutes(1)));// 会话时间窗口 streamOperator.keyBy(Event::getUser) .window(GlobalWindows.create());// 全局窗口 streamOperator.keyBy(Event::getUser) .countWindow(100); // 滚动计数窗口 streamOperator.keyBy(Event::getUser) .countWindow(100,10); // 滑动计数窗口
窗口函数
窗口分配器只说明了数据分到哪个窗口,我们还需要窗口函数才能实现对数据的处理
典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction
(1)归约函数(ReduceFunction)
中间聚合的状态和输出的结果,都和输入的数据类型是一样的
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T var1, T var2) throws Exception;
}
归约函数使用
stream.redcue(new MyRedcueFunction())
(2)聚合函数(AggregateFunction)
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦
AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了
接口中有四个方法:
createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次
add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法
getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用
merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
{
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}
聚合函数使用
stream.aggregate(new MyAggregateFunction())
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程,其实属于处理函数的一类
如果使用了keyBy
ProcessWindowFunction<IN, OUT, KEY, W extends Window>
如果没有key,用ProcessAllWindowFunction
ProcessAllWindowFunction<IN, OUT, W extends Window>
IN 输入类型
OUT 输出类型
KEY 分组字段的类型
W 窗口类型 一般是TimeWindow
我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction(一般不用) 或者 ProcessWindowFunction(功能更全)
此时ProcessWindowFunction的输入为增量聚合函数的输出
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程
基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
.process(new xxx)
Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:
上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型
移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())
Evictor 接口定义了两个方法:
默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的
第一重保障 - 水位线延迟
第二重保障 - 窗口延迟不关闭
在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线(watermark)并不一定能保证时间戳更早的所有数据不会再来。当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃
不过在多数情况下,直接丢弃数据也会导致统计结果不准确。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口
基于 WindowedStream 调用.allowedLateness()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.mintues(1)))
.allowedLateness(Time.minutes(1))
比如上面的代码中,我们定义了 1 小时的滚动窗口,并设置了允许 1 分钟的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点的窗口,本来应该是水位线到达 9 点整就触发计算并关闭窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果,并不会关窗。后续到达的数据,只要属于 8 点~9 点窗口,依然可以在之前统计的基础上继续叠加,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分,这时就真正清空状态、关闭窗口,之后再来的迟到数据就会被丢弃了
第三重保障 - 侧输出流
我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。如果不想丢弃任何一个数据,又该怎么做呢?
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同
具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文的.output()方法就可以了
这里 output()方法需要传入两个参数,第一个是一个“输出标签”OutputTag,用来标识侧输出流,一般会在外部统一声明;第二个就是要输出的数据
我们可以在外部先将 OutputTag 声明出来:
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput()方法,传入对应的 OutputTag,这个方式与窗口 API 中获取侧输出流是完全一样的
DataStream<String> stringStream = longStream.getSideOutput(outputTag);
案例(重要,理解三种保障)
窗口计算触发、窗口关闭都是由Flink水位线决定的,而计算的数据是真实的符合时间条件的数据,比如我设置水位线延迟2秒,那么0-22秒是收集第一个窗口数据的,因为有的数据在0-20秒内可能会进不来,所以多等2秒,确保事件时间为0-20秒的都进了窗口,但是多等2秒可能21秒、21.6秒的数据可能也已经进来了,没关系,Flink的窗口是分桶的,我们计算的时候多余的数据时不会被计算的,依旧计算的是0-20秒的数据!
下面这段代码,作用就是统计一个窗口内不同的页面被访问了多少次,主要关心三种保障是怎么发挥作用的即可
我们设置了一个20秒滚动窗口,并且水位线延迟了2秒,并且允许窗口延迟5秒关闭,而且还设置了侧输出流来处理延迟了太久的数据
读到这里,我们应该知道
1.哪些数据会被认为是第一个窗口的数据?(以下是自1970年开始毫秒单位)
时间戳在0-20000内的数据会是第一个窗口,包括0,但不包括20000,因为窗口的定义是前闭后开的!!!
同理,时间戳在20000 - 40000的数据会在第二个20秒的窗口
40000 - 60000会在第三个20秒的窗口
…
但是只有22000的数据到来后,第一个窗口才会进行计算,因为22000对应水位线为20秒
2.窗口会持续多长时间?
27秒的数据来了以后,第一个窗口就关闭了,因为此时Flink的水位线是25秒 = 窗口本身的20秒 + 允许延迟5秒
也就是说,只要27秒的数据及其以后的数据不来,且数据时间戳在0-20000的数据都会被视为第一个窗口,并且持续计算,如果27秒的数据来了,那么第一个窗口就关闭了,要是再来0-20000的数据就会进入侧输出流
同理,47秒的数据来了后,第二个窗口也就关闭了
public class LateDataTest { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.getConfig().setAutoWatermarkInterval(100); // 100毫秒生成一次水位线 SingleOutputStreamOperator<Event> streamOperator = env.socketTextStream("175.178.154.194", 7777) .map(str -> { String[] strs = str.split(" "); return new Event(strs[0], strs[1], Long.valueOf(strs[2]), Long.valueOf(strs[3])); }) .assignTimestampsAndWatermarks(WatermarkStrategy .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 第一重保证 延迟2秒保证数据正确 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override // 时间戳的提取器 public long extractTimestamp(Event event, long l) { return event.getTimestamp(); } }) ); streamOperator.print("input: "); // 定义一个输出标签 OutputTag<Event> late = new OutputTag<Event>("late"){}; SingleOutputStreamOperator<String> res = streamOperator.keyBy(Event::getUrl) .window(TumblingEventTimeWindows.of(Time.seconds(20))) .allowedLateness(Time.seconds(5)) // 第二重保证 窗口延迟5秒 .sideOutputLateData(late) // 第三重保证 侧输出流 .aggregate(new UrlCount.MyAgg(), new UrlCount.MyProcessFunction()); res.print("result: "); res.getSideOutput(late).print("late: "); env.execute(); } }
首先第一个窗口的数据时0-20秒(不包括20秒),可以看到符合条件的只有前三条数据,并且只有水位线到达20秒(也就是事件时间为22秒的数据到来后),触发了计算,但此时窗口没有关闭
当27秒的数据到达后,也就是水位线时间为25秒时,第一个窗口关闭,此时再进来属于第一个窗口的数据,就会进入侧输入流
1.我们计算的数据肯定是在我们规定的时间范围内的,比如滚动窗口设置1小时,那么8点-9点的数据肯定都在这个窗口内,而且这个窗口内也不会有其他时间的数据,因为Flink的窗口是分桶的
2.为了保证延迟数据的到来,我们可以延迟水位线,多等一会来确保8-9点的数据都来了
水位线时间戳 = 实际事件时间 - 水位线延迟时间 - 1 (单位是毫秒)。窗口触发计算、窗口关闭时间都是以水位线为驱动
3.为了更加确保延迟数据的到来,我们允许窗口延迟关闭,即可以再设置个窗口关闭时间
窗口关闭时间 = 窗口正常时间 + 窗口延迟关闭时间,和水位线延迟时间没有关系
4.只要达到窗口计算触发时间并且窗口没关闭,来数据立即触发计算
5.窗口关闭后,再来属于关闭窗口的数据,只能进入侧输出流
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。