赞
踩
创建一个执行环境,表示当前执行程序的上下文。
有三种创建方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//WordCount.jar");
DataStream<SensorReading> sensorDataStream = env.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1)
)
);
env.readTextFile("YOUR_FILE_PATH ");
env.addSource( new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction
:env.addSource( new SourceFunction(){xxx});
DataStream<Tuple2<Integer, Integer>> inputStream= env.addSource(new RandomFibonacciSource());
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
private Random rnd = new Random();
private int counter = 0;
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
while (isRunning && counter < BOUND) {
int first = rnd.nextInt(BOUND / 2 - 1) + 1;
int second = rnd.nextInt(BOUND / 2 - 1) + 1;
ctx.collect(new Tuple2<>(first, second));
counter++;
Thread.sleep(50L);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hpHXBCZw-1649949748146)(C:\Users\Simon\AppData\Roaming\Typora\typora-user-images\1648291788955.png)]
——使用lambda表达式时需提供返回类型,datastream.flatMap(xxx).returns(Types.STRING)。
对一个DataStream中的每个元素使用自定义函数处理,每个输入对应一个输出,输入输出类型可以不同,我们可以重写MapFunction或RichMapFunction来自定义map(),或使用lambda
DataStream<Integer> mapStram = dataStream.map(new MapFunction<String, Integer>() {
public Integer map(String value) throws Exception {
return value.length();
}
});
与map()类似,但每个输入可以对应多个输出
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String,
String>() {
public void flatMap(String value, Collector<String> out) throws Exception {
String[] fields = value.split(",");
for( String field: fields )
out.collect(field);
}
});
对每个元素进行过滤,返回False则该元素将被过滤,返回True则元素被保留。
可以使用lambda表达式或继承FilterFunction或RichFilterFunction
DataStream<Interger> filterStream = dataStream.filter(new FilterFunction<String>(){
public boolean filter(String value) throws Exception {
return value == 1;
}
});
DataStream -> KeyedStream:keyBy()按照Key分组,逻辑地将一个流拆分成不相交的分区(子算子任务),每个分区包含具有相同key的元素,在内部以hash的形式实现的。
1、KeyBy会重新分区;
2、不同的key有可能分到一起,因为是通过hash原理实现的;
3、一旦按照Key分组后,我们便可以对每组数据进行时间窗口的处理以及状态的创建和更新。
1 使用数字位置来指定Key:DataStream<Tuple2<Integer, Double>> keyedStream = dataStream.keyBy(0).sum(1);
2 使用字段名来指定key(适用pojo类型):DataStream<Word> fieldNameStream = wordStream.keyBy("word").sum("count");
3 实现KeySelector接口(推荐):重写getKey方法,IN是输入类型,KEY是输出类型,表示用于分组的key。前两种方式本质上都是实现KeySelector接口
// IN为数据流元素,KEY为所选择的Key
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
// 选择一个字段作为Key
KEY getKey(IN value) throws Exception;
}
对于一个KeyedStream,一次只能使用一个聚合函数,无法链式使用多个。
聚合函数对流入的数据进行实时统计,并不断输出到下游。
这些算子可以针对KeyedStream的每一个支流做聚合。
使用聚合函数时,我们需要一个参数来指定按照哪个字段进行聚合。跟keyBy()相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以实现一个KeySelector。
例如一条数据有a b c三个字段,max(a)会将当前数据的a字段更新为最大值,bc不变,然后返回;而maxBy(a),会将当前数据替换为a字段最大的那条数据,也就是bc也可能变化
reduce()在KeyedStream上生效,它接受两个输入,生成一个输出,即合并当前的元素和上次聚合的结果,生成一个同类型的新元素。(是否两个输入和一个输出都是相同数据类型?)
实现方式:实现ReduceFunction接口或使用lambda表达式。如:
keyedStream.reduce((curSensor,newSensor)->new SensorReading(curSensor.getId(),newSensor.getTimestamp(), Math.max(curSensor.getTemperature(), newSensor.getTemperature())));
其中curSensor表示上一条数据reduce的结果,newSensor当前数据
连接两个数据流,两个输入流的数据类型可以不一致,返回一个ConnectedStreams。操作ConnectedStreams时,一般是map操作,我们需要重写CoMapFunction或CoFlatMapFunction,对两个输入流用两个分开的方法进行操作。stream1.connect(stream2)
// 2. 将高温流转换成二元组类型
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
});
// 2. 合流 connect
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "high temp warning");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "normal");
}
});
DataStream -> DataStream:可以合并多个同类型的数据流,返回相同类型的新的DataStream。stream1.union(stream2,stream3)
public class TransformTest6_Partition {
public static void main(String[] args) throws Exception{
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度 = 4
env.setParallelism(4);
// 从文件读取数据
DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// SingleOutputStreamOperator多并行度默认就rebalance,轮询方式分配
dataStream.print("input");
// 1. shuffle (并非批处理中的获取一批后才打乱,这里每次获取到直接打乱且分区)
DataStream<String> shuffleStream = inputStream.shuffle();
shuffleStream.print("shuffle");
// 2. keyBy (Hash,然后取模)
dataStream.keyBy(SensorReading::getId).print("keyBy");
// 3. global (直接发送给第一个分区,少数特殊情况才用)
dataStream.global().print("global");
env.execute();
}
}
注:新版Flink已经不存在Split和Select这两个API了(至少Flink1.12.1没有!)
// 1. 分流,按照温度值30度为界分为两条流
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading value) {
return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
}
});
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
Java和Scala的所有基础数据类型,诸如int(包括Integer)、Double、Long、String,以及Date、BigDecimal和BigInteger
基础类型或其他对象类型组成的数组,如String[]。
类必须是public。
必须有一个public的无参构造函数
所有非静态(non-static)、非瞬态(non-transient)字段必须是public的
非public字段必须有标准的getter和setter方法
所有子字段必须是Flink支持的数据类型
Java的ArrayList、HashMap和Enum(Scala的Either和Option)
在Flink中,以上如此多的类型统一使用TypeInformation类表示。比如,POJO在Flink内部使用PojoTypeInfo来表示
TypeInformation的一个重要的功能就是创建TypeSerializer序列化器,为该类型的数据做序列化。
一般情况下,Flink会自动探测传入的数据类型,生成对应的TypeInformation,调用对应的序列化器。
当Flink的自动类型推断不起作用时,程序员就需要关注TypeInformation了。
如果传递给Flink算子的数据类型是父类,实际执行过程中使用的是子类,子类中有一些父类没有的数据结构和特性,将子类注册可以提高性能。在执行环境上调用env.registerType(clazz)来注册类。通常来说,POJO和元组等Flink内置类型的性能更好一些。
Avro、Kryo、Thrift和Protobuf
User Defined Function即用户自定义函数。
Flink暴露了所有UDF函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
也就是说,用户可以自定义函数内容,以实现流操作。
1 对于map()、flatMap()、reduce()等函数,我们可以实现MapFunction、FlatMapFunction、ReduceFunction等接口。
2 当不需要处理非常复杂的业务逻辑时,可以使用Lambda表达式
3 Rich函数类
可以自定义参数传进去。示例:
DataStream<String> tweets = env.readTextFile("INPUT_FILE ");
DataStream<String> flinkTweets = tweets.filter(new KeyWordFilter("flink"));
public static class KeyWordFilter implements FilterFunction<String> {
private String keyWord;
KeyWordFilter(String keyWord) {
this.keyWord = keyWord;
}
@Override public boolean filter(String value) throws Exception {
return value.contains(this.keyWord);
}
}
DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。
它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有一个生命周期的概念。典型的生命周期方法有:
// 实现自定义富函数类(RichMapFunction是一个抽象类)
public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(SensorReading value) throws Exception {
// RichFunction可以获取State状态
// getRuntimeContext().getState();
return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void open(Configuration parameters) throws Exception {
// 初始化工作,一般是定义状态,或者建立数据库连接
System.out.println("open");
}
@Override
public void close() throws Exception {
// 一般是关闭连接和清空状态的收尾操作
System.out.println("close");
}
}
}
累加器是大数据框架帮我们实现的一种机制,允许我们在多节点上进行累加统计,
常见的有IntCounter、LongCounter、DoubleCounter等,
例如在RichMapFunction中使用:
flink的所有对外的输出操作都要利用 Sink 完成:stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的 sink,用户也可以自定义实现 sink。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lmHNEULJ-1649949748153)(C:\Users\Simon\AppData\Roaming\Typora\typora-user-images\1648356010639.png)]
实现RichSinkFunction
接口
dataStream.addSink(new MyJdbcSink());
// 实现自定义的SinkFunction
public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
// 声明连接和预编译语句
Connection connection = null;
PreparedStatement insertStmt = null;
PreparedStatement updateStmt = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&useSSL=false", "root", "example");
insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
}
// 每来一条数据,调用连接,执行sql
@Override
public void invoke(SensorReading value, Context context) throws Exception {
// 直接执行更新语句,如果没有更新那么就插入
updateStmt.setDouble(1, value.getTemperature());
updateStmt.setString(2, value.getId());
updateStmt.execute();
if (updateStmt.getUpdateCount() == 0) {
insertStmt.setString(1, value.getId());
insertStmt.setDouble(2, value.getTemperature());
insertStmt.execute();
}
}
@Override
public void close() throws Exception {
insertStmt.close();
updateStmt.close();
connection.close();
}
}
dataStream.addSink( new FlinkKafkaProducer<String>("localhost:9092", "sinktest", new SimpleStringSchema()));
// 定义jedis连接配置(我这里连接的是docker的redis)
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.setPassword("123456")
.setDatabase(0)
.build();
dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));
// 自定义RedisMapper
public static class MyRedisMapper implements RedisMapper<SensorReading> {
// 定义保存数据到redis的命令,存成Hash表,hset sensor_temp id temperature
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
}
@Override
public String getKeyFromData(SensorReading data) {
return data.getId();
}
@Override
public String getValueFromData(SensorReading data) {
return data.getTemperature().toString();
}
}
dataStream.addSink( new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
// 实现自定义的ES写入操作
public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
@Override
public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
// 定义写入的数据source
HashMap<String, String> dataSource = new HashMap<>();
dataSource.put("id", element.getId());
dataSource.put("temp", element.getTemperature().toString());
dataSource.put("ts", element.getTimestamp().toString());
// 创建请求,作为向es发起的写入命令(ES7统一type就是_doc,不再允许指定type)
IndexRequest indexRequest = Requests.indexRequest()
.index("sensor")
.source(dataSource);
// 用index发送请求
indexer.add(indexRequest);
}
}
flink默认使用ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Flink对于迟到数据有三层保障,先来后到的保障顺序是:
前言:流处理从事件产生,到流经source,再到operator,有可能由于网络、分布式等原因,导致Flink接收到的事件的先后顺序与事件的Event Time顺序不同,此时若使用了Event Time语义,则不能明确数据是否全部到位,但又不能无限期的等下去,于是有了Watermark。
Watermark = maxEventTime-延迟时间t
,[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4SPtWYOS-1649949748155)(C:\Users\Simon\AppData\Roaming\Typora\typora-user-images\1649778902375.png)]
Flink的算子一般分布在多个并行的算子子任务,每个并行算子子任务会维护针对该子任务的Event Time时钟,这个时钟记录了这个算子子任务的Watermark处理进度。每个分区维护一个水印,当上游有水印进入该分区时,若新水印大于旧水印,就以新换旧,然后对比更新了水印后的各个分区的水印,若最小的那个大于当前算子的水印,就更新当前算子水印,并把该水印发到下游算子——Flink某算子子任务根据上游流入的各Watermark来更新Partition Watermark列表。选取Partition Watermark列表中最小的时间戳作为该算子子任务的Event Time,并将Event Time发送给下游算子。
假如某个上游分区的Watermark一直不更新,Partition Watermark列表其他地方都在正常更新,唯独个别分区的Watermark停滞,这会导致算子的Event Time不更新,相应的时间窗口计算也不会被触发,大量的数据积压在算子内部得不到处理
在union()等多数据流处理时,一旦发现某个数据流不再生成新的Watermark,我们要在SourceFunction中的SourceContext里调用markAsTemporarilyIdle()来设置该数据流为空闲状态,避免空转。
在自定义SourceFunction或RichSourceFunction的SourceContext里重写void collectWithTimestamp(Telement,long timestamp)和void emitWatermark(Watermark mark)两个方法,前者给数据流中的每个元素赋值一个timestamp作为Event Time,后者生成Watermark。
ctx.collectWithTimestamp(obj, obj.eventTime);
if (obj.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(obj.watermarkTime));
}
使用流的assignTimestampsAndWatermarks()
方法,传入WatermarkStrategy.forGenerator(...).withTimestampAssigner(...)
。
forGenerator()方法用来生成Watermark,withTimestampAssigner()方法用来为数据流的每个元素设置时间戳。
forGenerator()设置Watermark生成策略:
env.getConfig.setAutoWatermarkInterval(5000L)
两者都需要实现WatermarkGenerator接口:
// Flink源码
// 生成Watermark的接口
@Public
public interface WatermarkGenerator<T> {
// 数据流中的每个元素流入后都会调用onEvent()方法
// Punctunated方式下,一般根据数据流中的元素是否有特殊标记来判断是否需要生成Watermark
// Periodic 方式下,一般用于记录各元素的Event Time时间戳
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
// 每隔固定周期调用onPeriodicEmit()方法
// 一般主要用于Periodic方式
// 固定周期用 ExecutionConfig#setAutoWatermarkInterval() 方法设置
void onPeriodicEmit(WatermarkOutput output);
}
示例:
dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forGenerator((context -> new MyPeriodicGenerator()))
.withTimestampAssigner((event, recordTimestamp) -> event.f1));
public static class MyPeriodicGenerator implements WatermarkGenerator<Tuple2<String,
Long>> {
private final long maxOutOfOrderness = 60 * 1000; // 1分钟
private long currentMaxTimestamp; // 已抽取的Timestamp最大值
@Override
public void onEvent(Tuple2<String, Long> event, long eventTimestamp,
WatermarkOutput output) {
// 更新currentMaxTimestamp为当前遇到的最大值
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Watermark比currentMaxTimestamp最大值慢1分钟
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
}
}
currentMaxTimestamp存元素的时间戳最大值。当需要发射Watermark时,以时间戳最大值减1分钟作为Watermark发送出去。
1 与上述自定义的示例一样,指定延迟时间t,以 最大事件时间-t 的值作为数据的watermark
// 第2个字段是时间戳
env.addSource(new MySource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.f1)
);
2 直接以时间戳最大值作为水印,不添加延迟:
env.addSource(new MySource())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.f1)
);
最常见的引用方式如下:
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
BoundedOutOfOrdernessTimestampExtractor是AssignerWithPeriodicWatermarks
的实现类,还有一个接口AssignerWithPunctuatedWatermarks
,这两个接口都可以自定义如何从事件数据中抽取时间戳。
AssignerWithPeriodicWatermarks
ExecutionConfig.setAutoWatermarkInterval()
方法进行设置AssignerWithPunctuatedWatermarks
一般认为Watermark的设置代码,在里Source步骤越近的地方越合适。
即使如此,依然可能会有些事件数据在 Watermark 之后到达,这时可以用Late Elements 处理。
window是一种切割无限数据为有限块进行处理的手段,将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
无限数据集是指一种不断增长的本质上无限的数据集。
Flink 默认的时间窗口根据 Processing Time 进行窗口的划分。
// Keyed Window
stream
.keyBy(<KeySelector>) <- 按照一个Key进行分组
.window(<WindowAssigner>) <- 将数据流中的元素分配到相应的窗口中
[.trigger(<Trigger>)] <- 指定触发器Trigger(可选)
[.evictor(<Evictor>)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
// Non-Keyed Window
stream
.windowAll(WindowAssigner) <- 不分组,将数据流中的所有元素分配到相应的窗口中
[.trigger(<Trigger>)] <- 指定触发器Trigger(可选)
[.evictor(<Evictor>)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
时间窗口(Time Window):按时间生成窗口
滚动时间窗口(Tumbling Windows):按固定窗口长度对数据做切割,可以看做是滑动窗口的一种特殊情况(即窗口大小和滑动间隔相等)
特点:时间对齐,窗口长度固定,没有重叠。
适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。
滑动时间窗口(Sliding Windows):每隔一个滑动间隔时间,对过去固定窗口长度的数据做切割
会话窗口(Session Windows):由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口,配置 session 间隔以定义非活跃周期的长度,当收不到数据的时间达到session间隔,则关闭当前session,后续数据会分配到新session窗口
计数窗口(Count Window):按数据数量生成窗口
window()
方法.timeWindow()
和.countWindow()
方法,用于定义时间窗口和计数窗口。.windowAll()
,其他窗口定义方法必须在keyBy之后才能使用基于时间的窗口都有一个开始时间和结束时间,表示一个左闭右开的时间段。
timeWindow()是window()的简写,使用timeWindow()时,如果我们在执行环境设置了TimeCharacteristic.EventTime,那么Flink会自动调用TumblingEventTimeWindows;如果我们设置TimeCharacteristic.ProcessingTime,那么Flink会自动调用TumblingProcessingTimeWindows。
滚动窗口:
DataStream<T> input = ...
//window
input
.keyBy(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5))) //基于事件时间
// .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) //基于事件时间并设置offset(参数2)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //基于处理时间
.<window function>(...)
//timeWindow
input.timeWindow( Time.seconds(15) )
滑动窗口:(参数分别是窗口大小、滑动步长、延迟时间)
DataStream<T> input = ...
//window
input
.keyBy(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) //基于事件时间
// .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))//基于处理时间
// .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) //带延迟
.<window function>(...)
//timeWindow
input.timeWindow( Time.seconds(15), Time.seconds(5) )
时间间隔可以通过Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
等其中的一个来指定。
滚动窗口:.countWindow( long)
滑动窗口:.countWindow( long, long ) ,参数一个是window_size,一个是sliding_size。
CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
该模式下,两个窗口之间有一个间隙,称为Session Gap,当一个窗口在大于Session Gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的Size是可变的,每个窗口的开始和结束时间并不是确定的。
我们可以设置定长的Session Gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session Gap的值。
DataStream<T> input = ...
// event-time session windows with static gap
input
.keyBy(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)
// event-time session windows with dynamic gap
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// processing-time session windows with static gap
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
// processing-time session windows with dynamic gap
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
function例子:http://www.javashuo.com/article/p-ftiqugmc-eu.html
window function 定义了要对窗口中收集的数据做的计算操作,在window开窗操作后调用
主要可以分为两类:
指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中。典型的增量聚合函数有ReduceFunction, AggregateFunction。
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T var1, T var2) throws Exception;
}
public interface AggregateFunction<IN, ACC, OUT>
extends Function, Serializable { // ACC数据即中间状态数据
// 以下函数一般在初始化时调用
ACC createAccumulator();
// 当一个新元素流入时,将新元素与ACC数据合并,返回ACC数据
ACC add(IN value, ACC accumulator);
// 将两个ACC数据合并,一般在窗口融合时调用,比如,会话窗口模式下,窗口长短是不断变化的,多个窗口有可能合并为一个窗口,多个窗口内的ACC会合并为一个
ACC merge(ACC a, ACC b);
// 将中间数据转换成结果数据,在窗口结束时调用
OUT getResult(ACC accumulator);
}
dataStream.keyBy("id")
// .timeWindow(Time.seconds(15)) // 已经不建议使用@Deprecated
.window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
.aggregate(new myAggregateFunction());
AggregateFunction myAggregateFunction=new AggregateFunction<UserActionLog, Tuple2<Long,Long>, Double>() {
// 一、初始值
// 定义累加器初始值
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L,0L);
}
// 二、累加
// 定义累加器如何基于输入数据进行累加
@Override
public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
accumulator.f0 += 1;
accumulator.f1 += value.getProductPrice();
return accumulator;
}
// 三、合并
// 定义累加器如何和State中的累加器进行合并
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
acc1.f0+=acc2.f0;
acc1.f1+=acc2.f1;
return acc1;
}
// 四、输出
// 定义如何输出数据
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return accumulator.f1 / (accumulator.f0 * 1.0);
}
}
指的是窗口先缓存所有元素,等触发条件后才对窗口内的全量元素执行计算。WindowFunction和ProcessWindowFunction
/**
* 函数接收4个泛型
* IN:输入类型
* OUT:输出类型
* KEY:keyBy()中按照Key分组,Key类型
* W:窗口类型
*/
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends
AbstractRichFunction {
/**
* 对一个窗口内的元素进行处理,窗口内的元素缓存在Iterable<IN>,进行处理后输出到
Collector<OUT>中
* 我们可以输出一到多个结果
*/
public abstract void process(KEY key, Context context, Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* 当窗口执行完毕被清理时,删除各类状态数据
*/
public void clear(Context context) throws Exception {}
/**
* 一个窗口的Context,包含窗口的一些元数据、状态数据等。
*/
public abstract class Context implements java.io.Serializable {
// 返回当前正在处理的窗口
public abstract W window();
// 返回当前Process Time
public abstract long currentProcessingTime();
// 返回当前Event Time对应的Watermark
public abstract long currentWatermark();
// 返回某个Key下的某个窗口的状态
public abstract KeyedStateStore windowState();
// 返回某个Key下的全局状态
public abstract KeyedStateStore globalState();
// 将迟到数据发送到其他位置
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
可以获取窗口的更多信息。
Context中有两种状态:
触发器(Trigger)本质上是一种Timer,我们需要注册合适的时间,当到达这个时间时,flink会启动窗口处理函数来处理窗口中的数据,以及将窗口内的数据清除。
stream.keyBy(s -> s.symbol)
.timeWindow(Time.seconds(60))
.trigger(new MyTrigger())
.aggregate(new AverageAggregate());
Trigger会返回一个TriggerResult类型的结果。
TriggerResult是一个枚举类型,有下面几种情况。
每个窗口都有一个默认的Trigger。比如基于Event Time的窗口会有一个EventTimeTrigger,每当窗口的Watermark到达窗口的结束时间,Trigger会发送FIRE。
当这些已有的Trigger无法满足我们的需求时,我们需要自定义Trigger(自定义Trigger,如果使用了状态,一定要使用clear()方法将状态数据清除)
/**
* T为元素类型
* W为窗口类型
*/
public abstract class Trigger<T, W extends Window> implements Serializable {
// 窗口增加一个元素时调用,返回一个TriggerResult,可以在这里进行逻辑判断,在合适的时候调用ctx的registerxxxTimer注册timer
public abstract TriggerResult onElement(T element, long timestamp, W window,
TriggerContext ctx) throws Exception;
// 当一个基于Processing Time的Timer触发了FIRE时被调用
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 当一个基于Event Time的Timer触发了FIRE时被调用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
throws Exception;
// 当窗口数据被清除时,调用clear()方法来清除所有的Trigger状态数据
public abstract void clear(W window, TriggerContext ctx) throws Exception
/**
* TriggerContext,保存了时间、状态、监控以及定时器等信息
*/
public interface TriggerContext {
// 返回当前Processing Time
long getCurrentProcessingTime();
// 返回MetricGroup对象
MetricGroup getMetricGroup();
// 返回当前Watermark时间
long getCurrentWatermark();
// 将某个time值注册为一个Timer,当操作系统时间到达time值这个时间点时,
onProcessingTime方法会被调用
void registerProcessingTimeTimer(long time);
// 将某个time值注册为一个Timer,当Watermark到达time值这个时间点时,onEventTime
方法会被调用
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
// 获取状态
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
...
}
清除器(Evictor)是在WindowAssigner和Trigger的基础上的一个可选选项,用来清除一些数据。
可以在窗口处理函数执行前或执行后调用Evictor。
主要针对全量计算,对于增量计算的ReduceFunction和AggregateFunction没必要使用Evictor
如下,窗口的所有元素被放在了Iterable<TimestampedValue>中,我们要实现自己的清除逻辑。
/**
* T为元素类型
* W为窗口类型
*/
public interface Evictor<T, W extends Window> extends Serializable {
// 在窗口处理函数前调用
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window,
EvictorContext evictorContext);
// 在窗口处理函数后调用
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window,
EvictorContext evictorContext);
// EvictorContext
interface EvictorContext {
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
long getCurrentWatermark();
}
}
两种Join:窗口连接(Window Join)和时间间隔连接(IntervalJoin)。
主要在Flink的窗口上进行操作,它将两个数据流中落在相同窗口的元素按照某个Key进行Inner Join,当窗口的时间结束,Flink会调用JoinFunction来对窗口内的数据对进行处理。
输入数据流input1中的某个元素与输入数据流input2中的所有元素逐个配对,当数据流中某个窗口内没数据时,Inner Join的结果为空。
input1.join(input2)
.where(<KeySelector>) <- input1 使用哪个字段作为Key
.equalTo(<KeySelector>) <- input2 使用哪个字段作为Key
.window(<WindowAssigner>) <- 指定WindowAssigner
[.trigger(<Trigger>)] <- 指定Trigger(可选)
[.evictor(<Evictor>)] <- 指定Evictor(可选)
.apply(<JoinFunction>) <- 指定JoinFunction
JoinFunction需事先join方法,该方法对两两配对的数据进行操作,并返回join的处理结果。
除了JoinFunction,Flink还提供了FlatJoinFunction,其功能是输出零到多个结果。
public static class MyJoinFunction
implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
@Override
public String join(Tuple2<String, Integer> input1, Tuple2<String, Integer> input2)
{
return "input 1 :" + input1.f1 + ", input 2 :" + input2.f1;
}
}
DataStream<Tuple2<String, Integer>> input1 = ...
DataStream<Tuple2<String, Integer>> input2 = ...
DataStream<String> joinResult = input1.join(input2)
.where(i1 -> i1.f0)
.equalTo(i2 -> i2.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.apply(new MyJoinFunction());
与join使用方式一致,在.apply()中作为传参,不同的是coGroup对应的是CoGroupFunction。
可以获得两个数据流中的所有元素,元素以Iterable的形式供开发者自行处理。
与window join类似,不过join换成coGroup,apply的参数改成CoGroupFunction
input1.coGroup(input2)
.where(<KeySelector>) <- input1 使用哪个字段作为Key
.equalTo(<KeySelector>) <- input2 使用哪个字段作为Key
.window(<WindowAssigner>) <- 指定WindowAssigner
[.trigger(<Trigger>)] <- 指定Trigger(可选)
[.evictor(<Evictor>)] <- 指定Evictor(可选)
.apply(<CoGroupFunction>) <- CoGroupFunction
如果第一个数据流中的某些Key是空的,那么CoGroupFunction被触发时,该Key上的Iterable为空,开发者自行决定如何处理空数据。
public static class MyCoGroupFunction
implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
String> {
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> input1,
Iterable<Tuple2<String, Integer>> input2, Collector<String> out) {
input1.forEach(element -> System.out.println("input1 :" + element.f1));
input2.forEach(element -> System.out.println("input2 :" + element.f1));
}
}
input1.coGroup(input2).where(<KeySelector>).equalTo(<KeySelecotr>)。
DataStream<Tuple2<String, Integer>> input1 = ...
DataStream<Tuple2<String, Integer>> input2 = ...
DataStream<String> coGroupResult = input1.coGroup(input2)
.where(i1 -> i1.f0)
.equalTo(i2 -> i2.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new MyCoGroupFunction());
主要概念:
其他:
如果我们对input1和input2进行Interval Join,input1中的某个元素为input1.element1,时间戳为input1.element1.ts,那么Interval就是[input1.element1.ts+lowerBound,input1.element1.ts+upperBound],input2中落在这个时间段内的元素将会和input1.element1组成一个数据对。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BtqPqfvr-1649949748158)(C:\Users\Simon\AppData\Roaming\Typora\typora-user-images\1649942778369.png)]
对两个KeyedStream进行intervalJoin操作,用between指定上下界,用process指定ProcessJoinFunction
input1.keyBy(i -> i.f0)
.intervalJoin(input2.keyBy(i -> i.f0))
.between(Time.milliseconds(-5), Time.milliseconds(10))
.process(new MyProcessFunction());
public static class MyProcessFunction extends ProcessJoinFunction<Tuple3<String,
Long, Integer>, Tuple3<String, Long, Integer>, String> {
@Override
public void processElement(Tuple3<String, Long, Integer> input1,
Tuple3<String, Long, Integer> input2,
Context context,
Collector<String> out) {
out.collect("input 1: " + input1.toString() + ", input 2: " +
input2.toString());
}
}
// 使用Event Time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 数据流有3个字段:(Key, 时间戳, 数值)
DataStream<Tuple3<String, Long, Integer>> input1 = ...
DataStream<Tuple3<String, Long, Integer>> input2 = ...
DataStream<String> intervalJoinResult = input1.keyBy(i -> i.f0)
.intervalJoin(input2.keyBy(i -> i.f0))
.between(Time.milliseconds(-5), Time.milliseconds(10))
.process(new MyProcessFunction());
对于Event Time,我们使用Watermark来判断数据是否迟到。一个迟到数据是指数据到达窗口算子时,该数据本该被分配到某个窗口,但由于延迟,窗口已经触发计算。
allowedLateness(
.allowedLateness() —— 允许处理迟到的数据
.sideOutputLateData(OutputTag)
使用ProcessFunction系列函数的侧输出功能存储迟到数据:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
allowedLateness()设置窗口结束后的等待时间,若某个迟到元素的Event Time大于窗口结束时间但是小于“窗口结束时间+lateness”,迟到数据就会被加入ProcessWindowFunction的缓存中,窗口的Trigger会触发一次FIRE,重新计算结果。
使用这个功能时需要注意,原来窗口中的状态数据在窗口已经触发的情况下仍然会被保留
会话窗口依赖Session Gap来切分窗口,使用allowedLateness()可能会导致两个窗口合并成一个窗口。
/**
* ProcessWindowFunction 接收的泛型参数分别为:输入、输出、Key、窗口
*/
public static class AllowedLatenessFunction extends
ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple4<String, String, Integer,
String>, String, TimeWindow> {
@Override
public void process(String key,
Context context,
Iterable<Tuple3<String, Long, Integer>> elements,
Collector<Tuple4<String, String, Integer, String>> out) throws
Exception {
ValueState<Boolean> isUpdated = context.windowState().getState(
new ValueStateDescriptor<Boolean>("isUpdated", Types.BOOLEAN));
int count = 0;
for (Object i : elements) {
count++;
}
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (null == isUpdated.value() || isUpdated.value() == false) {
// 第一次使用process()函数时,isUpdated默认初始化为false,因此窗口处理函数第一次
//被调用时会进入这里
out.collect(Tuple4.of(key,
format.format(Calendar.getInstance().getTime()), count, "first"));
isUpdated.update(true);
} else {
// 之后isUpdated被设置为true,窗口处理函数因迟到数据被调用时会进入这里
out.collect(Tuple4.of(key,
format.format(Calendar.getInstance().getTime()), count, "updated"));
}
}
}
// 使用Event Time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 数据流有3个字段:key、时间戳、数值
DataStream<Tuple3<String, Long, Integer>> input = ...
DataStream<Tuple4<String, String, Integer, String>> allowedLatenessStream = input
.keyBy(item -> item.f0)
.timeWindow(Time.seconds(5))
.allowedLateness(Time.seconds(5))
.process(new AllowedLatenessFunction());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。