赞
踩
1)at-most-once:最多一次,实际上就是对 不处理错误的委婉说法
2)at-least-once:最少一次,这是最简单的处理错误的方法,当出现错误时,重发。最终结果可能会大于准确值,但绝不会小于准确值
3)exactly-once:精确一次,这指的是系统发生错误时,也会得到准确的结果
当spark-streaming实现exactly-once时,付出了性能代价。而flink在框架内部即保证了exactly-once,且不会牺牲很大的性能。
之前我们已经了解到,flink内部通过CheckPoint barrier (分割线) 实现的检查点算法保证了exactly-once语义,但这只是flink内部的一致性,如果要实现端到端的exactly-once,需要整个流程中的组件全部实现,包括了数据源和持久化存储。而整个流处理的过程中,一致性的级别取决于所有组件中一致性最弱的组件。
具体可分为以下几点:
1)flink内部---依赖CheckPoint完成
2)source端---使用可以记录数据位置并重设读取位置的组件(如kafka)
3)sink端---保证发生错误时不会重复发送
而保证sink端的一致性,又分为两种方式
---幂等写入
幂等操作是指,同一个操作,可以执行很多次,但是不会对结果造成影响,与执行一次的结果保持一致
---事务写入
事务写入的方式是,在CheckPoint开始构建一个事务,当CheckPoint彻底完成时,提交事务
事务写入又可以分为两种---WAL预写日志和2pc两阶段提交。DataStream API 提供了GenericWriteAheadSink模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。
1.flink-kafka:
- /**
- * @program: flink
- * @description: ${description}
- * @author: Mr.G
- * @create: 2021-09-23 17:30
- **/
- package com.ct.day06
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
-
- /**
- * @ClassName: FlinkWriteKafka
- * @Description: ${description}
- * @Author Mr.G
- * @Date 2021/9/23
- * @Version 1.0
- *
- */
- object FlinkWriteKafka {
-
- def main(args: Array[String]): Unit = {
-
- val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
- env.setParallelism(1)
-
- val stream = env.fromElements(
- "hello",
- "kafka"
- )
-
- stream.addSink(
- new FlinkKafkaProducer011[String](
- "hadoop01:9092",
- "flink_test",
- new SimpleStringSchema() //以字符串形式往 kafka写数据
- )
- )
-
- env.execute()
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
2.kafka-flink
- /**
- * @program: flink
- * @description: ${description}
- * @author: Mr.G
- * @create: 2021-09-23 17:34
- **/
- package com.ct.day06
-
- import java.util.Properties
-
- import org.apache.flink.api.common.serialization.SimpleStringSchema
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
-
- /**
- * @ClassName: FlinkReadKafka
- * @Description: ${description}
- * @Author Mr.G
- * @Date 2021/9/23
- * @Version 1.0
- *
- */
- object FlinkReadKafka {
-
- def main(args: Array[String]): Unit = {
-
- val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(1)
-
- val props = new Properties()
-
- props.put("bootstrap.servers","hadoop01:9092")
- props.put("group.id","consumer_group")
- props.put("key.deserializer","org.apache.kafka.common.serialization.String")
- props.put("value.deserializer","org.apache.kafka.common.serialization.String")
- props.put("auto.offset.reset","latest")
-
- val stream = env
- .addSource(new FlinkKafkaConsumer011[String](
- "flink_test",
- new SimpleStringSchema(),
- props
- ))
- stream.print()
-
- env.execute()
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
1、第一条数据来的时候,开启一个kafka事务,正常写入kafka的分区日志,但是并未提交,也就是预提交
2、jobManager触发CheckPoint操作,CheckPoint barrier 随着数据流向下流动,当到达一个算子时,保存其状态并报告给 jobManager
3、sink连接器收到CheckPoint barrier ,报告给jobManager,保存其状态。并开启下一阶段的kafka事务,等待提交
4、jobManager收到sink端的报告,标记CheckPoint彻底完成
5、sink端收到jobManager的反馈,提交事务
6、外部kafka事务关闭,提交的数据可以正常消费了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。