赞
踩
Flink处理函数(ProcessFunction、KeyedProcessFunction、ProcessWindowFunction、 ProcessAllWindowFunction)
处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。
之前所介绍的流处理 API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都
是基于 DataStream 进行转换的;所以可以统称为 DataStream API,这也是 Flink 编程的核心。
而我们知道,为了让代码有更强大的表现力和易用性,Flink 本身提供了多层 API,DataStream
API 只是中间的一环。
提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。
基于 DataStream 调用.process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑。
stream.process(new MyProcessFunction())
ProcessFunction 是一个抽象类,继承了AbstractRichFunction;MyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。
假设你有一个DataStream,其中包含单词的字符串。你想要使用ProcessFunction统计每个单词的出现次数。
- DataStream<String> inputStream = …;
-
- DataStream<Tuple2<String, Long>> outputStream = inputStream
- .keyBy(word -> word)
- .process(new WordCountProcessFunction());
-
- class WordCountProcessFunction extends ProcessFunction<String, Tuple2<String, Long>> {
- private MapState<String, Long> countState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- countState = getRuntimeContext().getMapState(new MapStateDescriptor<>("counts", String.class, Long.class));
- }
-
- @Override
- public void processElement(String word, Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
- Long count = countState.get(word);
- if (count == null) {
- count = 0L;
- }
- countState.put(word, count + 1);
- collector.collect(new Tuple2<>(word, count + 1));
- }
- }
抽象类 ProcessFunction 继承了AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型。
内部单独定义了两个方法:一个是必须要实现的抽象法.processElement();另一个是非抽象方法.onTimer()。
public abstract class ProcessFunction<I, O> extends AbstractRichFunction { ... public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {} ... }
非抽象方法.onTimer()
用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService 来注册的。
定时器(timers)是处理函数中进行时间相关操作的主要机制。在.onTimer()方法中可以实;现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”(TimerService)来实现的。
定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction 的上下文(Context)中提供了.timerService()方法,可以直接返回一个 TimerService 对象:public abstract TimerService timerService();
TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
// 获取当前的处理时间 long currentProcessingTime(); // 获取当前的水位线(事件时间) long currentWatermark(); // 注册处理时间定时器,当处理时间超过 time 时触发 void registerProcessingTimeTimer(long time); // 注册事件时间定时器,当水位线超过 time 时触发 void registerEventTimeTimer(long time); // 删除触发时间为 time 的处理时间定时器 void deleteProcessingTimeTimer(long time); // 删除触发时间为 time 的处理时间定时器 void deleteEventTimeTimer(long time);
继承自 AbstractRichFunction 的一个抽象类
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { ... public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; public void onTimer(long timestamp, OnTimerContext ctx,Collector<O> out) throws Exception {} public abstract class Context {...} ... }
K,这是当前按键分区的 key 的类型
假设你有一个流数据流,其中包含每个用户的点击数据,并且你想要对每个用户的点击数进行计数。
public class ClickEvent { private String userId; private long timestamp; // constructor, getters and setters }
public class ClickCountProcessFunction extends KeyedProcessFunction<String, ClickEvent, Tuple2<String, Long>> { private ValueState<Long> clickCountState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); clickCountState = getRuntimeContext().getState(new ValueStateDescriptor<>("click-count", Long.class)); } @Override public void processElement(ClickEvent value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception { long count = clickCountState.value() + 1; clickCountState.update(count); out.collect(Tuple2.of(value.getUserId(), count)); } }
DataStream<ClickEvent> clickEventStream = ...; DataStream<Tuple2<String, Long>> clickCountStream = clickEventStream .keyBy(ClickEvent::getUserId) .process(new ClickCountProcessFunction());
它继承了 AbstractRichFunction 的抽象类,它有四个类型参数:
而内部定义的方法,跟我们之前熟悉的处理函数就有所区别了。因为全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是.processElement(),而是改成了.process()。方法包含四个参数。
假设你有一个流数据流,其中包含每个用户的点击数据,并且你想要对每个用户在每小时内的点击数进行计数。
public class ClickEvent { private String userId; private long timestamp; // constructor, getters and setters }
public class ClickCountWindowFunction extends ProcessWindowFunction<ClickEvent, Tuple2<String, Long>, String, TimeWindow> { @Override public void process(String userId, Context context, Iterable<ClickEvent> events, Collector<Tuple2<String, Long>> out) { long count = 0L; for (ClickEvent event : events) { count++; } out.collect(Tuple2.of(userId, count)); } }
DataStream<ClickEvent> clickEventStream = ...; DataStream<Tuple2<String, Long>> clickCountStream = clickEventStream .keyBy(ClickEvent::getUserId) //定义一小时窗口 .timeWindow(Time.hours(1)) .process(new ClickCountWindowFunction());
public class ClickEvent { private String userId; private long timestamp; // constructor, getters and setters }
public class ClickCountProcessAllWindowFunction extends ProcessAllWindowFunction<ClickEvent, Tuple2<String, Long>, TimeWindow> { @Override public void process(Context context, Iterable<ClickEvent> events, Collector<Tuple2<String, Long>> out) { long count = 0L; String userId = null; for (ClickEvent event : events) { count++; userId = event.getUserId(); } out.collect(Tuple2.of(userId, count)); } }
DataStream<ClickEvent> clickEventStream = ...; DataStream<Tuple2<String, Long>> clickCountStream = clickEventStream .keyBy(ClickEvent::getUserId) .timeWindow(Time.hours(1)) .process(new ClickCountProcessAllWindowFunction());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。