赞
踩
目录
上一篇我详细介绍了检查点的一些概念:Flink检查点详解_后季暖的博客-CSDN博客
这一篇主要来讲如何配置检查点,和保障数据一致性的几种模式
说白了就是输入端数据可重用 输出端幂等写入或事务写入(最好是能两阶段提交)
Flink和Kafka连接的精确一次
你的写入事务什么时候关闭?
不是在下一个检查点的数据流来的时候 而是当你事务都写完毕以后 因为是可以开启多个事务写入的 互不干扰
- public class RestartStrategyDemo {
-
- public static void main(String[] args) throws Exception {
-
- /**1.创建流运行环境**/
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- /**请注意此处:**/
- //1.只有开启了CheckPointing,才会有重启策略
- env.enableCheckpointing(5000);
- //2.默认的重启策略是:固定延迟无限重启
- //此处设置重启策略为:出现异常重启3次,隔5秒一次
- env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(2)));
- //系统异常退出或人为 Cancel 掉,不删除checkpoint数据
- env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- //设置Checkpoint模式(与Kafka整合,一定要设置Checkpoint模式为Exactly_Once)
- env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-
-
- /**2.Source:读取 Kafka 中的消息**/
- //Kafka props
- Properties properties = new Properties();
- //指定Kafka的Broker地址
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.210:9092,192.168.204.211:9092,192.168.204.212:9092");
- //指定组ID
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
- //如果没有记录偏移量,第一次从最开始消费
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- //Kafka的消费者,不自动提交偏移量
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer("testTopic", new SimpleStringSchema(), properties);
-
- //Checkpoint成功后,还要向Kafka特殊的topic中写偏移量(此处不建议改为false )
- //设置为false后,则不会向特殊topic中写偏移量。
- //kafkaSource.setCommitOffsetsOnCheckpoints(false);
- //通过addSource()方式,创建 Kafka DataStream
- DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);
-
- /**3.Transformation过程**/
- SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = kafkaDataStream.map(str -> Tuple2.of(str, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
- /**此部分读取Socket数据,只是用来人为出现异常,触发重启策略。验证重启后是否会再次去读之前已读过的数据(Exactly-Once)*/
- /*************** start **************/
- DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
-
- SingleOutputStreamOperator<String> streamOperator1 = socketTextStream.map(new MapFunction<String, String>() {
- @Override
- public String map(String word) throws Exception {
- if ("error".equals(word)) {
- throw new RuntimeException("Throw Exception");
- }
- return word;
- }
- });
- /************* end **************/
-
- //对元组 Tuple2 分组求和
- SingleOutputStreamOperator<Tuple2<String, Integer>> sum = streamOperator.keyBy(0).sum(1);
- /**4.Sink过程**/
- sum.print();
-
- /**5.任务执行**/
- env.execute("RestartStrategyDemo");
- }
- }
总结:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。