Flink 提供的 Metrics 可以在 Flink 内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。由于集群运行后很难发现内部的实际状况,跑得慢或快,是否异常等,开发人员无法实时查看所有的 Task 日志,比如作业很大或者有很多作业的情况下,该如何处理?此时 Metrics 可以很好的帮助开发人员了解作业的当前状况。
Metrics 的类型如下:
Metric 在 Flink 内部有多层结构,以 Group 的方式组织,它并不是一个扁平化的结构,Metric Group + Metric Name 是 Metrics 的唯一标识。
Metric Group 的层级有 TaskManagerMetricGroup 和TaskManagerJobMetricGroup,每个 Job 具体到某一个 task 的 group,task 又分为 TaskIOMetricGroup 和 OperatorMetricGroup。Operator 下面也有 IO 统计和一些 Metrics,整个层级大概如下图所示。Metrics 不会影响系统,它处在不同的组中,并且 Flink支持自己去加 Group,可以有自己的层级。
•${User-defined Group} / ${User-defined Metrics}
JobManagerMetricGroup 相对简单,相当于 Master,它的层级也相对较少。
启动本地Rest服务器,Prometheus与其通信,并pull metrics
public class PrometheusReporter extends AbstractPrometheusReporter { // 初始化一些与第三方存储相关的工作 @Override public void open(MetricConfig config) { super.open(config); String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT); Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig); while (ports.hasNext()) { int port = ports.next(); try { // internally accesses CollectorRegistry.defaultRegistry // 针对PrometheusReporter,启动一个本地Rest服务器,以供Prometheus拉取数据; // 而如果是PrometheusPushGatewayReporter,这里就是根据host+port新建一个PushGateway客户端,来往PushGateway发送数据 httpServer = new HTTPServer(port); this.port = port; log.info("Started PrometheusReporter HTTP server on port {}.", port); break; } catch (IOException ioe) { //assume port conflict log.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe); } } if (httpServer == null) { throw new RuntimeException("Could not start PrometheusReporter HTTP server on any configured port. Ports: " + portsConfig); } } }
新建一个PushGateway客户端,主动向PushGateway push metrics
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled { @Override public void open(MetricConfig config) { super.open(config); String host = config.getString(HOST.key(), HOST.defaultValue()); int port = config.getInteger(PORT.key(), PORT.defaultValue()); String configuredJobName = config.getString(JOB_NAME.key(), JOB_NAME.defaultValue()); boolean randomSuffix = config.getBoolean(RANDOM_JOB_NAME_SUFFIX.key(), RANDOM_JOB_NAME_SUFFIX.defaultValue()); deleteOnShutdown = config.getBoolean(DELETE_ON_SHUTDOWN.key(), DELETE_ON_SHUTDOWN.defaultValue()); if (host == null || host.isEmpty() || port < 1) { throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port); } if (randomSuffix) { this.jobName = configuredJobName + new AbstractID(); } else { this.jobName = configuredJobName; } // 新建一个PushGateway客户端,用于向PushGateway推送Metrics pushGateway = new PushGateway(host + ':' + port); log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName: {}, randomJobNameSuffix:{}, deleteOnShutdown:{}}", host, port, jobName, randomSuffix, deleteOnShutdown); } }
public class MetricRegistryImpl implements MetricRegistry { public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) { this.maximumFramesize = config.getQueryServiceMessageSizeLimit(); this.scopeFormats = config.getScopeFormats(); this.globalDelimiter = config.getDelimiter(); this.delimiters = new ArrayList<>(10); this.terminationFuture = new CompletableFuture<>(); this.isShutdown = false; // second, instantiate any custom configured reporters this.reporters = new ArrayList<>(4); // 周期调度器,用与汇报型的reporter周期向外界数据源周期汇报metrics this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry")); this.queryService = null; this.metricQueryServiceRpcService = null; if (reporterConfigurations.isEmpty()) { // no reporters defined // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); } else { // 根据flink-conf.yaml中配置的reporter类型,获得所有reporter for (ReporterSetup reporterSetup : reporterConfigurations) { final String namedReporter = reporterSetup.getName(); try { Optional<String> configuredPeriod = reporterSetup.getIntervalSettings(); TimeUnit timeunit = TimeUnit.SECONDS; long period = 10; if (configuredPeriod.isPresent()) { try { String[] interval = configuredPeriod.get().split(" "); period = Long.parseLong(interval[0]); timeunit = TimeUnit.valueOf(interval[1]); } catch (Exception e) { LOG.error("Cannot parse report interval from config: " + configuredPeriod + " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " + "Using default reporting interval."); } } // reporter实例 final MetricReporter reporterInstance = reporterSetup.getReporter(); final String className = reporterInstance.getClass().getName(); // 汇报型reporter,例如PrometheusPushGatewayReporter if (reporterInstance instanceof Scheduled) { LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className); // 周期向外界数据源汇报metrics executor.scheduleWithFixedDelay( new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit); } else { LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className); } // 注册当前reporter到MetricRegistry中,这里MetricRegistry可以理解为注册中心 reporters.add(reporterInstance); String delimiterForReporter = reporterSetup.getDelimiter().orElse(String.valueOf(globalDelimiter)); if (delimiterForReporter.length() != 1) { LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter); delimiterForReporter = String.valueOf(globalDelimiter); } this.delimiters.add(delimiterForReporter.charAt(0)); } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t); } } } } }
// MetricRegistryImpl类 public class MetricRegistryImpl implements MetricRegistry { // Registers a new {@link Metric} with this registry. @Override public void register(Metric metric, String metricName, AbstractMetricGroup group) { for (int i = 0; i < reporters.size(); i++) { MetricReporter reporter = reporters.get(i); try { if (reporter != null) { FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group); // 注册metrics,做了以下操作: // 1. 将metric注册到CollectorRegistry.defaultRegistry这个静态类 // 2. 设置metric初始值,数据存储在SimpleCollector#children中,以ConcurrentMap<List<String>, Child>的结构维护,其中key是一个有tag值组成的列表,值是child对象,List的hashcode方法由list中的所有值共同生成,因此可以当做map的键 reporter.notifyOfAddedMetric(metric, metricName, front); } } catch (Exception e) { LOG.warn("Error while registering metric.", e); } } } }
public abstract class AbstractPrometheusReporter implements MetricReporter { private void addMetric(Metric metric, List<String> dimensionValues, Collector collector) { if (metric instanceof Gauge) { ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues)); } else if (metric instanceof Counter) { ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues)); } else if (metric instanceof Meter) { ((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues)); } else if (metric instanceof Histogram) { ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues); } else { log.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.", metric.getClass().getName()); } } } public abstract class SimpleCollector<Child> extends Collector { public <T extends Collector> T setChild(Child child, String... labelValues) { if (labelValues.length != this.labelNames.size()) { throw new IllegalArgumentException("Incorrect number of labels."); } else { // 设置metric初始值,数据存储在SimpleCollector#children中,以ConcurrentMap<List<String>, Child>的结构维护,其中key是一个有tag值组成的列表,值是child对象,List的hashcode方法由list中的所有值共同生成,因此可以当做map的键 this.children.put(Arrays.asList(labelValues), child); return this; } } }
Metrics被存储到Map构成的容器中,就完成了使命,等待Prometheus定时来pull Metrics即可,这是pull的模式;
public class MetricRegistryImpl implements MetricRegistry { public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) { this.maximumFramesize = config.getQueryServiceMessageSizeLimit(); this.scopeFormats = config.getScopeFormats(); this.globalDelimiter = config.getDelimiter(); this.delimiters = new ArrayList<>(10); this.terminationFuture = new CompletableFuture<>(); this.isShutdown = false; // second, instantiate any custom configured reporters this.reporters = new ArrayList<>(4); this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry")); this.queryService = null; this.metricQueryServiceRpcService = null; if (reporterConfigurations.isEmpty()) { // no reporters defined // by default, don't report anything LOG.info("No metrics reporter configured, no metrics will be exposed/reported."); } else { for (ReporterSetup reporterSetup : reporterConfigurations) { final String namedReporter = reporterSetup.getName(); try { Optional<String> configuredPeriod = reporterSetup.getIntervalSettings(); TimeUnit timeunit = TimeUnit.SECONDS; long period = 10; if (configuredPeriod.isPresent()) { try { String[] interval = configuredPeriod.get().split(" "); period = Long.parseLong(interval[0]); timeunit = TimeUnit.valueOf(interval[1]); } catch (Exception e) { LOG.error("Cannot parse report interval from config: " + configuredPeriod + " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " + "Using default reporting interval."); } } final MetricReporter reporterInstance = reporterSetup.getReporter(); final String className = reporterInstance.getClass().getName(); // 如果实现了Scheduled,则表明reporter会周期性向第三方存储汇报Metrics if (reporterInstance instanceof Scheduled) { LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className); // 启动周期汇报Metrics的定时线程 executor.scheduleWithFixedDelay( new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit); } else { LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className); } reporters.add(reporterInstance); String delimiterForReporter = reporterSetup.getDelimiter().orElse(String.valueOf(globalDelimiter)); if (delimiterForReporter.length() != 1) { LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter); delimiterForReporter = String.valueOf(globalDelimiter); } this.delimiters.add(delimiterForReporter.charAt(0)); } catch (Throwable t) { LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t); } } } } }
public class MetricRegistryImpl implements MetricRegistry { private static final class ReporterTask extends TimerTask { private final Scheduled reporter; private ReporterTask(Scheduled reporter) { this.reporter = reporter; } // 周期汇报Metrics的线程 @Override public void run() { try { reporter.report(); } catch (Throwable t) { LOG.warn("Error while reporting metrics", t); } } } }
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
// 具体的汇报逻辑,就是调用PushGateway的Rest接口,往PushGateway插入Metrics
public void report() {
try {
// CollectorRegistry.defaultRegistry是一个collector注册中心(即metric),持有collector的引用,因此如果collector中的数值有变化,通过CollectorRegistry.defaultRegistry也可以获取到
pushGateway.pushAdd(CollectorRegistry.defaultRegistry, jobName);
} catch (Exception e) {
log.warn("Failed to push metrics to PushGateway with jobName {}.", jobName, e);
好了,看到这里,你是不是对Flink Metrics的运作机制,有了大概的理解呢?
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。