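1. How exactly-once is implemented in Flink
Exactly-once has to hold both when the consumer pulls data from Kafka and when the producer writes data to Kafka. Flink currently has few sources that support exactly-once, the Kafka source being one. Few sinks can achieve it either, for example the Kafka sink and StreamingFileSink, and all of them require checkpointing to be enabled. Below, FlinkKafkaProducer is used as the example: reading its source code shows how exactly-once semantics are implemented in Flink.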
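1.1 Overall flow (also known as the two-phase commit principle)
1. The JobManager periodically (through the CheckpointCoordinator) sends a checkpoint request to every subTask that holds state.
2. Each subTask writes its state to the configured state backend. One slot corresponds to one file, and the subTasks in that slot write their state into it.
3. Each subTask reports to the JobManager that its checkpoint succeeded.
4. Once every subTask has reported success, the JobManager notifies all subTasks that implement the checkpoint callback that the checkpoint is complete.
5. On receiving this notification, each subTask commits its Kafka transaction, which makes the data it wrote to Kafka inside that transaction visible.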
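The five steps above can be sketched in plain Java. This is a simplified model and not Flink code: `TwoPhaseCommitFlowSketch`, `SubTask`, and `runCheckpoint` are hypothetical names, the state backend is reduced to a boolean outcome, and the "transaction" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoPhaseCommitFlowSketch {

    /** Hypothetical model of a sink subTask; not a Flink class. */
    static final class SubTask {
        final String name;
        final boolean snapshotSucceeds;
        // Records written inside the open transaction, not yet visible downstream.
        final List<String> openTransaction = new ArrayList<>();
        // Records made visible by a commit.
        final List<String> committed = new ArrayList<>();

        SubTask(String name, boolean snapshotSucceeds) {
            this.name = name;
            this.snapshotSucceeds = snapshotSucceeds;
        }

        void write(String record) {
            openTransaction.add(record);
        }

        /** Steps 2 and 3: persist state, then report success or failure. */
        boolean snapshot(long checkpointId) {
            return snapshotSucceeds;
        }

        /** Step 5: commit the transaction once the checkpoint is complete. */
        void notifyCheckpointComplete(long checkpointId) {
            committed.addAll(openTransaction);
            openTransaction.clear();
        }
    }

    /** Hypothetical stand-in for the JobManager side of the protocol. */
    static boolean runCheckpoint(long checkpointId, List<SubTask> subTasks) {
        boolean allAcknowledged = true;
        for (SubTask task : subTasks) {            // step 1: request a snapshot
            allAcknowledged &= task.snapshot(checkpointId);
        }
        if (!allAcknowledged) {
            return false;                          // no subTask commits
        }
        for (SubTask task : subTasks) {            // step 4: notify everyone
            task.notifyCheckpointComplete(checkpointId);
        }
        return true;
    }

    public static void main(String[] args) {
        SubTask a = new SubTask("sink-0", true);
        SubTask b = new SubTask("sink-1", true);
        a.write("r1");
        b.write("r2");
        boolean completed = runCheckpoint(1L, Arrays.asList(a, b));
        System.out.println(completed + " " + a.committed + " " + b.committed);
    }
}
```

The point of the design is that a commit happens only after every participant has acknowledged. If a single snapshot fails, no subTask commits and the written records stay invisible.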
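Note: to keep the operator state and keyed state along a pipeline consistent, Flink uses a barrier mechanism. When a checkpoint is triggered, a barrier is injected into the data stream at the source and flows downstream together with the records. Each operator snapshots its state when the barrier reaches it, so the snapshot reflects exactly the records that came before the barrier and none that came after it.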
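A minimal sketch of the barrier idea, assuming a single input stream and a toy operator that keeps a running sum. `BarrierSketch`, `Event`, and `snapshotsAtBarriers` are hypothetical names for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BarrierSketch {

    /** Hypothetical stream element: either a data record or a checkpoint barrier. */
    static final class Event {
        final boolean isBarrier;
        final long checkpointId; // meaningful only for barriers
        final int value;         // meaningful only for records

        private Event(boolean isBarrier, long checkpointId, int value) {
            this.isBarrier = isBarrier;
            this.checkpointId = checkpointId;
            this.value = value;
        }

        static Event record(int value) {
            return new Event(false, -1L, value);
        }

        static Event barrier(long checkpointId) {
            return new Event(true, checkpointId, 0);
        }
    }

    /** Toy operator: sums the records and captures its state at every barrier. */
    static List<Integer> snapshotsAtBarriers(List<Event> stream) {
        int sum = 0;
        List<Integer> snapshots = new ArrayList<>();
        for (Event event : stream) {
            if (event.isBarrier) {
                // The captured state covers exactly the records before this barrier.
                snapshots.add(sum);
            } else {
                sum += event.value;
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<Event> stream = Arrays.asList(
                Event.record(1), Event.record(2), Event.barrier(1L),
                Event.record(3), Event.barrier(2L), Event.record(4));
        System.out.println(snapshotsAtBarriers(stream));
    }
}
```

The record that arrives after the last barrier changes the running sum but appears in no snapshot, so it belongs to the next checkpoint.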
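Question: Kafka is involved in two transactions here, one when the producer writes data and one when the consumer reads data. How are these two transactions related?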
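1.2 Source code walkthrough
(1) Start with the FlinkKafkaProducer class: it extends TwoPhaseCommitSinkFunction.
(2) TwoPhaseCommitSinkFunction is the recommended base class for any SinkFunction that needs exactly-once semantics. It implements two important interfaces: CheckpointedFunction and CheckpointListener.
CheckpointedFunction interface
This interface declares two methods, snapshotState and initializeState. Its source is as follows: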
public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}
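snapshotState is called when a checkpoint takes a snapshot, and it persists the state to the state backend. The state stored here includes the transaction ID, the subTask index, and the Kafka-related information needed to write data.
initializeState is used to restore state. If the state was persisted to the state backend (the checkpoint succeeded) but the subTask did not finish writing the data to Kafka, this method restores the most recent state so that the write to Kafka can be completed.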
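The interplay of the two methods can be illustrated with a small stand-alone model. It is a sketch under simplifying assumptions: `FakeBroker` is a hypothetical in-memory stand-in for Kafka, `Sink` is not a Flink class, and its `snapshotState` and `initializeState` take and return plain lists instead of Flink's context objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SnapshotRestoreSketch {

    /** Hypothetical stand-in for a transactional external system such as Kafka. */
    static final class FakeBroker {
        final Map<String, List<String>> openTransactions = new HashMap<>();
        final List<String> committed = new ArrayList<>();

        void write(String txnId, String record) {
            openTransactions.computeIfAbsent(txnId, k -> new ArrayList<>()).add(record);
        }

        // Idempotent: committing an unknown or already committed transaction is a no-op.
        void commit(String txnId) {
            List<String> records = openTransactions.remove(txnId);
            if (records != null) {
                committed.addAll(records);
            }
        }
    }

    /** Hypothetical sink whose method names mirror CheckpointedFunction. */
    static final class Sink {
        private final FakeBroker broker;
        private final String txnPrefix;
        private int nextTxn = 0;
        private String currentTxn;
        private final List<String> pendingCommit = new ArrayList<>();

        Sink(FakeBroker broker, String txnPrefix) {
            this.broker = broker;
            this.txnPrefix = txnPrefix;
            this.currentTxn = txnPrefix + "-" + nextTxn++;
        }

        void invoke(String record) {
            broker.write(currentTxn, record);
        }

        /** Returns the transaction IDs that would be persisted to the state backend. */
        List<String> snapshotState() {
            pendingCommit.add(currentTxn);             // close the current transaction
            currentTxn = txnPrefix + "-" + nextTxn++;  // later records go to a new one
            return new ArrayList<>(pendingCommit);
        }

        /** On restart, finish the commit of every transaction found in the restored state. */
        void initializeState(List<String> restoredPending) {
            for (String txnId : restoredPending) {
                broker.commit(txnId);
            }
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        Sink sink = new Sink(broker, "run1");
        sink.invoke("a");
        sink.invoke("b");
        List<String> checkpointedState = sink.snapshotState();
        sink.invoke("c"); // written after the snapshot, so not covered by it

        // Simulated crash before the commit: a new sink instance restores the state.
        Sink restarted = new Sink(broker, "run2");
        restarted.initializeState(checkpointedState);
        System.out.println(broker.committed);
    }
}
```

Because the transaction ID is part of the checkpointed state, the restarted instance knows which transaction still has to be committed. Records written after the snapshot stay in an open transaction and never become visible.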
CheckpointListener interface
This interface declares a single method, notifyCheckpointComplete.
Its source is as follows:
/**
 * This interface must be implemented by functions/operations that want to receive
 * a commit notification once a checkpoint has been completely acknowledged by all
 * participants.
 */
@PublicEvolving
public interface CheckpointListener {

    /**
     * This method is called as a notification once a distributed checkpoint has been completed.
     *
     * Note that any exception during this method will not cause the checkpoint to
     * fail any more.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     * @throws Exception
     */
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}
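When is notifyCheckpointComplete called? Only after the subTasks of all partitions have acknowledged the checkpoint to the JobManager. The call tells each subTask that this checkpoint succeeded and that it can proceed to the next step. The implementation of the method in TwoPhaseCommitSinkFunction is as follows: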
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // the following scenarios are possible here
    //
    //  (1) there is exactly one transaction from the latest checkpoint that
    //      was triggered and completed. That should be the common case.
    //      Simply commit that transaction in that case.
    //
    //  (2) there are multiple pending transactions because one previous
    //      checkpoint was skipped. That is a rare case, but can happen
    //      for example when:
    //
    //        - the master cannot persist the metadata of the last
    //          checkpoint (temporary outage in the storage system) but
    //          could persist a successive checkpoint (the one notified here)
    //
    //        - other tasks could not persist their status during
    //          the previous checkpoint, but did not trigger a failure because they
    //          could hold onto their state and could successfully persist it in
    //          a successive checkpoint (the one notified here)
    //
    //      In both cases, the prior checkpoint never reach a committed state, but
    //      this checkpoint is always expected to subsume the prior one and cover all
    //      changes since the last successful one. As a consequence, we need to commit
    //      all pending transactions.
    //
    //  (3) Multiple transactions are pending, but the checkpoint complete notification
    //      relates not to the latest. That is possible, be
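The three scenarios in these comments can be modelled with a sorted map of pending transactions keyed by checkpoint ID. This is an illustrative sketch and not the Flink implementation: `PendingCommitSketch` and `preCommit` are hypothetical names, and the handling of scenario (3), committing only transactions up to and including the notified checkpoint, is an assumption, since the excerpt above is cut off at that point.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PendingCommitSketch {
    // Pre-committed transactions, ordered by the checkpoint that produced them.
    final TreeMap<Long, String> pending = new TreeMap<>();
    // Transaction IDs in the order they were committed.
    final List<String> committed = new ArrayList<>();

    /** Called at snapshot time: remember the transaction that belongs to this checkpoint. */
    void preCommit(long checkpointId, String txnId) {
        pending.put(checkpointId, txnId);
    }

    /** Commits every pending transaction whose checkpoint ID is at most the notified one. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.add(it.next().getValue());
            it.remove(); // also removes the entry from the backing map
        }
    }

    public static void main(String[] args) {
        PendingCommitSketch sink = new PendingCommitSketch();
        sink.preCommit(1L, "txn-1"); // checkpoint 1 was never notified (scenario 2)
        sink.preCommit(2L, "txn-2");
        sink.preCommit(3L, "txn-3"); // newer than the notification (scenario 3)
        sink.notifyCheckpointComplete(2L);
        System.out.println(sink.committed + " pending=" + sink.pending);
    }
}
```

In this model a notification for checkpoint 2 also commits the skipped checkpoint 1, while the transaction of checkpoint 3 stays pending until its own notification arrives.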