当前位置:   article > 正文

FlinkSQL的Watermark机制和Checkpoint机制_flinksql watermark

flinksql watermark

Watermark机制

Watermark机制,就是水印机制,也叫做水位线机制。就是专门用来解决流式环境下数据迟到问题的。

MonotonousWatermark(单调递增水印)

  1. package day05;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  6. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.time.Time;
  14. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  15. import org.apache.flink.util.Collector;
  16. /**
  17. * @desc: 需求:从socket获取数据,转换成水位传感器类,基于事件时间,每5秒生成一个滚动窗口,来计算传感器水位信息
  18. * 定义类 WaterSensor
  19. * String id --id
  20. * Integer vc --value count
  21. * Long ts --timestamp
  22. * TODO 演示单调递增水印monotonousWatermark
  23. */
  24. public class Demo01_MonotonousWatermark {
  25. public static void main(String[] args) throws Exception {
  26. //1.构建流式执行环境
  27. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  28. env.setParallelism(1);
  29. //2.数据输入
  30. DataStreamSource<String> source = env.socketTextStream("node1", 9999);
  31. //3.数据处理
  32. // //3.1 把数据转换成WaterSensor对象
  33. SingleOutputStreamOperator<WaterSensor> mapStream = source.map(new MapFunction<String, WaterSensor>() {
  34. @Override
  35. public WaterSensor map(String value) throws Exception {
  36. //lines就是转换后的数组类型,数组的长度为3,分别表示:
  37. // String id
  38. // Integer vc
  39. // Long ts
  40. String[] lines = value.split(",");
  41. return new WaterSensor(lines[0], Integer.parseInt(lines[1]), Long.parseLong(lines[2]));
  42. }
  43. });
  44. // //3.2 分配watermark(演示常用的watermark水印)
  45. SingleOutputStreamOperator<WaterSensor> watermarks = mapStream
  46. //forMonotonousTimestamps:单调递增水印
  47. .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()
  48. .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
  49. @Override
  50. public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  51. return element.getTs() * 1000L;
  52. }
  53. }));
  54. // 3.3 基于id分组
  55. SingleOutputStreamOperator<String> result = watermarks.keyBy(value -> value.getId())
  56. // 3.4 指定5秒的滚动窗口
  57. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  58. // 3.5 进行数据处理(统计次数)process方法。
  59. /**
  60. * 参数1:输入的数据类型
  61. * 参数2:输出的数据类型
  62. * 参数3:分组的数据类型
  63. * 参数4:时间窗口
  64. */
  65. .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  66. /**
  67. * process方法介绍:
  68. * @param key 根据key来分组
  69. * @param context 窗口计算的上下文对象(可以从上下文对象获取窗口的一些额外信息)
  70. * @param elements 窗口内的数据
  71. * @param out 收集窗口的计算结果
  72. * @throws Exception 异常抛出
  73. */
  74. @Override
  75. public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  76. out.collect("分组的key为:" + key +
  77. "\n窗口内的数据为:" + elements +
  78. "\n窗口内的数据量为:" + elements.spliterator().estimateSize() +
  79. "\n窗口为:[" + context.window().getStart() + "," + context.window().getEnd() + ")\n");
  80. }
  81. });
  82. //4.数据输出
  83. result.print();
  84. //5.启动流式任务
  85. env.execute();
  86. }
  87. /**
  88. * 创建水位传感器类:WaterSensor
  89. * @Data:可以用来构建getter和setter方法
  90. * 构造器:Java中有无参和有参的构造器(构造方法)
  91. * @AllArgsConstructor:有参构造
  92. * @NoArgsConstructor:无参构造
  93. */
  94. @Data
  95. @AllArgsConstructor
  96. @NoArgsConstructor
  97. public static class WaterSensor {
  98. //用户id
  99. private String id;
  100. //水位信息
  101. private Integer vc;
  102. //时间戳
  103. private Long ts;
  104. }
  105. }

运行结果如下:

结论:

MonotonousWatermark(单调递增水印)会有数据丢失的情况。  

BoundedOutofOrder(固定延迟水印)

  1. package day05;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  6. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  7. import org.apache.flink.api.common.typeinfo.Types;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.time.Time;
  14. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  15. import org.apache.flink.util.Collector;
  16. import java.time.Duration;
  17. /**
  18. * @desc: 需求:从socket获取数据,转换成水位传感器类,基于事件时间,每5秒生成一个滚动窗口,来计算传感器水位信息
  19. * * 定义类 WaterSensor
  20. * * String id --id
  21. * * Integer vc --value count
  22. * * Long ts --timestamp
  23. * todo 演示固定延迟水印
  24. */
  25. public class Demo02_BoundedOutofOrderWatermark {
  26. public static void main(String[] args) throws Exception {
  27. //1.构建流式执行环境
  28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29. env.setParallelism(1);
  30. //2.数据输入
  31. DataStreamSource<String> source = env.socketTextStream("node1", 9999);
  32. //3.数据处理
  33. //3.1 把输入数据转换成WaterSensor对象
  34. SingleOutputStreamOperator<WaterSensor> mapStream = source.map(value -> {
  35. String[] lines = value.split(",");
  36. return new WaterSensor(lines[0], Integer.parseInt(lines[1]), Long.parseLong(lines[2]));
  37. //返回时需要使用自定义的类WaterSensor,写法如下:Types.GENERIC(WaterSensor.class)
  38. }).returns(Types.GENERIC(WaterSensor.class));
  39. /**
  40. * 3.2 给数据添加固定延迟水印
  41. * WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
  42. * 如上代码,2秒的固定延迟,允许数据乱序2秒。(延迟2秒到达)
  43. */
  44. SingleOutputStreamOperator<WaterSensor> watermarks = mapStream
  45. .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  46. .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
  47. @Override
  48. public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  49. return element.getTs() * 1000L;
  50. }
  51. }));
  52. //3.3 把数据进行分组
  53. SingleOutputStreamOperator<String> result = watermarks.keyBy(value -> value.getId())
  54. //3.4 分配滚动事件时间窗口,并且制定窗口大小为5秒钟
  55. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  56. /**
  57. * 3.5 对窗口内的数据进行处理
  58. * @param <IN> The type of the input value. 输入数据类型
  59. * @param <OUT> The type of the output value. 输出数据类型
  60. * @param <KEY> The type of the key. key的类型
  61. * @param <W> The type of {@code Window} that this window function can be applied on.窗口
  62. */
  63. .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  64. /**
  65. *
  66. * @param s 分组的key
  67. * @param context 上下文对象,可以从上下文对象中获取其他信息
  68. * @param elements 窗口内的元素(数据)
  69. * @param out out收集器,用于结果输出
  70. * @throws Exception 异常
  71. */
  72. @Override
  73. public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  74. out.collect("分组的key为:" + s +
  75. "\n窗口内的数据:" + elements +
  76. "\n窗口内的数据量为:" + elements.spliterator().estimateSize() +
  77. "\n窗口起始时间为:[" + context.window().getStart() + "," + context.window().getEnd() + ")\n");
  78. }
  79. });
  80. //4.数据输出
  81. result.print();
  82. //5.启动流式任务
  83. env.execute();
  84. }
  85. /**
  86. * 创建水位传感器类:WaterSensor
  87. * @Data:可以用来构建getter和setter方法
  88. * 构造器:Java中有无参和有参的构造器(构造方法)
  89. * @AllArgsConstructor:有参构造
  90. * @NoArgsConstructor:无参构造
  91. */
  92. @Data
  93. @AllArgsConstructor
  94. @NoArgsConstructor
  95. public static class WaterSensor {
  96. //用户id
  97. private String id;
  98. //水位信息
  99. private Integer vc;
  100. //时间戳
  101. private Long ts;
  102. }
  103. }

运行结果如下:

结论:

在固定延迟水印下,在一定范围内的数据迟到的情况下,可以正常统计。

AllowedLateness(在固定延迟水印下允许迟到)

  1. package day05;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  6. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  7. import org.apache.flink.api.common.typeinfo.Types;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.time.Time;
  14. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  15. import org.apache.flink.util.Collector;
  16. import java.time.Duration;
  17. /**
  18. * @desc: 需求:从socket获取数据,转换成水位传感器类,基于事件时间,每5秒生成一个滚动窗口,来计算传感器水位信息
  19. * * 定义类 WaterSensor
  20. * * String id --id
  21. * * Integer vc --value count
  22. * * Long ts --timestamp
  23. * todo 演示AllowedLateness
  24. */
  25. public class Demo03_AllowedLateness {
  26. public static void main(String[] args) throws Exception {
  27. //1.构建流式执行环境
  28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29. env.setParallelism(1);
  30. //2.数据输入
  31. DataStreamSource<String> source = env.socketTextStream("node1", 9999);
  32. //3.数据处理
  33. //3.1 把输入数据转换成WaterSensor对象
  34. SingleOutputStreamOperator<WaterSensor> mapStream = source.map(value -> {
  35. String[] lines = value.split(",");
  36. return new WaterSensor(lines[0], Integer.parseInt(lines[1]), Long.parseLong(lines[2]));
  37. //返回时需要使用自定义的类WaterSensor,写法如下:Types.GENERIC(WaterSensor.class)
  38. }).returns(Types.GENERIC(WaterSensor.class));
  39. /**
  40. * 3.2 给数据添加固定延迟水印
  41. * WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
  42. * 如上代码,2秒的固定延迟,允许数据乱序2秒。(延迟2秒到达)
  43. */
  44. SingleOutputStreamOperator<WaterSensor> watermarks = mapStream
  45. .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  46. .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
  47. @Override
  48. public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  49. return element.getTs() * 1000L;
  50. }
  51. }));
  52. //3.3 把数据进行分组
  53. SingleOutputStreamOperator<String> result = watermarks.keyBy(value -> value.getId())
  54. //3.4 分配滚动事件时间窗口,并且制定窗口大小为5秒钟
  55. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  56. //allowedLateness,在固定延迟水印下,再允许你延迟的时间程度
  57. .allowedLateness(Time.seconds(1))
  58. /**
  59. * 3.5 对窗口内的数据进行处理
  60. * @param <IN> The type of the input value. 输入数据类型
  61. * @param <OUT> The type of the output value. 输出数据类型
  62. * @param <KEY> The type of the key. key的类型
  63. * @param <W> The type of {@code Window} that this window function can be applied on.窗口
  64. */
  65. .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  66. /**
  67. *
  68. * @param s 分组的key
  69. * @param context 上下文对象,可以从上下文对象中获取其他信息
  70. * @param elements 窗口内的元素(数据)
  71. * @param out out收集器,用于结果输出
  72. * @throws Exception 异常
  73. */
  74. @Override
  75. public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  76. out.collect("分组的key为:" + s +
  77. "\n窗口内的数据:" + elements +
  78. "\n窗口内的数据量为:" + elements.spliterator().estimateSize() +
  79. "\n窗口起始时间为:[" + context.window().getStart() + "," + context.window().getEnd() + ")\n");
  80. }
  81. });
  82. //4.数据输出
  83. result.print();
  84. //5.启动流式任务
  85. env.execute();
  86. }
  87. /**
  88. * 创建水位传感器类:WaterSensor
  89. * @Data:可以用来构建getter和setter方法
  90. * 构造器:Java中有无参和有参的构造器(构造方法)
  91. * @AllArgsConstructor:有参构造
  92. * @NoArgsConstructor:无参构造
  93. */
  94. @Data
  95. @AllArgsConstructor
  96. @NoArgsConstructor
  97. public static class WaterSensor {
  98. //用户id
  99. private String id;
  100. //水位信息
  101. private Integer vc;
  102. //时间戳
  103. private Long ts;
  104. }
  105. }

运行结果如下:

结论:

AllowedLateness,允许在固定延迟水印下,再次迟到的数据被捕获。

虽然Watermark会触发窗口计算,如果使用AllowedLateness,窗口就不会立刻销毁,

因此,数据的延迟时间在AllowedLateness的时间范围内,数据能够被正常处理。

窗口会在AllowedLateness设置的时间之后再销毁。  

SideOutput(侧道输出)

  1. package day05;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  6. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  7. import org.apache.flink.api.common.typeinfo.Types;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  13. import org.apache.flink.streaming.api.windowing.time.Time;
  14. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  15. import org.apache.flink.util.Collector;
  16. import org.apache.flink.util.OutputTag;
  17. import java.time.Duration;
  18. /**
  19. * @desc: 需求:从socket获取数据,转换成水位传感器类,基于事件时间,每5秒生成一个滚动窗口,来计算传感器水位信息
  20. * * 定义类 WaterSensor
  21. * * String id --id
  22. * * Integer vc --value count
  23. * * Long ts --timestamp
  24. * todo 演示SideoutputTag(侧道输出)
  25. */
  26. public class Demo04_SideOutputTag {
  27. public static void main(String[] args) throws Exception {
  28. //1.构建流式执行环境
  29. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  30. env.setParallelism(1);
  31. //2.数据输入
  32. DataStreamSource<String> source = env.socketTextStream("node1", 9999);
  33. //定义一个OutputTag对象,用于SideOutputLateData
  34. OutputTag<WaterSensor> lateData = new OutputTag<>("lateData", Types.GENERIC(WaterSensor.class));
  35. //3.数据处理
  36. //3.1 把输入数据转换成WaterSensor对象
  37. SingleOutputStreamOperator<WaterSensor> mapStream = source.map(value -> {
  38. String[] lines = value.split(",");
  39. return new WaterSensor(lines[0], Integer.parseInt(lines[1]), Long.parseLong(lines[2]));
  40. //返回时需要使用自定义的类WaterSensor,写法如下:Types.GENERIC(WaterSensor.class)
  41. }).returns(Types.GENERIC(WaterSensor.class));
  42. /**
  43. * 3.2 给数据添加固定延迟水印
  44. * WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
  45. * 如上代码,2秒的固定延迟,允许数据乱序2秒。(延迟2秒到达)
  46. */
  47. SingleOutputStreamOperator<WaterSensor> watermarks = mapStream
  48. .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
  49. .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
  50. @Override
  51. public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  52. return element.getTs() * 1000L;
  53. }
  54. }));
  55. //3.3 把数据进行分组
  56. SingleOutputStreamOperator<String> result = watermarks.keyBy(value -> value.getId())
  57. //3.4 分配滚动事件时间窗口,并且制定窗口大小为5秒钟
  58. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  59. //allowedLateness,在固定延迟水印下,再允许你延迟的时间程度
  60. .allowedLateness(Time.seconds(1))
  61. //侧道输出:无论延迟多久的数据,都能够通过侧道输出来捕获
  62. .sideOutputLateData(lateData)
  63. /**
  64. * 3.5 对窗口内的数据进行处理
  65. * @param <IN> The type of the input value. 输入数据类型
  66. * @param <OUT> The type of the output value. 输出数据类型
  67. * @param <KEY> The type of the key. key的类型
  68. * @param <W> The type of {@code Window} that this window function can be applied on.窗口
  69. */
  70. .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  71. /**
  72. *
  73. * @param s 分组的key
  74. * @param context 上下文对象,可以从上下文对象中获取其他信息
  75. * @param elements 窗口内的元素(数据)
  76. * @param out out收集器,用于结果输出
  77. * @throws Exception 异常
  78. */
  79. @Override
  80. public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  81. out.collect("分组的key为:" + s +
  82. "\n窗口内的数据:" + elements +
  83. "\n窗口内的数据量为:" + elements.spliterator().estimateSize() +
  84. "\n窗口起始时间为:[" + context.window().getStart() + "," + context.window().getEnd() + ")\n");
  85. }
  86. });
  87. //4.数据输出
  88. result.print();
  89. //获取迟到的数据,并且打印输出
  90. result.getSideOutput(lateData).printToErr("超过AllowedLateness的数据:");
  91. //5.启动流式任务
  92. env.execute();
  93. }
  94. /**
  95. * 创建水位传感器类:WaterSensor
  96. * @Data:可以用来构建getter和setter方法
  97. * 构造器:Java中有无参和有参的构造器(构造方法)
  98. * @AllArgsConstructor:有参构造
  99. * @NoArgsConstructor:无参构造
  100. */
  101. @Data
  102. @AllArgsConstructor
  103. @NoArgsConstructor
  104. public static class WaterSensor {
  105. //用户id
  106. private String id;
  107. //水位信息
  108. private Integer vc;
  109. //时间戳
  110. private Long ts;
  111. }
  112. }

运行结果如下:

结论:

SideOutput侧道输出,可以允许数据在既超过了Watermark的时间,又超过了AllowedLateness的时间范围后,仍然被正常捕获。

也就是说,数据无论迟到多久,都不会丢失。

Checkpoint机制

Checkpoint机制,又叫容错机制,可以保证流式任务中,不会因为异常时等原因,造成任务异常退出。可以保证任务正常运行。  

机制运行流程

解释:

(1)主节点上的检查点协调器(CheckpointCoordinator)会周期性地发送一个个地Barrier(栅栏),Barrier会混在数据里,随着数据流,流向source算子

(2)source算子在摄入数据的时候,如果碰到Barrier栅栏,不会去处理,Barrier就会让算子去汇报当前的状态

(3)处理完之后,Barrier就会随着数据流,流向下一个算子

(4)下一个算子收到Barrier,同样会听下手里的工作,也会向检查点协调器汇报当前的状态,把状态往主节点传递一份(备份,防止算子出错,状态丢失) (5)上一步处理完之后,Barrier又会随着数据流向下一个算子,以此类推。 (6)等Barrier流经所有的算子之后,这一轮的快照就算制作完成。

状态后端

状态后端,StateBackend,就是Flink存储状态的介质。Flink提供了三种状态后端的存储方式:

  • MemoryStateBackend(内存)

内存,掉电易失。不安全。基本不用。

配置如下:

state.backend: hashmap
# 可选,当不指定 checkpoint 路径时,默认自动使用 JobManagerCheckpointStorage
state.checkpoint-storage: jobmanager

  • FsStateBackend(文件系统)

FsStateBackend,文件系统的状态后端,就是把状态保存在文件系统中,常用来保存状态的文件系统有HDFS。

配置如下:

state.backend: hashmap 
state.checkpoints.dir: file:///checkpoint-dir/ 

# 默认为FileSystemCheckpointStorage 
state.checkpoint-storage: filesystem

  • RocksDBStateBackend(RocksDB数据库)

RocksDBStateBackend,把状态保存在RocksDB数据库中。

RocksDB,是一个小型文件系统的数据库。

配置如下:

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

Flink的重启策略

Flink支持四种类型的重启策略:

  • none:没有重启。任务一旦遇到异常,就退出。

  • fixed-delay:固定延迟重启策略。也就是说,可以配置一个重启的次数。超过次数后,才会退出。

  • failure-rate:失败率重启策略。也就是说,任务的失败频率。超过该频率后才退出。在设定的频率之内,不会退出。

  • exponential-delay:指数延迟重启策略。也就是说,任务在失败后,下一次的延迟时间是随着指数增长的。

Checkpoint配置和重启策略的配置

  1. execution.checkpointing.interval: 5000
  2. #设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE        
  3. execution.checkpointing.mode: EXACTLY_ONCE
  4. state.backend: hashmap
  5. #设置checkpoint的存储方式
  6. state.checkpoint-storage: filesystem
  7. #设置checkpoint的存储位置
  8. state.checkpoints.dir: hdfs://node1:8020/checkpoints
  9. #设置savepoint的存储位置
  10. state.savepoints.dir: hdfs://node1:8020/checkpoints
  11. #设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
  12. execution.checkpointing.timeout: 600000
  13. #设置两次checkpoint之间的最小时间间隔
  14. execution.checkpointing.min-pause: 500
  15. #设置并发checkpoint的数目
  16. execution.checkpointing.max-concurrent-checkpoints: 1
  17. #开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
  18. state.checkpoints.num-retained: 3
  19. #默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动
  20. 清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
  21. #ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
  22. #RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
  23. #DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
  24. execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
  25. # 重启策略选一个
  26. # 设置无重启策略
  27. restart-strategy: none
  28. # 设置固定延迟策略
  29. restart-strategy: fixed-delay
  30. # 尝试重启次数
  31. restart-strategy.fixed-delay.attempts: 3
  32. # 两次连续重启的间隔时间
  33. restart-strategy.fixed-delay.delay: 3 s
  34. # 设置失败率重启
  35. restart-strategy: failure-rate
  36. # 两次连续重启的间隔时间
  37. restart-strategy.failure-rate.delay: 3 s
  38. # 计算失败率的统计时间跨度
  39. restart-strategy.failure-rate.failure-rate-interval: 1 min
  40. # 计算失败率的统计时间内的最大失败次数
  41. restart-strategy.failure-rate.max-failures-per-interval: 3
  42. # 设置指数延迟重启
  43. restart-strategy: exponential-delay
  44. # 初次失败后重启时间间隔(初始值)
  45. restart-strategy.exponential-delay.initial-backoff: 1 s
  46. # 以后每次失败,重启时间间隔为上一次重启时间间隔乘以这个值
  47. restart-strategy.exponential-delay.backoff-multiplier: 2
  48. # 每次重启间隔时间的最大抖动值(加或减去该配置项范围内的一个随机数),防止大量作业在同一时刻重启
  49. restart-strategy.exponential-delay.jitter-factor: 0.1
  50. # 最大重启时间间隔,超过这个最大值后,重启时间间隔不再增大
  51. restart-strategy.exponential-delay.max-backoff: 1 min
  52. # 多长时间作业运行无失败后,重启间隔时间会重置为初始值(第一个配置项的值)
  53. restart-strategy.exponential-delay.reset-backoff-threshold: 1 h

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号