赞
踩
所有类型的窗口。例如,计算过去一小时的平均温度,就是有状态的计算。
所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20度以上的温度读数,则发出警告,这是有状态的计算。
流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
下图展示了无状态流处理和有状态流处理的主要区别。
无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。
有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。
尽管无状态的计算很重要,但是流处理对有状态的计算更感兴趣。事实上,正确地实现有状态的计算比实现无状态的计算难得多。旧的流处理系统并不支持有状
态的计算,而新一代的流处理系统则将状态及其正确性视为重中之重。
Flink 内置的很多算子,数据源 source,数据存储 sink 都是有状态的,流中的数据都是 buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction
会缓存输入流的数据,ProcessFunction
会保存设置的定时器信息等等。
在 Flink 中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
算子状态(operator state)
键控状态(keyed state)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
列表状态(List state)
将状态表示为一组数据的列表。
联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。KeyedState 很类似于一个分布式的 key-value map 数据结构,只能用于KeyedStream(keyBy算子处理之后)。
ValueState[T]
保存单个的值,值的类型为T。
ValueState.value()
ValueState.update(value: T)
ListState[T]
保存一个列表,列表里的元素的数据类型为T。基本操作如下:
ListState.add(value: T)
ListState.addAll(values: java.util.List[T])
ListState.get()
返回 Iterable[T]
ListState.update(values: java.util.List[T])
MapState[K, V]
保存 Key-Value 对。
MapState.get(key: K)
MapState.put(key: K, value: V)
MapState.contains(key: K)
MapState.remove(key: K)
ReducingState[T]
AggregatingState[I, O]
State.clear()
是清空操作。
main 方法:
val alerts: DataStream[(String, Double, Double)] = strem.flatMap(new TemperatureAlertFunctionThreshold(1.7))
alerts.print()
flatMap方法:
package com.flink.scala import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.util.Collector /** * 温度大于设定的值报警 * @param threshold */ class TemperatureAlertFunctionThreshold(val threshold: Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{ private var lastTempState:ValueState[Double] = _ override def open(parameters: Configuration): Unit = { val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp",classOf[Double]) lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor) } override def flatMap(in: SensorReading, collector: Collector[(String, Double, Double)]): Unit = { val lastTemp = lastTempState.value(); val tempDiff = (in.temperature - lastTemp).abs // 温度大于阈值报警 if (tempDiff > threshold) { collector.collect((in.id,in.temperature,tempDiff)) } lastTempState.update(in.temperature) } }
通过 RuntimeContext 注册 StateDescriptor
。StateDescriptor 以状态 state 的名字和存储的数据类型为参数。
在 open() 方法中创建 state 变量。注意复习之前的 RichFunction 相关知识。
接下来我们使用了 FlatMap with keyed ValueState
的快捷方式 flatMapWithState
实现以上需求。
val tempChangeAlertFlatMapWiteSate: DataStream[(String, Double,Double)] = keyedStream.flatMapWithState[(String,Double,Double),Double]{
// 如果没有状态的话,也就是没有数据来过,那么就将当前数据的温度存入状态
case ( input: SensorReading, None ) => ( List.empty, Some(input.temperature) )
// 如果有状态,就应该与上一次的温度值比较差值,如果大于就输出报警
case ( input: SensorReading, lastTempState: Some[Double] ) =>
val diff = (input.temperature - lastTempState.get).abs
if (diff > 10.0 ) {
( List((input.id, lastTempState.get, input.temperature)), Some(input.temperature) )
} else {
( List.empty, Some(input.temperature) )
}
}
在流处理中,一致性可以分为3个级别:
曾经,at-least-once 非常流行。第一代流处理器 (如Storm和Samza) 刚问世时只保证 at-least-once,原因有二:
最先保证 exactly-once 的系统 (Storm Trident和Spark Streaming) 在性能和表现力这两个方面付出了很大的代价。为了保证 exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证 exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证 exactly-once 与获得低延迟和效率之间权衡利弊。Flink 避免了这种权衡。
Flink的一个重大价值在于,它 既保证了 exactly-once,也具有低延迟和高吞吐的处理能力。
从根本上说,Flink 通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如Kafka)和输出到持久化系统。
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终。每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:
而对于 sink 端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
事务写入
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。
对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。DataStream API 提供了 GenericWriteAheadSink 模板类和
TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。
不同 Source 和 Sink 的一致性保证可以用下表说明:
Flink 具体如何保证 exactly-once 呢?它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。
假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢?如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。
于是,你想了一个更好的办法:在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开,当珠子被拨动的时候,皮筋也可以被拨动。然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。
Flink 检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是:对于指定的皮筋而言,珠子的相对位置是确定的;这让皮筋成为重新计数的参考点。总状态
(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。
Flink 检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。记住这一基本点之后,我们用一个例子来看检查点是如何运行的。Flink 为用户提供了用来定义状态的工具。例如,以下这个Scala 程序按照输入记录的第一个字段(一个字符串)进行分组并维护第二个字段的计数状态。
valstream:DataStream[(String,Int)]=...
valcounts:DataStream[(String,Int)]=stream.keyBy(record=>record._1).mapWithState(
(in:(String,Int),state:Option[Int])=>
state match{
caseSome(c)=>((in._1,c+in._2),Some(c+in._2))
caseNone=>((in._1,in._2),Some(in._2))
})
该程序有两个算子: keyBy 算子用来将记录按照第一个元素(一个字符串)进行分组,根据该 key 将数据进行重新分区,然后将记录再发送给下一个算子: 有状态的 map 算子(map With State)。map 算子在接收到每个元素后,将输入记录的第二个字段的数据加到现有总数中,再将更新过的元素发射出去。下图表示程序的初始状态:输入流中的 6 条记录被检查点分割线 (checkpoint barrier) 隔开,所有的map算子状态均为 0(计数还未开始)。所有 key 为 a 的记录将被顶层的 map 算子处理,所有 key 为 b 的记录将被中间层的 map 算子处理,所有 key 为 c 的记录则将被底层的 map 算子处理。
当该程序处理输入流中的6条记录时,涉及的操作遍布 3 个并行实例(节点、CPU内核等)。那么,检查点该如何保证 exactly-once 呢?
检查点分割线和普通数据记录类似。它们由算子处理,但并不参与计算,而是会触发与检查点相关的行为。当读取输入流的数据源(在本例中与 keyBy 算子内联)遇到检查点屏障时,它将其在输入流中的位置保存到持久化存储中。如果输入流来自消息传输系统(Kafka),这个位置就是偏移量。Flink 的存储机制是插件化的,持久化存储可以是分布式文件系统,如 HDFS。下图展示了这个过程。
检查点像普通数据记录一样在算子之间流动。当 map 算子处理完前3条数据并收到检查点分界线时,它们会将状态以异步的方式写入持久化存储,如下图所示。
当 map 算子的状态备份和检查点分界线的位置备份被确认之后,该检查点操作就可以被标记为完成,如下图所示。我们在无须停止或者阻断计算的条件下,在一个逻辑时间点(对应检查点屏障在输入流中的位置)为计算状态拍了快照。通过确保备份的状态和位置指向同一个逻辑时间点,后文将解释如何基于备份恢复计算,从而保 exactly-once。值得注意的是,当没有出现故障时,Flink 检查点的开销极小,检查点操作的速度由持久化存储的可用带宽决定。回顾数珠子的例子:除了因为数错而需要用到皮筋之外,皮筋会被很快地拨过。
如果检查点操作失败,Flink 可以丢弃该检查点并继续正常执行,因为之后的某一个检查点可能会成功。虽然恢复时间可能更长,但是对于状态的保证依旧很有力。只有在一系列连续的检查点操作失败之后,Flink 才会抛出错误,因为这通常预示着发生了严重且持久的错误。
现在来看看下图所示的情况:检查点操作已经完成,但故障紧随其后。
下图展示了这一重新处理过程。从上一个检查点开始重新计算,可以保证在剩下的记录被处理之后,得到的 map 算子的状态值与没有发生故障时的状态值一致。
Flink 检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于Chandy-Lamport分布式快照算法。
检查点是 Flink 最有价值的创新之一,因为它使 Flink 可以保证 exactly-once,并且不需要牺牲性能。
我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于 Flink+Kafka 的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证 exactly-once 语义呢?
TwoPhaseCommitSinkFunction
内部的 checkpoint 机制我们已经有了了解,那 source 和 sink 具体又是怎样运行的呢?接下来我们逐步做一个分析。
我们知道 Flink 由 JobManager 协调各个 TaskManager 进行 checkpoint 存储,checkpoint 保存在 StateBackend
中,默认 StateBackend
是内存级的,也可以改为文件级的进行持久化保存。
当checkpoint启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier 会在算子间传递下去。
每个算子会对当前的状态做个快照,保存到状态后端。对于 source 任务而言,就会把当前的 offset 作为状态保存起来。下次从 checkpoint 恢复时,source 任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。
每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。
当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。
当 sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。
所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完 sink 操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
kafka 两阶段提交设置
val wrapper: KeyedSerializationSchemaWrapper[String] = new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema())
val producer = new FlinkKafkaProducer011[String]("topic",wrapper,producerConfig,Semantic.EXACTLY_ONCE)
具体的两阶段提交步骤总结如下:
第一条数据来了之后,开启一个kafka的事务(transaction),正常写入kafka分区日志但标记为未提交,这就是“预提交”。
jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager。
sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据。
jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成。
sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据。
外部 kafka 关闭事务,提交的数据可以正常消费了。
所以我们也可以看到,如果宕机需要通过 StateBackend 进行恢复,只能恢复所有确认提交的操作。
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在 TaskManager 的 JVM 堆上;而将 checkpoint 存储在 JobManager 的内存中。
将 checkpoint 存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟 MemoryStateBackend 一样,也会存在 TaskManager 的JVM堆上。
将所有状态序列化后,存入本地的 RocksDB 中存储。
注意:RocksDB 的支持并不直接包含在 flink 中,需要引入依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.10.1</version>
</dependency>
设置状态后端为 FsStateBackend:
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //状态后端 // 内级别保存状态 val mBackend = new MemoryStateBackend() // 本地RocksDB 保存状态 val rBackend = new RocksDBStateBackend("/checkpoint"); // 文件系统保存状态 val fBackend = new FsStateBackend("file:/checkpoint"); env.setStateBackend(fBackend) // 默认不开启 checkpoint env.enableCheckpointing(60000) // checkpoint 模式 精准一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // checkpoint 超时时间 env.getCheckpointConfig.setCheckpointTimeout(10000) // 同时进行 checkpoint 的个数 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // 2次 checkpoint的时间间隔 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) // 开启checkpoint的外部持久化 手动手动停止flink程序,保存checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // 重启策略 重启次数 3次,重启间隔 1s env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。