当前位置:   article > 正文

Apache Flink简介

apache flink

目录

1. Flink简介

1.1. what

1.2. 流处理应用的基本组件

1.3. 应用场景

2. Flink优势

3. 数据流编程模型

3.1. 多个抽象层

 3.2. 数据流编程

3.3. 并行数据流

3.4. 窗口

3.5. 时间

3.6. 状态操作

3.7. 故障恢复检查点

3.8. 支持批处理 

3.9. 计时器

4. JobManager和TaskManager

5. watermark水印

6. Flink示例


1. Flink简介

1.1. what

针对无限和有限数据流进行有状态计算的分布式执行引擎框架。集群部署,随意扩容;内存计算,速度快。

1.2. 流处理应用的基本组件

状态在一定时间内存储所接收的事件或中间结果
时间

事件时间,根据事件本身自带的时间戳进行结果的计算,保证结果的准确性和一致性。

处理时间,根据处理引擎的机器时钟触发计算,低延迟需求,并且能够容忍近似结果。

1.3. 应用场景

事件驱动型应用从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
数据分析应用从原始数据中提取有价值的信息和指标。
数据管道应用

数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。

提取-转换-加载(ETL)

一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。

2. Flink优势

  • 处理高吞吐量的事件流
  • 处理随时产生的事件,始终保持低延迟(sub-second)
  • 高效、易于使用的k/v结构的state
  • 真正的流处理框架。一次处理一个事件,每个事件都有自己的时间窗口。
  • 丰富的编程模型可以很容易地实现复杂的语义。对比微批处理,在事件流上进行推理更容易。
  • 使用事件时间,可以很容易地处理乱序事件等流缺陷

3. 数据流编程模型

3.1. 多个抽象层

 3.2. 数据流编程

Flink程序的基础块是stream流和transformation转化。(与spark类似)

当执行时,flink程序映射为streaming dataflow,其由streams和transformations操作组成。
每个dataflow开始于一个或多个sources,终止于一个或多个sinks。

3.3. 并行数据流

FLink程序内在的就是并行且分布式的。

执行时,一个stream有多个stream partitions分区,每个operator有多个operator subtasks子任务,每个子任务是彼此独立的且在不同的线程中执行,可能不同机器或容器。

使用DataStream#keyBy对流进行分区,保证同一个 task 处理同一个 key 的所有数据。 

3.4. 窗口

流的聚合操作与批的聚合操作是不同的。

由于流是持续不断的,所以在流中汇总所有记录是不可能的。

流的聚合操作是通过windows实现的,例如最近x分的计数,最近x条记录的汇总等。

用 Flink 计算窗口分析取决于两个主要的抽象:Window Assigners,Window Functions。

3.4.1. Window Assigners

Window Assigners,窗口分配器,将事件分配给窗口(根据需要创建新的窗口对象)。

窗口类型
滚动时间窗口  
每分钟页面浏览量
滑动时间窗口
每10秒钟计算前1分钟的页面浏览量
滚动数量窗口
滑动数量窗口
会话窗口
全局窗口

可以使用的间隔时间 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n) .

基于时间的窗口分配器(包括会话时间)既可以处理 事件时间,也可以处理 处理时间。建议使用事件时间。

使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。 

3.4.2. Window Functions

Window Functions,用于处理窗口内的数据。

三种最基本的操作窗口内的事件的选项
像批量处理ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
像流处理每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
结合两者通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。

3.4.3. code

  1. stream.
  2. .keyBy(<key selector>)
  3. .window(<window assigner>)
  4. .reduce|aggregate|process(<window function>)

不是必须使用键控事件流(keyed stream),但是如果不使用键控事件流,我们的程序就不能 并行 处理。

3.5. 时间

Event time事件创建的时间,通常表现为事件中由生产者附加的时间戳。
Ingestion time:事件通过source操作进入flink数据流的时间。
Processing time对基于时间进行操作的操作器,它的操作时间。

3.6. 状态操作

Flink 中最基础的状态类型是 ValueState,能够为被其封装的变量添加容错能力的类型。
ValueState 是一种 keyed state,也就是说它只能被用于 keyed context 提供的 operator 中,即所有能够紧随 DataStream#keyBy 之后被调用的operator。 一个 operator 中的 keyed state 的作用域默认是属于它所属的 key 的。 

ValueState 需要使用 ValueStateDescriptor 来创建,ValueStateDescriptor 包含了 Flink 如何管理变量的一些元数据信息。
状态在使用之前需要使用 open() 函数来注册状态。

三个用于交互的方法:update 用于更新状态,value 用于获取状态值,clear 用于清空状态。 如果一个 key 还没有状态,例如当程序刚启动或者调用过 ValueState#clear 方法时,ValueState#value 将会返回 null。 如果需要更新状态,需要调用 ValueState#update 方法,直接更改 ValueState#value 的返回值可能不会被系统识别。 容错处理将在 Flink 后台自动管理,你可以像与常规变量那样与状态变量进行交互。 

3.7. 故障恢复检查点

3.8. 支持批处理 

3.9. 计时器

3.9.1. what

计时器在将来的某个时间点执行回调函数。

3.9.2. 计时器状态

算子内需要创建一个记录定时器时间的状态,在open函数中创建实例。

  1. private transient ValueState<Boolean> flagState;
  2. private transient ValueState<Long> timerState;
  3. @Override
  4. public void open(Configuration parameters) {
  5. ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
  6. "flag",
  7. Types.BOOLEAN);
  8. flagState = getRuntimeContext().getState(flagDescriptor);
  9. ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
  10. "timer-state",
  11. Types.LONG);
  12. timerState = getRuntimeContext().getState(timerDescriptor);
  13. }

3.9.3. 定时器服务

Context提供了定时器服务,定时器服务可以用于查询当前时间、注册定时器和删除定时器。 

3.9.4. 在process中注册定时器

  1. // set the timer and timer state
  2. long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
  3. context.timerService().registerProcessingTimeTimer(timer);
  4. timerState.update(timer);

处理时间是本地时钟时间,这是由运行任务的服务器的系统时间来决定的。

3.9.5. 取消定时器

需要删除已注册的定时器。

  1. // delete timer
  2. Long timer = timerState.value();
  3. ctx.timerService().deleteProcessingTimeTimer(timer);

3.9.6. 触发定时器

当定时器触发时,回调onTimer方法 

4. JobManager和TaskManager

一个 Flink 集群总是包含一个 JobManager 以及一个或多个 Flink TaskManager。

4.1. JobManager

jobManager负责处理 Job 提交、 Job 监控以及资源管理

在job执行期间,jobManager会持续的跟踪各个task,决定如何调度下一个或下一组task,处理已完成的task或失败情况。

4.1.1. jobGraph

jobGraph包括由多个算子顶点(jobVertex)组成的数据流图,以及中间结果数据(IntermediateDataSet)。
jobVertex 算子,每个算子有配置属性,如并行度和运行的代码。
jobGraph还包含算子运行所必须的依赖库。

4.1.2. ExecutionGraph

ExecutionGraph 执行图,理解为并行的job图。

每个算子顶点对应一个执行算子顶点ExecutionJobVertex。

对于每个算子顶点jobVertex,它的每个并行子task对应一个执行顶点ExecutionVertex,由执行顶点跟踪子task的执行状态。并行度100的算子顶点,会有100个子task,对应100个执行顶点。

一个执行算子顶点持有相应的算子顶点对应的所有执行顶点,并跟踪整个算子的执行状态

中间数据结果 IntermediateResult,负责跟踪中间结果的状态。
中间结果的分片 IntermediateResultPartition 负责跟踪每个分片的状态。

jobManager接收jobGraph,将job图转变为执行图ExecutionGraph。

对应关系
jobGraphExecutionGraph
jobVertexExecutionJobVertex
taskExecutionVertex
IntermediateDataSetIntermediateResult 多个IntermediateResultPartition

4.1.3. 图示

 

4.2.  TaskManager

Flink TaskManager 运行 worker 进程, 负责实际任务 Tasks 的执行,而这些任务共同组成了一个 Flink Job。 

使用task slot定义执行资源,一个taskManager包含多个slot。

每个slot可以运行一条流水线,且这条流水线由多个并行的连续的task组成。task指source、操作(如map、key等)、输出。

5. watermark水印

对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。这正是watermarks 的作用:定义何时停止等待较早的事件

Flink 中事件时间的处理取决于 watermark 生成器,后者将带有时间戳的特殊元素插入流中形成 watermarks。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。

对于大多数应用而言,固定延迟策略已经足够了。 

如果想要使用基于带有事件时间戳的事件流,Flink 需要知道与每个事件相关的时间戳,而且流必须包含 watermark。

  1. DataStream<Event> stream = ...
  2. WatermarkStrategy<Event> strategy = WatermarkStrategy
  3. .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
  4. .withTimestampAssigner((event, timestamp) -> event.timestamp);
  5. DataStream<Event> withTimestampsAndWatermarks =
  6. stream.assignTimestampsAndWatermarks(strategy);

6. Flink示例

  1. // 司机维度的行程数量
  2. public class RideCount {
  3. public static void main(String[] args) throws Exception {
  4. // set up streaming execution environment
  5. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. // start the data generator
  7. // 事件数据源
  8. DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
  9. // map each ride to a tuple of (driverId, 1)
  10. // 将单个事件 map为 其他数据模型
  11. DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
  12. @Override
  13. public Tuple2<Long, Long> map(TaxiRide ride) {
  14. // 这里用1L的原因:该例子用于统计司机的行程数量,每出现一个行程事件,则加1.
  15. // 若统计其他,如总里程、总金额等,1L对应改为单次里程数或单次金额等。
  16. return Tuple2.of(ride.driverId, 1L);
  17. }
  18. });
  19. // partition the stream by the driverId
  20. // 分片,将Tuple2<Long, Long> 的第一个field作为key
  21. KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);
  22. // count the rides for each driver
  23. // 使用第几个field进行聚合,positionToSum based-0
  24. DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);
  25. // we could, in fact, print out any or all of these streams
  26. rideCounts.print();
  27. // run the cleansing pipeline
  28. env.execute("Ride Count");
  29. }
  30. }
  1. // 疲劳驾驶预警
  2. public class LongRidesSolution {
  3. /**
  4. * Main method.
  5. * @throws Exception which occurs during job execution.
  6. */
  7. public static void main(String[] args) throws Exception {
  8. // set up streaming execution environment
  9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. env.setParallelism(2);
  11. // start the data generator
  12. DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
  13. // 依赖与分区相关的state,实现数据处理。如
  14. DataStream<TaxiRide> longRides = rides.keyBy((TaxiRide ride) -> ride.rideId).process(new MatchFunction());
  15. longRides.print();
  16. env.execute("Long Taxi Rides");
  17. }
  18. private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> {
  19. private ValueState<TaxiRide> rideState;
  20. @Override
  21. public void open(Configuration config) {
  22. // 只有分区才可绑定相关的valueState
  23. // 每个分区可以有多个不同名称的valueStatue
  24. ValueStateDescriptor<TaxiRide> stateDescriptor = new ValueStateDescriptor<>("ride event", TaxiRide.class);
  25. rideState = getRuntimeContext().getState(stateDescriptor);
  26. }
  27. @Override
  28. public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception {
  29. // ride 事件数据
  30. // out 结果集
  31. // 获取valueState的数据
  32. TaxiRide previousRideEvent = rideState.value();
  33. // valueState无数据时,进行更新
  34. if (previousRideEvent == null) {
  35. // 更新valueState
  36. rideState.update(ride);
  37. if (ride.isStart) {
  38. // 注册计时器
  39. context.timerService().registerEventTimeTimer(getTimerTime(ride));
  40. }
  41. }
  42. // valueStatue有数据时,进行处理
  43. else {
  44. if (!ride.isStart) {
  45. // it's an END event, so event saved was the START event and has a timer
  46. // the timer hasn't fired yet, and we can safely kill the timer
  47. // 删除计时器
  48. context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
  49. }
  50. // both events have now been seen, we can clear the state
  51. // 清空valueState
  52. rideState.clear();
  53. }
  54. }
  55. @Override
  56. public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception {
  57. // 计时器触发
  58. // out 结果集
  59. // if we get here, we know that the ride started two hours ago, and the END hasn't been processed
  60. out.collect(rideState.value());
  61. rideState.clear();
  62. }
  63. private long getTimerTime(TaxiRide ride) {
  64. return ride.startTime.plusSeconds(120 * 60).toEpochMilli();
  65. }
  66. }
  67. }
  1. // 每小时获取最多小费的司机
  2. public class HourlyTipsSolution {
  3. /**
  4. * Main method.
  5. * @throws Exception which occurs during job execution.
  6. */
  7. public static void main(String[] args) throws Exception {
  8. // set up streaming execution environment
  9. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10. env.setParallelism(2);
  11. // start the data generator
  12. DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator());
  13. // compute tips per hour for each driver
  14. // 分片-窗口-处理窗口内数据
  15. // 分片间并行处理
  16. DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares.keyBy((TaxiFare fare) -> fare.driverId)
  17. // 滚动窗口,如每小时,整点,如1:00:00 - 2:00:00
  18. .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
  19. // 对窗口范围内包含的所有(分片)生产的窗口进行聚合,无法并行
  20. DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
  21. .windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
  22. // You should explore how this alternative behaves. In what ways is the same as,
  23. // and different from, the solution above (using a windowAll)?
  24. // DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
  25. // .keyBy(t -> t.f0)
  26. // .maxBy(2);
  27. hourlyMax.print();
  28. // execute the transformation pipeline
  29. env.execute("Hourly Tips (java)");
  30. }
  31. /*
  32. * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
  33. */
  34. public static class AddTips extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
  35. @Override
  36. public void process(Long key, Context context, Iterable<TaxiFare> fares,
  37. Collector<Tuple3<Long, Long, Float>> out) {
  38. float sumOfTips = 0F;
  39. for (TaxiFare f : fares) {
  40. sumOfTips += f.tip;
  41. }
  42. /*System.out.println(
  43. new Date(context.window().getStart()) + Tuple3.of(context.window().getEnd(), key, sumOfTips)
  44. .toString());*/
  45. out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
  46. }
  47. }
  48. }
  1. // 行程与车费
  2. public class RidesAndFaresExample {
  3. /**
  4. * Main method.
  5. * @throws Exception which occurs during job execution.
  6. */
  7. public static void main(String[] args) throws Exception {
  8. // Set up streaming execution environment, including Web UI and REST endpoint.
  9. // Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
  10. // using the State Processor API.
  11. Configuration conf = new Configuration();
  12. conf.setString("state.backend", "filesystem");
  13. conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
  14. conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
  15. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
  16. env.setParallelism(2);
  17. env.enableCheckpointing(10000L);
  18. CheckpointConfig config = env.getCheckpointConfig();
  19. // 默认为精确一次
  20. // config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  21. config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
  22. DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator()).filter((TaxiRide ride) -> ride.isStart)
  23. .keyBy((TaxiRide ride) -> ride.rideId);
  24. DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator()).keyBy((TaxiFare fare) -> fare.rideId);
  25. // Set a UID on the stateful flatmap operator so we can read its state using the State Processor API.
  26. DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides.connect(fares).flatMap(new EnrichmentFunction())
  27. .uid("enrichment");
  28. enrichedRides.print();
  29. env.execute("Join Rides with Fares (java RichCoFlatMap)");
  30. }
  31. public static class EnrichmentFunction
  32. extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
  33. // keyed, managed state
  34. private ValueState<TaxiRide> rideState;
  35. private ValueState<TaxiFare> fareState;
  36. @Override
  37. public void open(Configuration config) {
  38. rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
  39. fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
  40. }
  41. @Override
  42. public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
  43. //System.out.println("ride " + ride);
  44. TaxiFare fare = fareState.value();
  45. if (fare != null) {
  46. fareState.clear();
  47. out.collect(Tuple2.of(ride, fare));
  48. }
  49. else {
  50. rideState.update(ride);
  51. }
  52. }
  53. @Override
  54. public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
  55. //System.out.println("fare " + fare);
  56. TaxiRide ride = rideState.value();
  57. if (ride != null) {
  58. rideState.clear();
  59. out.collect(Tuple2.of(ride, fare));
  60. }
  61. else {
  62. fareState.update(fare);
  63. }
  64. }
  65. }
  66. }

可前往Flink官方文档,获取更多。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/548836
推荐阅读
相关标签
  

闽ICP备14008679号