当前位置:   article > 正文

Flink 实现端到端的exactly-once_flink通过()实现端到端exactly-once

flink通过()实现端到端exactly-once

在流处理中,一致性可以分为三个级别:

1)at-most-once:最多一次,实际上就是对 不处理错误的委婉说法

2)at-least-once:最少一次,这是最简单的处理错误的方法,当出现错误时,重发。最终结果可能会大于准确值,但绝不会小于准确值

3)exactly-once:精确一次,这指的是系统发生错误时,也会得到准确的结果

当spark-streaming实现exactly-once时,付出了性能代价。而flink在框架内部即保证了exactly-once,且不会牺牲很大的性能。

之前我们已经了解到,flink内部通过CheckPoint barrier (分割线) 实现的检查点算法保证了exactly-once语义,但这只是flink内部的一致性,如果要实现端到端的exactly-once,需要整个流程中的组件全部实现,包括了数据源和持久化存储。而整个流处理的过程中,一致性的级别取决于所有组件中一致性最弱的组件。

具体可分为以下几点:

1)flink内部---依赖CheckPoint完成

2)source端---使用可以记录数据位置并重设读取位置的组件(如kafka)

3)sink端---保证发生错误时不会重复发送

而保证sink端的一致性,又分为两种方式

---幂等写入

        幂等操作是指,同一个操作,可以执行很多次,但是不会对结果造成影响,与执行一次的结果保持一致

---事务写入

        事务写入的方式是,在CheckPoint开始构建一个事务,当CheckPoint彻底完成时,提交事务

        事务写入又可以分为两种---WAL预写日志和2pc两阶段提交。DataStream API 提供了GenericWriteAheadSink模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。

以kafka-flink-kafka为例,完成flink端到端的exactly-once

1.flink-kafka:

  1. /**
  2. * @program: flink
  3. * @description: ${description}
  4. * @author: Mr.G
  5. * @create: 2021-09-23 17:30
  6. **/
  7. package com.ct.day06
  8. import org.apache.flink.api.common.serialization.SimpleStringSchema
  9. import org.apache.flink.streaming.api.scala._
  10. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
  11. /**
  12. * @ClassName: FlinkWriteKafka
  13. * @Description: ${description}
  14. * @Author Mr.G
  15. * @Date 2021/9/23
  16. * @Version 1.0
  17. *
  18. */
  19. object FlinkWriteKafka {
  20. def main(args: Array[String]): Unit = {
  21. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  22. env.setParallelism(1)
  23. val stream = env.fromElements(
  24. "hello",
  25. "kafka"
  26. )
  27. stream.addSink(
  28. new FlinkKafkaProducer011[String](
  29. "hadoop01:9092",
  30. "flink_test",
  31. new SimpleStringSchema() //以字符串形式往 kafka写数据
  32. )
  33. )
  34. env.execute()
  35. }
  36. }

2.kafka-flink

  1. /**
  2. * @program: flink
  3. * @description: ${description}
  4. * @author: Mr.G
  5. * @create: 2021-09-23 17:34
  6. **/
  7. package com.ct.day06
  8. import java.util.Properties
  9. import org.apache.flink.api.common.serialization.SimpleStringSchema
  10. import org.apache.flink.streaming.api.scala._
  11. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
  12. /**
  13. * @ClassName: FlinkReadKafka
  14. * @Description: ${description}
  15. * @Author Mr.G
  16. * @Date 2021/9/23
  17. * @Version 1.0
  18. *
  19. */
  20. object FlinkReadKafka {
  21. def main(args: Array[String]): Unit = {
  22. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  23. env.setParallelism(1)
  24. val props = new Properties()
  25. props.put("bootstrap.servers","hadoop01:9092")
  26. props.put("group.id","consumer_group")
  27. props.put("key.deserializer","org.apache.kafka.common.serialization.String")
  28. props.put("value.deserializer","org.apache.kafka.common.serialization.String")
  29. props.put("auto.offset.reset","latest")
  30. val stream = env
  31. .addSource(new FlinkKafkaConsumer011[String](
  32. "flink_test",
  33. new SimpleStringSchema(),
  34. props
  35. ))
  36. stream.print()
  37. env.execute()
  38. }
  39. }

具体的两阶段提交流程:

1、第一条数据来的时候,开启一个kafka事务,正常写入kafka的分区日志,但是并未提交,也就是预提交

2、jobManager触发CheckPoint操作,CheckPoint barrier 随着数据流向下流动,当到达一个算子时,保存其状态并报告给 jobManager

3、sink连接器收到CheckPoint barrier ,报告给jobManager,保存其状态。并开启下一阶段的kafka事务,等待提交

4、jobManager收到sink端的报告,标记CheckPoint彻底完成

5、sink端收到jobManager的反馈,提交事务

6、外部kafka事务关闭,提交的数据可以正常消费了

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/792584
推荐阅读
相关标签
  

闽ICP备14008679号