赞
踩
目录
针对无限和有限数据流进行有状态计算的分布式执行引擎框架。集群部署,随意扩容;内存计算,速度快。
流 | |
状态 | 在一定时间内存储所接收的事件或中间结果 |
时间 | 事件时间,根据事件本身自带的时间戳进行结果的计算,保证结果的准确性和一致性。 |
处理时间,根据处理引擎的机器时钟触发计算,低延迟需求,并且能够容忍近似结果。 |
事件驱动型应用 | 从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。 |
数据分析应用 | 从原始数据中提取有价值的信息和指标。 |
数据管道应用 | 数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。
|
Flink程序的基础块是stream流和transformation转化。(与spark类似)
当执行时,flink程序映射为streaming dataflow,其由streams和transformations操作组成。
每个dataflow开始于一个或多个sources,终止于一个或多个sinks。
FLink程序内在的就是并行且分布式的。
执行时,一个stream有多个stream partitions分区,每个operator有多个operator subtasks子任务,每个子任务是彼此独立的且在不同的线程中执行,可能不同机器或容器。
使用DataStream#keyBy对流进行分区,保证同一个 task 处理同一个 key 的所有数据。
流的聚合操作与批的聚合操作是不同的。
由于流是持续不断的,所以在流中汇总所有记录是不可能的。
流的聚合操作是通过windows实现的,例如最近x分的计数,最近x条记录的汇总等。
用 Flink 计算窗口分析取决于两个主要的抽象:Window Assigners,Window Functions。
Window Assigners,窗口分配器,将事件分配给窗口(根据需要创建新的窗口对象)。
滚动时间窗口 | |
每分钟页面浏览量 | |
滑动时间窗口 | |
每10秒钟计算前1分钟的页面浏览量 | |
滚动数量窗口 | |
滑动数量窗口 | |
会话窗口 | |
全局窗口 |
可以使用的间隔时间 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n) .
基于时间的窗口分配器(包括会话时间)既可以处理 事件时间,也可以处理 处理时间。建议使用事件时间。
使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。
Window Functions,用于处理窗口内的数据。
像批量处理 | ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算; |
像流处理 | 每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算; |
结合两者 | 通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。 |
- stream.
- .keyBy(<key selector>)
- .window(<window assigner>)
- .reduce|aggregate|process(<window function>)
不是必须使用键控事件流(keyed stream),但是如果不使用键控事件流,我们的程序就不能 并行 处理。
Event time | 事件创建的时间,通常表现为事件中由生产者附加的时间戳。 |
Ingestion time: | 事件通过source操作进入flink数据流的时间。 |
Processing time | 对基于时间进行操作的操作器,它的操作时间。 |
Flink 中最基础的状态类型是 ValueState,能够为被其封装的变量添加容错能力的类型。
ValueState 是一种 keyed state,也就是说它只能被用于 keyed context 提供的 operator 中,即所有能够紧随 DataStream#keyBy 之后被调用的operator。 一个 operator 中的 keyed state 的作用域默认是属于它所属的 key 的。
ValueState 需要使用 ValueStateDescriptor 来创建,ValueStateDescriptor 包含了 Flink 如何管理变量的一些元数据信息。
状态在使用之前需要使用 open() 函数来注册状态。
三个用于交互的方法:update 用于更新状态,value 用于获取状态值,clear 用于清空状态。 如果一个 key 还没有状态,例如当程序刚启动或者调用过 ValueState#clear 方法时,ValueState#value 将会返回 null。 如果需要更新状态,需要调用 ValueState#update 方法,直接更改 ValueState#value 的返回值可能不会被系统识别。 容错处理将在 Flink 后台自动管理,你可以像与常规变量那样与状态变量进行交互。
计时器在将来的某个时间点执行回调函数。
算子内需要创建一个记录定时器时间的状态,在open函数中创建实例。
- private transient ValueState<Boolean> flagState;
- private transient ValueState<Long> timerState;
-
- @Override
- public void open(Configuration parameters) {
- ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
- "flag",
- Types.BOOLEAN);
- flagState = getRuntimeContext().getState(flagDescriptor);
-
- ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
- "timer-state",
- Types.LONG);
- timerState = getRuntimeContext().getState(timerDescriptor);
- }
Context提供了定时器服务,定时器服务可以用于查询当前时间、注册定时器和删除定时器。
- // set the timer and timer state
- long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
- context.timerService().registerProcessingTimeTimer(timer);
- timerState.update(timer);
处理时间是本地时钟时间,这是由运行任务的服务器的系统时间来决定的。
需要删除已注册的定时器。
- // delete timer
- Long timer = timerState.value();
- ctx.timerService().deleteProcessingTimeTimer(timer);
当定时器触发时,回调onTimer方法
一个 Flink 集群总是包含一个 JobManager 以及一个或多个 Flink TaskManager。
jobManager负责处理 Job 提交、 Job 监控以及资源管理。
在job执行期间,jobManager会持续的跟踪各个task,决定如何调度下一个或下一组task,处理已完成的task或失败情况。
jobGraph包括由多个算子顶点(jobVertex)组成的数据流图,以及中间结果数据(IntermediateDataSet)。
jobVertex 算子,每个算子有配置属性,如并行度和运行的代码。
jobGraph还包含算子运行所必须的依赖库。
ExecutionGraph 执行图,理解为并行的job图。
每个算子顶点对应一个执行算子顶点ExecutionJobVertex。
对于每个算子顶点jobVertex,它的每个并行子task对应一个执行顶点ExecutionVertex,由执行顶点跟踪子task的执行状态。并行度100的算子顶点,会有100个子task,对应100个执行顶点。
一个执行算子顶点持有相应的算子顶点对应的所有执行顶点,并跟踪整个算子的执行状态。
中间数据结果 IntermediateResult,负责跟踪中间结果的状态。
中间结果的分片 IntermediateResultPartition 负责跟踪每个分片的状态。
jobManager接收jobGraph,将job图转变为执行图ExecutionGraph。
jobGraph | ExecutionGraph |
jobVertex | ExecutionJobVertex |
task | ExecutionVertex |
IntermediateDataSet | IntermediateResult 多个IntermediateResultPartition |
Flink TaskManager 运行 worker 进程, 负责实际任务 Tasks 的执行,而这些任务共同组成了一个 Flink Job。
使用task slot定义执行资源,一个taskManager包含多个slot。
每个slot可以运行一条流水线,且这条流水线由多个并行的连续的task组成。task指source、操作(如map、key等)、输出。
对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。这正是watermarks 的作用:定义何时停止等待较早的事件。
Flink 中事件时间的处理取决于 watermark 生成器,后者将带有时间戳的特殊元素插入流中形成 watermarks。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。
对于大多数应用而言,固定延迟策略已经足够了。
如果想要使用基于带有事件时间戳的事件流,Flink 需要知道与每个事件相关的时间戳,而且流必须包含 watermark。
- DataStream<Event> stream = ...
-
- WatermarkStrategy<Event> strategy = WatermarkStrategy
- .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
- .withTimestampAssigner((event, timestamp) -> event.timestamp);
-
- DataStream<Event> withTimestampsAndWatermarks =
- stream.assignTimestampsAndWatermarks(strategy);
- // 司机维度的行程数量
- public class RideCount {
- public static void main(String[] args) throws Exception {
-
- // set up streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // start the data generator
- // 事件数据源
- DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
-
- // map each ride to a tuple of (driverId, 1)
- // 将单个事件 map为 其他数据模型
- DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
-
- @Override
- public Tuple2<Long, Long> map(TaxiRide ride) {
- // 这里用1L的原因:该例子用于统计司机的行程数量,每出现一个行程事件,则加1.
- // 若统计其他,如总里程、总金额等,1L对应改为单次里程数或单次金额等。
- return Tuple2.of(ride.driverId, 1L);
- }
- });
-
- // partition the stream by the driverId
- // 分片,将Tuple2<Long, Long> 的第一个field作为key
- KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);
-
- // count the rides for each driver
- // 使用第几个field进行聚合,positionToSum based-0
- DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);
-
- // we could, in fact, print out any or all of these streams
- rideCounts.print();
-
- // run the cleansing pipeline
- env.execute("Ride Count");
- }
- }
- // 疲劳驾驶预警
- public class LongRidesSolution {
-
- /**
- * Main method.
- * @throws Exception which occurs during job execution.
- */
- public static void main(String[] args) throws Exception {
-
- // set up streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
-
- // start the data generator
- DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
-
- // 依赖与分区相关的state,实现数据处理。如
- DataStream<TaxiRide> longRides = rides.keyBy((TaxiRide ride) -> ride.rideId).process(new MatchFunction());
-
- longRides.print();
-
- env.execute("Long Taxi Rides");
- }
-
- private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> {
-
- private ValueState<TaxiRide> rideState;
-
- @Override
- public void open(Configuration config) {
- // 只有分区才可绑定相关的valueState
- // 每个分区可以有多个不同名称的valueStatue
- ValueStateDescriptor<TaxiRide> stateDescriptor = new ValueStateDescriptor<>("ride event", TaxiRide.class);
- rideState = getRuntimeContext().getState(stateDescriptor);
- }
-
- @Override
- public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception {
- // ride 事件数据
- // out 结果集
-
- // 获取valueState的数据
- TaxiRide previousRideEvent = rideState.value();
-
- // valueState无数据时,进行更新
- if (previousRideEvent == null) {
- // 更新valueState
- rideState.update(ride);
- if (ride.isStart) {
- // 注册计时器
- context.timerService().registerEventTimeTimer(getTimerTime(ride));
- }
- }
- // valueStatue有数据时,进行处理
- else {
- if (!ride.isStart) {
- // it's an END event, so event saved was the START event and has a timer
- // the timer hasn't fired yet, and we can safely kill the timer
- // 删除计时器
- context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
- }
- // both events have now been seen, we can clear the state
- // 清空valueState
- rideState.clear();
- }
- }
-
- @Override
- public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception {
- // 计时器触发
- // out 结果集
- // if we get here, we know that the ride started two hours ago, and the END hasn't been processed
- out.collect(rideState.value());
- rideState.clear();
- }
-
- private long getTimerTime(TaxiRide ride) {
- return ride.startTime.plusSeconds(120 * 60).toEpochMilli();
- }
- }
-
- }
- // 每小时获取最多小费的司机
- public class HourlyTipsSolution {
-
- /**
- * Main method.
- * @throws Exception which occurs during job execution.
- */
- public static void main(String[] args) throws Exception {
-
- // set up streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
-
- // start the data generator
- DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator());
-
- // compute tips per hour for each driver
- // 分片-窗口-处理窗口内数据
- // 分片间并行处理
- DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares.keyBy((TaxiFare fare) -> fare.driverId)
- // 滚动窗口,如每小时,整点,如1:00:00 - 2:00:00
- .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
-
- // 对窗口范围内包含的所有(分片)生产的窗口进行聚合,无法并行
- DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
- .windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
-
- // You should explore how this alternative behaves. In what ways is the same as,
- // and different from, the solution above (using a windowAll)?
-
- // DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
- // .keyBy(t -> t.f0)
- // .maxBy(2);
-
- hourlyMax.print();
-
- // execute the transformation pipeline
- env.execute("Hourly Tips (java)");
- }
-
- /*
- * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
- */
- public static class AddTips extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
-
- @Override
- public void process(Long key, Context context, Iterable<TaxiFare> fares,
- Collector<Tuple3<Long, Long, Float>> out) {
- float sumOfTips = 0F;
- for (TaxiFare f : fares) {
- sumOfTips += f.tip;
- }
-
- /*System.out.println(
- new Date(context.window().getStart()) + Tuple3.of(context.window().getEnd(), key, sumOfTips)
- .toString());*/
- out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
- }
- }
- }
- // 行程与车费
- public class RidesAndFaresExample {
-
- /**
- * Main method.
- * @throws Exception which occurs during job execution.
- */
- public static void main(String[] args) throws Exception {
-
- // Set up streaming execution environment, including Web UI and REST endpoint.
- // Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
- // using the State Processor API.
-
- Configuration conf = new Configuration();
- conf.setString("state.backend", "filesystem");
- conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
- conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- env.setParallelism(2);
-
- env.enableCheckpointing(10000L);
- CheckpointConfig config = env.getCheckpointConfig();
- // 默认为精确一次
- // config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
-
- DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator()).filter((TaxiRide ride) -> ride.isStart)
- .keyBy((TaxiRide ride) -> ride.rideId);
-
- DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator()).keyBy((TaxiFare fare) -> fare.rideId);
-
- // Set a UID on the stateful flatmap operator so we can read its state using the State Processor API.
- DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides.connect(fares).flatMap(new EnrichmentFunction())
- .uid("enrichment");
-
- enrichedRides.print();
-
- env.execute("Join Rides with Fares (java RichCoFlatMap)");
- }
-
- public static class EnrichmentFunction
- extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
-
- // keyed, managed state
- private ValueState<TaxiRide> rideState;
-
- private ValueState<TaxiFare> fareState;
-
- @Override
- public void open(Configuration config) {
- rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
- fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
- }
-
- @Override
- public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
- //System.out.println("ride " + ride);
- TaxiFare fare = fareState.value();
- if (fare != null) {
- fareState.clear();
- out.collect(Tuple2.of(ride, fare));
- }
- else {
- rideState.update(ride);
- }
- }
-
- @Override
- public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
- //System.out.println("fare " + fare);
- TaxiRide ride = rideState.value();
- if (ride != null) {
- rideState.clear();
- out.collect(Tuple2.of(ride, fare));
- }
- else {
- fareState.update(fare);
- }
- }
- }
- }
可前往Flink官方文档,获取更多。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。