赞
踩
ChunJun 中与 metric 相关的操作主要有三类：注册 metric、同步 metric，以及基于 metric 的判断。
注册metric的操作主要是在open方法中,主要逻辑如下:
// Keep the streaming runtime context; it is passed to the dirty-data manager below.
this.context = (StreamingRuntimeContext) getRuntimeContext();
// Read Flink's global job parameters; the dirty-data manager is configured from them.
ExecutionConfig.GlobalJobParameters params = context.getExecutionConfig().getGlobalJobParameters();
DirtyConfig dc = DirtyConfUtil.parseFromMap(params.toMap());
// Register the dirty-data manager. It must exist before initStatisticsAccumulator() runs,
// because that method reads this.dirtyManager to register the dirty-data metrics.
this.dirtyManager = new DirtyManager(dc, this.context);
。。。
// One-time initialization, guarded by the `initialized` flag.
if (!initialized) {
    // Create and start the accumulator collector (periodic metric synchronization).
    initAccumulatorCollector();
    // Initialize the row-size calculator.
    initRowSizeCalculator();
    // Register the read statistics metrics.
    initStatisticsAccumulator();
    // Initialize the byte-rate limiter (consumption rate).
    initByteRateLimiter();
    // Initialize checkpoint/restore related information.
    initRestoreInfo();
    initialized = true;
}
开启 metric 同步线程：以 TaskManager 心跳间隔 + 1s 为周期，同步一次数据。
/**
 * Creates the {@link AccumulatorCollector} for this subtask's read/write metrics and starts it.
 *
 * <p>The last-write location/number accumulators are per subtask, so their names carry the
 * subtask index as a {@code _<index>} suffix.
 */
private void initAccumulatorCollector() {
    String subTaskSuffix = "_" + indexOfSubTask;
    accumulatorCollector =
            new AccumulatorCollector(
                    context,
                    Arrays.asList(
                            Metrics.NUM_READS,
                            Metrics.READ_BYTES,
                            Metrics.READ_DURATION,
                            Metrics.WRITE_BYTES,
                            Metrics.NUM_WRITES,
                            Metrics.LAST_WRITE_LOCATION_PREFIX + subTaskSuffix,
                            Metrics.LAST_WRITE_NUM__PREFIX + subTaskSuffix));
    accumulatorCollector.start();
}
注册如下指标
/**
 * Creates the input-side {@link BaseMetric} and registers the read statistics on it.
 *
 * <p>The record-count and byte counters also get a rate meter. The duration counter is a plain
 * gauge. The dirty-data counters come from {@code this.dirtyManager}, so it must already be
 * initialized.
 */
private void initStatisticsAccumulator() {
    inputMetric = new BaseMetric(getRuntimeContext());

    numReadCounter = getRuntimeContext().getLongCounter(Metrics.NUM_READS);
    inputMetric.addMetric(Metrics.NUM_READS, numReadCounter, true);

    bytesReadCounter = getRuntimeContext().getLongCounter(Metrics.READ_BYTES);
    inputMetric.addMetric(Metrics.READ_BYTES, bytesReadCounter, true);

    durationCounter = getRuntimeContext().getLongCounter(Metrics.READ_DURATION);
    inputMetric.addMetric(Metrics.READ_DURATION, durationCounter);

    inputMetric.addDirtyMetric(Metrics.DIRTY_DATA_COUNT, this.dirtyManager.getConsumedMetric());
    inputMetric.addDirtyMetric(
            Metrics.DIRTY_DATA_COLLECT_FAILED_COUNT, this.dirtyManager.getFailedConsumedMetric());
}
在 openInputFormat 中，可以注册用户自定义的 metric reporter（customReporter）：
...
// Optionally plug in a user-defined metric reporter.
if (useCustomReporter()) {
    // Discover the reporter implementation from the job config.
    // NOTE(review): the third argument presumably decides whether a failed report fails the
    // task (judging by makeTaskFailedWhenReportFailed()) — confirm in DataSyncFactoryUtil.
    customReporter =
            DataSyncFactoryUtil.discoverMetric(
                    config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
    customReporter.open();
}
BaseMetric提供注册metric group、注册metric 以及等待metric同步(到jobmanager)等能力。
BaseMetric主要有addMetric、addDirtyMetric、waitForReportMetrics、getChunjunMetricGroup等方法,其中构造方法中,通过runtimeContext注册了chunjunMetricGroup、chunjunDirtyMetricGroup。具体逻辑如下:
注册metric group:
public BaseMetric(RuntimeContext runtimeContext) { //获取全局变量:DELAY_PERIOD_MILL ExecutionConfig.GlobalJobParameters params = runtimeContext.getExecutionConfig().getGlobalJobParameters(); Map<String, String> confMap = params.toMap(); this.DELAY_PERIOD_MILL = Long.parseLong( String.valueOf(confMap.getOrDefault(DELAY_PERIOD_MILL_KEY, "20000"))); //注册metric group:chunjun、output chunjunMetricGroup = runtimeContext .getMetricGroup() .addGroup( Metrics.METRIC_GROUP_KEY_CHUNJUN, Metrics.METRIC_GROUP_VALUE_OUTPUT); //注册metric group:DirtyData、output chunjunDirtyMetricGroup = chunjunMetricGroup.addGroup( Metrics.METRIC_GROUP_KEY_DIRTY, Metrics.METRIC_GROUP_VALUE_OUTPUT); }
注册metric:有gauge、meter等metric类型。
//注册指标 public void addMetric(String metricName, LongCounter counter) { addMetric(metricName, counter, false); } public void addMetric(String metricName, LongCounter counter, boolean meterView) { metricCounters.put(metricName, counter); chunjunMetricGroup.gauge(metricName, new SimpleAccumulatorGauge<>(counter)); if (meterView) { chunjunMetricGroup.meter( metricName + Metrics.SUFFIX_RATE, new SimpleLongCounterMeterView(counter, 20)); } } //metricName: metric名字 //counter: 具体计数的counter public void addDirtyMetric(String metricName, LongCounter counter) { metricCounters.put(metricName, counter); chunjunDirtyMetricGroup.gauge(metricName, new SimpleAccumulatorGauge<>(counter)); }
等待 metric 上报：task 结束后，需要等待一段时间（DELAY_PERIOD_MILL 毫秒，默认 20000），把尚未同步的指标同步给 JobManager。
/**
 * Blocks for {@code DELAY_PERIOD_MILL} milliseconds so that metrics not yet synchronized can
 * still be reported before the task finishes.
 *
 * <p>If the thread is interrupted during the wait, a warning is logged right away. The full
 * delay is then waited again via {@code ThreadUtil.sleepMilliseconds}. Finally the interrupt
 * status is restored, so callers (and the task runtime) can still observe the interruption.
 */
public void waitForReportMetrics() {
    try {
        Thread.sleep(DELAY_PERIOD_MILL);
    } catch (InterruptedException e) {
        // Log before the second wait so the warning is not delayed by DELAY_PERIOD_MILL.
        log.warn("Task thread is interrupted");
        // Still give pending metrics a chance to be reported. This must run before the
        // interrupt flag is restored; an interruptible sleep would otherwise return immediately.
        // NOTE(review): assumes ThreadUtil.sleepMilliseconds handles interrupts itself — confirm.
        ThreadUtil.sleepMilliseconds(DELAY_PERIOD_MILL);
        // Catching InterruptedException cleared the flag; restore it instead of swallowing it.
        Thread.currentThread().interrupt();
    }
}
AccumulatorCollector实现了周期性合并并获取全局metric,主流程是:每taskmanager心跳后+1s,同步一次全局的metric给taskslot。
AccumulatorCollector有start、close、collectAccumulator、getAccumulatorValue、getLocalAccumulatorValue等方法。
/**
 * Starts the scheduler that calls {@code collectAccumulator()} every {@code period} milliseconds,
 * with the first run happening immediately.
 */
public void start() {
    long initialDelayMillis = 0L;
    Runnable collectTask = () -> collectAccumulator();
    scheduledExecutorService.scheduleAtFixedRate(
            collectTask, initialDelayMillis, period, TimeUnit.MILLISECONDS);
}
收集累加器信息,具体逻辑是
调用requestJob(向jobmanager进行rpc请求,合并并获取全局metric),同步全局metric。具体步骤是:
- 每个slot将自己的metric同步到全局(jobmanager)中
- 脏数据同步给每个slot:拿到全局的脏数据metric传给taskslot,可在每个taskslot中判断处理的脏数据是否超过全局设置的。
- 如果某个累加器未在本地注册，且名称为 Metrics.MAX_VALUE，则把它的值记录到 rdbMaxFuncValue 中；这里主要是 JDBC 连接器用到了此指标。
public void collectAccumulator() { CompletableFuture<ExecutionGraphInfo> executionGraphInfoCompletableFuture = gateway.requestJob(Time.seconds(10)); ExecutionGraphInfo executionGraphInfo; try { executionGraphInfo = executionGraphInfoCompletableFuture.get(); } catch (Exception e) { // 限制最大出错次数,超过最大次数则使任务失败,如果不失败,统计数据没有及时更新,会影响速率限制,错误控制等功能 collectErrorTimes++; if (collectErrorTimes > MAX_COLLECT_ERROR_TIMES) { // 主动关闭线程和资源,防止异常情况下没有关闭 close(); throw new RuntimeException( "The number of errors in updating statistics data exceeds the maximum limit of 100 times. To ensure the correctness of the data, the task automatically fails"); } return; } StringifiedAccumulatorResult[] accumulatorResult = executionGraphInfo.getArchivedExecutionGraph().getAccumulatorResultsStringified(); for (StringifiedAccumulatorResult result : accumulatorResult) { ValueAccumulator valueAccumulator = valueAccumulatorMap.get(result.getName()); if (valueAccumulator != null) { valueAccumulator.setGlobal(Long.parseLong(result.getValue())); } else if (result.getName().equals(Metrics.MAX_VALUE)) { rdbMaxFuncValue = result.getValue(); } } }
获取指定累加器信息
/**
 * Returns the global value of accumulator {@code name}, as last stored by
 * {@code collectAccumulator()}.
 *
 * @param name accumulator name
 * @param needWaited whether to call {@code waited()} before reading the value
 * @return the global value, or {@code 0} when no accumulator with that name is registered
 */
public long getAccumulatorValue(String name, boolean needWaited) {
    if (needWaited) {
        waited();
    }
    ValueAccumulator accumulator = valueAccumulatorMap.get(name);
    return accumulator == null ? 0 : accumulator.getGlobal();
}
获取每个subtask的本地指标
/**
 * Returns the local (this subtask's own) value of accumulator {@code name}.
 *
 * @param name accumulator name
 * @return the local value, or {@code 0} when no accumulator with that name is registered
 */
public long getLocalAccumulatorValue(String name) {
    ValueAccumulator accumulator = valueAccumulatorMap.get(name);
    if (accumulator != null) {
        return accumulator.getLocal().getLocalValue();
    }
    return 0;
}
/**
 * Shuts down the collector's scheduler, if it exists and is not already shut down or terminated.
 */
public void close() {
    if (scheduledExecutorService == null
            || scheduledExecutorService.isShutdown()
            || scheduledExecutorService.isTerminated()) {
        return;
    }
    scheduledExecutorService.shutdown();
}
通过以上分析，我们大致了解了 ChunJun 的 metric 管理能力：
- 在什么时机注册metric指标:在BaseRichInputFormat中的open方法中,在连接器消费数据前,进行相关metric的注册;
- chunjun提供了管理(注册、指标更新、等待metric同步等)metric的基类:BaseMetric;
- 周期性获取全局 metric：以便每个 subtask 基于全局指标进行判断（例如处理的脏数据是否超过全局设置的上限）。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。