当前位置:   article > 正文

Flink检查点配置、一致性_flink kafka source 配置 checkpoint

flink kafka source 配置 checkpoint

目录

检查点配置基本概念

一致性详解 

flink整合kafka 开启检查点 代码:


上一篇我详细介绍了检查点的一些概念:Flink检查点详解_后季暖的博客-CSDN博客

 这一篇主要来讲如何配置检查点,和保障数据一致性的几种模式

检查点配置基本概念

 

一致性详解 

 

 说白了就是输入端数据可重用 输出端幂等写入或事务写入(最好是能两阶段提交)

 Flink和Kafka连接的精确一次

你的写入事务什么时候关闭?

不是在下一个检查点的数据流来的时候 而是当你事务都写完毕以后 因为是可以开启多个事务写入的 互不干扰

flink整合kafka 开启检查点 代码:

  1. public class RestartStrategyDemo {
  2. public static void main(String[] args) throws Exception {
  3. /**1.创建流运行环境**/
  4. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. /**请注意此处:**/
  6. //1.只有开启了CheckPointing,才会有重启策略
  7. env.enableCheckpointing(5000);
  8. //2.默认的重启策略是:固定延迟无限重启
  9. //此处设置重启策略为:出现异常重启3次,隔5秒一次
  10. env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(2)));
  11. //系统异常退出或人为 Cancel 掉,不删除checkpoint数据
  12. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  13. //设置Checkpoint模式(与Kafka整合,一定要设置Checkpoint模式为Exactly_Once)
  14. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  15. /**2.Source:读取 Kafka 中的消息**/
  16. //Kafka props
  17. Properties properties = new Properties();
  18. //指定Kafka的Broker地址
  19. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.210:9092,192.168.204.211:9092,192.168.204.212:9092");
  20. //指定组ID
  21. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
  22. //如果没有记录偏移量,第一次从最开始消费
  23. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  24. //Kafka的消费者,不自动提交偏移量
  25. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  26. FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer("testTopic", new SimpleStringSchema(), properties);
  27. //Checkpoint成功后,还要向Kafka特殊的topic中写偏移量(此处不建议改为false )
  28. //设置为false后,则不会向特殊topic中写偏移量。
  29. //kafkaSource.setCommitOffsetsOnCheckpoints(false);
  30. //通过addSource()方式,创建 Kafka DataStream
  31. DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);
  32. /**3.Transformation过程**/
  33. SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = kafkaDataStream.map(str -> Tuple2.of(str, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
  34. /**此部分读取Socket数据,只是用来人为出现异常,触发重启策略。验证重启后是否会再次去读之前已读过的数据(Exactly-Once)*/
  35. /*************** start **************/
  36. DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
  37. SingleOutputStreamOperator<String> streamOperator1 = socketTextStream.map(new MapFunction<String, String>() {
  38. @Override
  39. public String map(String word) throws Exception {
  40. if ("error".equals(word)) {
  41. throw new RuntimeException("Throw Exception");
  42. }
  43. return word;
  44. }
  45. });
  46. /************* end **************/
  47. //对元组 Tuple2 分组求和
  48. SingleOutputStreamOperator<Tuple2<String, Integer>> sum = streamOperator.keyBy(0).sum(1);
  49. /**4.Sink过程**/
  50. sum.print();
  51. /**5.任务执行**/
  52. env.execute("RestartStrategyDemo");
  53. }
  54. }

总结:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/980534
推荐阅读
相关标签
  

闽ICP备14008679号