赞
踩
脏数据管理模块的基本逻辑是:
- 当数据消费失败时,将脏数据拦截并保存到dirtyDataCollector中;
- 全局metric判断:脏数据达到设定值之后,任务报错,flink停止运行,并将脏数据输出到flink日志中、或mysql的配置中。
对于代码实现:
DirtyManager用于管理DirtyDataCollector,串起DirtyDataCollector的生命周期,DirtyDataCollector主要用于收集脏数据并输出(到日志中,mysql中),脏数据数量达到设定值之后,flink停止运行。
具体的DataCollector实现有:
分别用于输出到taskmanager的日志、(最后报错时)jobmanager日志、输出到mysql表中。
所以这里有三层代码结构:
- DirtyManager:管理DirtyDataCollector
- DirtyDataCollector:主要用于收集脏数据并输出,并判断脏数据是否达到临界值
- 具体的DataCollector的实现:具体的输出实现:输出到日志,输出到mysql。
接下来我们逐个看每层的具体实现逻辑
DirtyManager用于管理DirtyDataCollector,串起DirtyDataCollector的生命周期(open、run、close),主要流程如下:
- 设置系统配置给DirtyDataCollector
- 开启DirtyManager线程,主要用于DirtyDataCollector消费脏数据(收集脏数据)
- 关闭资源:DirtyDataCollector、DirtyManager的线程资源。
初始化DirtyManager
- 根据配置加载特定的DirtyDataCollector:用于脏数据的收集
- 获取系统信息:jobId、jobName、operationName
- 获取脏数据metric,用于定期合并脏数据为全局脏数据。
public DirtyManager(DirtyConfig dirtyConfig, RuntimeContext runtimeContext) {
//通过反射注册DirtyDataCollector
this.consumer = DataSyncFactoryUtil.discoverDirty(dirtyConfig);
Map<String, String> allVariables = runtimeContext.getMetricGroup().getAllVariables();
this.jobId = allVariables.get(JOB_ID);
this.jobName = allVariables.getOrDefault(JOB_NAME, "defaultJobName");
this.operationName = allVariables.getOrDefault(OPERATOR_NAME, "defaultOperatorName");
this.errorCounter = runtimeContext.getLongCounter(Metrics.NUM_ERRORS);
}
被具体的连接器调用:
具体当连接器生产数据或写数据到数据源报错时,调用此方法收集脏数据
- 创建线程,用于异步执行DirtyDataCollector,开始消费脏数据到日志或mysql表中
- 添加脏数据条数,同步到全局脏数据metric中
- 脏数据信息,存到队列中,等待具体的脏数据收集器消费
- 子流程:判断脏数据条数是否大于总脏数据条数
public void collect(Object data, Throwable cause, String field, long globalErrors) { if (executor == null) { execute(); } DirtyDataEntry entity = new DirtyDataEntry(); entity.setJobId(jobId); entity.setJobName(jobName); entity.setOperatorName(operationName); entity.setCreateTime(new Timestamp(System.currentTimeMillis())); entity.setDirtyContent(toString(data)); entity.setFieldName(field); entity.setErrorMessage(ExceptionUtil.getErrorMessage(cause)); //积累metric:errorCounter,这里直接同步到jobmanager? errorCounter.add(1L); //将脏数据添加到队列,等待消费。 consumer.offer(entity, globalErrors); } /** * 创建线程,用于异步执行DirtyDataCollector */ public void execute() { if (executor == null) { executor = new ThreadPoolExecutor( MAX_THREAD_POOL_SIZE, MAX_THREAD_POOL_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ChunJunThreadFactory( "dirty-consumer", true, (t, e) -> { log.error( String.format( "Thread [%s] consume failed.", t.getName()), e); }), new ThreadPoolExecutor.CallerRunsPolicy()); } //初始化DirtyDataCollector:比如脏数据定时发送到mysql时的线程注册 consumer.open(); //拿出一个线程执行DirtyDataCollector的execute方法 executor.execute(consumer); }
/** Close manager. */ public void close() { if (!isAlive.get()) { return; } //先关闭datacollector的资源 if (consumer != null) { consumer.close(); } //再关闭executor线程 if (executor != null) { executor.shutdown(); } isAlive.compareAndSet(true, false); }
处于第二层的dirtyDataCollector实现了脏数据的临时保存并等待具体DataCollector的消费,
它的基本逻辑是:
- 当脏数据消费失败时,将脏数据拦截并保存到consumeQueue中,等待被消费
- 全局的metric:脏数据达到设定值之后,任务报错,flink停止运行,并将脏数据输出到flink日志中。
在DirtyManager实例化时,注册DirtyDataCollector时的操作,
这里获取脏数据最大值,允许消费脏数据失败的条数,以及对具体DataCollector的初始化,我们下节分析。
public void initializeConsumer(DirtyConfig conf) {
this.maxConsumed = conf.getMaxConsumed();
this.maxFailedConsumed = conf.getMaxFailedConsumed();
this.init(conf);
}
被DirtyManager调用:在开启脏数据收集器线程之前执行
初始化具体脏数据收集器:目前之后mysql脏数据收集器实现了此方法:消费线程、mysql连接
public void open() {
}
offer方法被DirtyManager的collect方法调用
- 用于存储具体脏数据并更新单个slot的脏数据条数。
- 每添加一条脏数据,就判断脏数据是否达到了设定值,如果是则抛出异常。
其中:globalErrors是上文AccumulatorCollector定期更新的结果。
//存储脏数据具体内容,并更新单个slot的脏数据条数 public synchronized void offer(DirtyDataEntry dirty, long globalErrors) { consumeQueue.offer(dirty); addConsumed(1L, dirty, globalErrors); } /** * 添加脏数据 * 通过metric判断此时的脏数据条数,是否已经超过全局设置的脏数据条数 * @param count * @param dirty * @param globalErrors */ protected void addConsumed(long count, DirtyDataEntry dirty, long globalErrors) { consumedCounter.add(count); // 因为总体的脏数据需要tm和jm进行通讯(每tm心跳+1s),会有延迟,且当单slot运行时误差将达到最大 // 所以这里需要判断延迟情况 long max = consumedCounter.getLocalValue() >= globalErrors ? consumedCounter.getLocalValue() : globalErrors; // 但这里仍然有误差:此时如果所有的slot都消费了脏数据那么其他slot的脏数据就记录不到。也就是会多消费脏数据 // 所以这里要有取舍:是否要消费完全准确的脏数据 if (max >= maxConsumed) { StringJoiner dirtyMessage = new StringJoiner("\n") .add("\n****************Dirty Data Begin****************\n") .add(dirty.toString()) .add("\n****************Dirty Data End******************\n"); throw new NoRestartException( String.format( "The dirty consumer shutdown, due to the consumed count exceed the max-consumed [%s]", maxConsumed) + dirtyMessage); } }
由DirtyManager开启脏数据消费线程,
具体的DataCollector(log、mysql)消费脏数据,发送到Taskmanager日志或mysql表中。
/** * 开启脏数据消费线程 * 定时消费脏数据,发送到执行脏数据管理器中:log、mysql等 */ @Override public void run() { while (isRunning.get()) { try { //指定的DataCollector消费脏数据 DirtyDataEntry dirty = consumeQueue.take(); consume(dirty); } catch (Exception e) { //未成功将脏数据收集到脏数据管理模块中 addFailedConsumed(e, 1L); } } } /** * 消费脏数据用于输出到日志、mysql等 */ protected abstract void consume(DirtyDataEntry dirty) throws Exception;
不同的DataCollector有不同的操作,下节分析
public abstract void close();
实现比较简单:拿到的数据直接打印到Taskmanager中,关闭时,设定isRunning为false
/** * 没有线程,调用即输出到日志中 */ @Slf4j public class LogDirtyDataCollector extends DirtyDataCollector { private static final long serialVersionUID = 7366317208451727471L; private Long printRate; @Override protected void init(DirtyConfig conf) { this.printRate = conf.getPrintRate(); } /** * 输出脏数据到taskmanager * @param dirty dirty-data which should be consumed. */ @Override protected void consume(DirtyDataEntry dirty) { if (consumedCounter.getLocalValue() % printRate == 0) { StringJoiner dirtyMessage = new StringJoiner("\n") .add("\n====================Dirty Data=====================") .add(dirty.toString()) .add("\n==================================================="); log.warn(dirtyMessage.toString()); } } @Override public void close() { isRunning.compareAndSet(true, false); log.info("Print consumer closed."); } }
下篇分析MysqlDirtyDataCollector是如何消费数据。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。