Flink Source Code Analysis: How Metrics Work

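1. Introduction to Metrics

1.1 What Are Metrics?

Flink's Metrics collect indicators from inside Flink so that developers can better understand the state of a job or a cluster. Once a cluster is running, it is hard to see what is actually happening inside it: whether a job is running slowly or quickly, or whether something has gone wrong. Developers cannot watch every Task's logs in real time, so what can be done when a job is very large or when there are many jobs? This is where Metrics help: they give developers a clear view of a job's current state.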

1.2 Metric Types

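The Metric types are:

  1. Counter: a counter that only accumulates, for example the number of records or the number of megabytes processed so far.
  2. Gauge: the simplest Metric; it reflects the current value of something.
  3. Meter: measures throughput, that is, the number of "events" per unit of time. It is effectively a rate: the number of events divided by the elapsed time. It can be thought of as an average over a rolling window.
  4. Histogram: more complex and less commonly used. It captures the distribution of a set of values, for example Quantile, Mean, StdDev, Max, and Min.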
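
To make the four types concrete, here is a minimal sketch in plain Java. The classes `SimpleCounter`, `SimpleMeter`, and `SimpleHistogram` are hypothetical stand-ins written for this illustration; they are not Flink's `Counter`, `Meter`, or `Histogram` interfaces, and the gauge is modeled with `java.util.function.Supplier`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for the four metric types (not Flink's classes).
class MetricTypesSketch {

    // Counter: a value that is only changed by adding to it.
    static final class SimpleCounter {
        private long count;
        void inc(long n) { count += n; }
        long getCount() { return count; }
    }

    // Meter: number of events divided by the elapsed time.
    static final class SimpleMeter {
        private long events;
        void markEvent(long n) { events += n; }
        double ratePerSecond(double elapsedSeconds) { return events / elapsedSeconds; }
    }

    // Histogram: keeps the observed values and derives statistics from them.
    static final class SimpleHistogram {
        private final List<Long> values = new ArrayList<>();
        void update(long v) { values.add(v); }
        long max() { return Collections.max(values); }
        long min() { return Collections.min(values); }
        double mean() { return values.stream().mapToLong(Long::longValue).average().orElse(0.0); }
    }

    public static void main(String[] args) {
        SimpleCounter recordsIn = new SimpleCounter();
        recordsIn.inc(3);
        recordsIn.inc(2);

        // Gauge: reports whatever the current value is at the moment it is read.
        List<String> queue = new ArrayList<>(List.of("a", "b"));
        Supplier<Integer> queueSize = queue::size;

        SimpleMeter meter = new SimpleMeter();
        meter.markEvent(120);

        SimpleHistogram latency = new SimpleHistogram();
        for (long v : new long[] {10, 20, 60}) {
            latency.update(v);
        }

        System.out.println("counter = " + recordsIn.getCount());
        System.out.println("gauge   = " + queueSize.get());
        System.out.println("meter   = " + meter.ratePerSecond(60.0) + " events/s");
        System.out.println("hist    = min " + latency.min() + ", max " + latency.max()
                + ", mean " + latency.mean());
    }
}
```

The key difference to notice is that a counter and a meter are updated by the code being measured, while a gauge is just a function that is evaluated whenever someone asks for the value.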

1.3 Metric Group

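Inside Flink, Metrics are organized into Groups in a multi-level hierarchy rather than a flat structure. Metric Group + Metric Name uniquely identifies a Metric.

The Metric Group levels include TaskManagerMetricGroup and TaskManagerJobMetricGroup. Each Job is broken down to the group of a specific task, and a task is further divided into TaskIOMetricGroup and OperatorMetricGroup. Below an Operator there are again IO statistics and other Metrics. The overall hierarchy looks roughly like the outline below. Metrics do not affect the system; they simply live in different groups, and Flink lets you add your own Groups to build your own levels.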

•TaskManagerMetricGroup
 •TaskManagerJobMetricGroup
  •TaskMetricGroup
   •TaskIOMetricGroup
   •OperatorMetricGroup
    •${User-defined Group} / ${User-defined Metrics}
    •OperatorIOMetricGroup
•JobManagerMetricGroup
 •JobManagerJobMetricGroup

JobManagerMetricGroup is comparatively simple: it corresponds to the Master and has fewer levels.
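
The idea that "group path + metric name" forms the unique identifier can be sketched as follows. `GroupSketch` and its methods are hypothetical names used only for this illustration and are much simpler than Flink's real group classes; the group names in `main` are made up.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a nested metric group; illustrative only, not Flink's API.
class GroupSketch {
    private final GroupSketch parent;
    private final String name;

    GroupSketch(GroupSketch parent, String name) {
        this.parent = parent;
        this.name = name;
    }

    // Creates a child group, which is how a user-defined level would be added.
    GroupSketch addGroup(String childName) {
        return new GroupSketch(this, childName);
    }

    // Joins every group name from the root down, then appends the metric name.
    String getMetricIdentifier(String metricName, char delimiter) {
        Deque<String> parts = new ArrayDeque<>();
        parts.addFirst(metricName);
        for (GroupSketch g = this; g != null; g = g.parent) {
            parts.addFirst(g.name);
        }
        return String.join(String.valueOf(delimiter), parts);
    }

    public static void main(String[] args) {
        GroupSketch operator = new GroupSketch(null, "taskmanager")
                .addGroup("myJob")
                .addGroup("myTask")
                .addGroup("myOperator");
        GroupSketch custom = operator.addGroup("myGroup");
        System.out.println(custom.getMetricIdentifier("myCounter", '.'));
    }
}
```

Two metrics with the same name therefore do not collide as long as they are registered in different groups.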

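2. How Metrics Work

How do system or user-defined Metrics get into third-party storage? There are generally two modes: push and pull. Below we analyze two Reporters, PrometheusReporter and PrometheusPushGatewayReporter, to understand how Metrics work and how push differs from pull.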

2.1 Initializing the Reporter

Perform the initialization work related to the third-party storage.

2.1.1 PrometheusReporter

Starts a local HTTP server that Prometheus connects to in order to pull metrics.

public class PrometheusReporter extends AbstractPrometheusReporter {
	// Initialization work related to the third-party storage
	@Override
	public void open(MetricConfig config) {
		super.open(config);

		String portsConfig = config.getString(ARG_PORT, DEFAULT_PORT);
		Iterator<Integer> ports = NetUtils.getPortRangeFromString(portsConfig);

		while (ports.hasNext()) {
			int port = ports.next();
			try {
				// internally accesses CollectorRegistry.defaultRegistry
				// PrometheusReporter starts a local HTTP server here so that Prometheus can pull the data.
				// PrometheusPushGatewayReporter instead creates a PushGateway client from host + port at this point and uses it to send data to the PushGateway.
				httpServer = new HTTPServer(port);
				this.port = port;
				log.info("Started PrometheusReporter HTTP server on port {}.", port);
				break;
			} catch (IOException ioe) { //assume port conflict
				log.debug("Could not start PrometheusReporter HTTP server on port {}.", port, ioe);
			}
		}
		if (httpServer == null) {
			throw new RuntimeException("Could not start PrometheusReporter HTTP server on any configured port. Ports: " + portsConfig);
		}
	}
}
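
The "try each configured port until one binds" loop can be tried out without Flink on the classpath. The sketch below uses the JDK's built-in `com.sun.net.httpserver.HttpServer` as a stand-in for the Prometheus client's `HTTPServer`; the class and method names are hypothetical.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;

// Simplified sketch of the port fallback loop, using the JDK HttpServer as a stand-in.
class PortFallbackSketch {

    // Returns a started server bound to the first free port, or throws if none can be bound.
    static HttpServer startOnFirstFreePort(Iterator<Integer> ports) {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
                server.start();
                return server;
            } catch (IOException e) {
                // Assume a port conflict and move on to the next candidate.
            }
        }
        throw new RuntimeException("Could not start HTTP server on any configured port.");
    }

    public static void main(String[] args) {
        // Port 0 asks the operating system for any free port.
        HttpServer first = startOnFirstFreePort(List.of(0).iterator());
        int taken = first.getAddress().getPort();
        // The first candidate is now occupied, so the second candidate is used.
        HttpServer second = startOnFirstFreePort(List.of(taken, 0).iterator());
        System.out.println("first=" + taken + ", second=" + second.getAddress().getPort());
        first.stop(0);
        second.stop(0);
    }
}
```

Configuring a port range rather than a single port matters when several Flink processes share one host, since each of them needs its own scrape endpoint.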
2.1.2 PrometheusPushGatewayReporter

Creates a PushGateway client and actively pushes metrics to the PushGateway.

public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
	@Override
	public void open(MetricConfig config) {
		super.open(config);

		String host = config.getString(HOST.key(), HOST.defaultValue());
		int port = config.getInteger(PORT.key(), PORT.defaultValue());
		String configuredJobName = config.getString(JOB_NAME.key(), JOB_NAME.defaultValue());
		boolean randomSuffix = config.getBoolean(RANDOM_JOB_NAME_SUFFIX.key(), RANDOM_JOB_NAME_SUFFIX.defaultValue());
		deleteOnShutdown = config.getBoolean(DELETE_ON_SHUTDOWN.key(), DELETE_ON_SHUTDOWN.defaultValue());

		if (host == null || host.isEmpty() || port < 1) {
			throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
		}

		if (randomSuffix) {
			this.jobName = configuredJobName + new AbstractID();
		} else {
			this.jobName = configuredJobName;
		}
		// Create a PushGateway client that will be used to push Metrics to the PushGateway
		pushGateway = new PushGateway(host + ':' + port);
		log.info("Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName: {}, randomJobNameSuffix:{}, deleteOnShutdown:{}}", host, port, jobName, randomSuffix, deleteOnShutdown);
	}
}
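
The configuration handling in `open()` boils down to two small decisions, sketched below in plain Java. The class and method names are hypothetical, and `java.util.UUID` stands in for Flink's `AbstractID`. The likely motivation for the random suffix is that several processes pushing under the same job name would otherwise overwrite each other's metrics in the PushGateway; treat that as an assumption rather than something the excerpt states.

```java
import java.util.UUID;

// Hypothetical sketch of the job-name and address handling; UUID replaces Flink's AbstractID.
class PushGatewayConfigSketch {

    // Appends a random suffix so that each process pushes under its own job name.
    static String buildJobName(String configuredJobName, boolean randomSuffix) {
        return randomSuffix ? configuredJobName + UUID.randomUUID() : configuredJobName;
    }

    // Rejects an empty host or a non-positive port, then builds "host:port".
    static String buildAddress(String host, int port) {
        if (host == null || host.isEmpty() || port < 1) {
            throw new IllegalArgumentException(
                    "Invalid host/port configuration. Host: " + host + " Port: " + port);
        }
        return host + ':' + port;
    }

    public static void main(String[] args) {
        System.out.println(buildJobName("myJob", false));
        System.out.println(buildJobName("myJob", true));
        System.out.println(buildAddress("localhost", 9091));
    }
}
```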

2.2 Registering Reporters

All configured Reporters are registered with the MetricRegistry object, which keeps them in a list.

public class MetricRegistryImpl implements MetricRegistry {
	public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) {
		this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
		this.scopeFormats = config.getScopeFormats();
		this.globalDelimiter = config.getDelimiter();
		this.delimiters = new ArrayList<>(10);
		this.terminationFuture = new CompletableFuture<>();
		this.isShutdown = false;

		// second, instantiate any custom configured reporters
		this.reporters = new ArrayList<>(4);
		// Scheduled executor that push-style reporters use to report metrics to the external system periodically
		this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

		this.queryService = null;
		this.metricQueryServiceRpcService = null;

		if (reporterConfigurations.isEmpty()) {
			// no reporters defined
			// by default, don't report anything
			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
		} else {
			// Go through every reporter configured in flink-conf.yaml
			for (ReporterSetup reporterSetup : reporterConfigurations) {
				final String namedReporter = reporterSetup.getName();

				try {
					Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
					TimeUnit timeunit = TimeUnit.SECONDS;
					long period = 10;

					if (configuredPeriod.isPresent()) {
						try {
							String[] interval = configuredPeriod.get().split(" ");
							period = Long.parseLong(interval[0]);
							timeunit = TimeUnit.valueOf(interval[1]);
						}
						catch (Exception e) {
							LOG.error("Cannot parse report interval from config: " + configuredPeriod +
									" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
									"Using default reporting interval.");
						}
					}
					// The reporter instance
					final MetricReporter reporterInstance = reporterSetup.getReporter();
					final String className = reporterInstance.getClass().getName();
					// Push-style reporter, e.g. PrometheusPushGatewayReporter
					if (reporterInstance instanceof Scheduled) {
						LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
						// Report metrics to the external system periodically
						executor.scheduleWithFixedDelay(
								new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
					} else {
						LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
					}
					// Register this reporter with the MetricRegistry, which can be thought of as a registry center
					reporters.add(reporterInstance);

					String delimiterForReporter = reporterSetup.getDelimiter().orElse(String.valueOf(globalDelimiter));
					if (delimiterForReporter.length() != 1) {
						LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
						delimiterForReporter = String.valueOf(globalDelimiter);
					}
					this.delimiters.add(delimiterForReporter.charAt(0));
				}
				catch (Throwable t) {
					LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
				}
			}
		}
	}
}
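
The interval handling in the constructor is easy to experiment with in isolation. The sketch below is a hypothetical re-implementation, not Flink code: it parses values such as "10 SECONDS" and falls back to 10 seconds on any error. One deliberate difference: reading the excerpt above, `period` is assigned before `timeunit` is parsed, so an input like "500 MILLIS" appears to leave the period at 500 while the unit stays at the default SECONDS. The sketch parses into local variables first so that the fallback is all-or-nothing.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of parsing a reporter interval such as "10 SECONDS".
class IntervalSketch {

    static final class Interval {
        final long period;
        final TimeUnit unit;

        Interval(long period, TimeUnit unit) {
            this.period = period;
            this.unit = unit;
        }

        @Override
        public String toString() {
            return period + " " + unit;
        }
    }

    static final Interval DEFAULT = new Interval(10, TimeUnit.SECONDS);

    // Returns the parsed interval, or the default if the value is absent or malformed.
    static Interval parse(Optional<String> configured) {
        if (!configured.isPresent()) {
            return DEFAULT;
        }
        try {
            String[] parts = configured.get().split(" ");
            long period = Long.parseLong(parts[0]);
            TimeUnit unit = TimeUnit.valueOf(parts[1]);
            return new Interval(period, unit);
        } catch (Exception e) {
            // Covers a bad number, an unknown unit name, and a missing unit.
            return DEFAULT;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse(Optional.of("500 MILLISECONDS")));
        System.out.println(parse(Optional.of("500 MILLIS")));
        System.out.println(parse(Optional.empty()));
    }
}
```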

2.3 Collecting Metrics in Memory

When a Flink job starts, its metrics are registered with the MetricRegistry.

// MetricRegistryImpl class
public class MetricRegistryImpl implements MetricRegistry {
	// Registers a new {@link Metric} with this registry.
	@Override
	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
		for (int i = 0; i < reporters.size(); i++) {
			MetricReporter reporter = reporters.get(i);
			try {
				if (reporter != null) {
					FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
					// For the Prometheus reporters, registering a metric does the following:
					// 1. Registers the metric with the static CollectorRegistry.defaultRegistry instance.
					// 2. Sets the metric's initial value. The data is stored in SimpleCollector#children,
					//    a ConcurrentMap<List<String>, Child>: the key is the list of label values and the
					//    value is the child object. A List's hashCode() and equals() are computed from all
					//    of its elements, so a list can be used as a map key.
					reporter.notifyOfAddedMetric(metric, metricName, front);
				}
			} catch (Exception e) {
				LOG.warn("Error while registering metric.", e);
			}
		}
	}
}
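
The shape of `register` is a plain fan-out: every new metric is announced to every reporter, and a failure in one reporter must not stop the others. A minimal sketch of that pattern, with hypothetical `Reporter` and `Registry` types that are far simpler than Flink's:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a registry that notifies every reporter about a new metric.
class FanOutSketch {

    interface Reporter {
        void notifyOfAddedMetric(Object metric, String metricName);
    }

    static final class Registry {
        private final List<Reporter> reporters = new ArrayList<>();

        void addReporter(Reporter reporter) {
            reporters.add(reporter);
        }

        // Notifies all reporters and returns how many of them accepted the metric.
        int register(Object metric, String metricName) {
            int notified = 0;
            for (Reporter reporter : reporters) {
                try {
                    reporter.notifyOfAddedMetric(metric, metricName);
                    notified++;
                } catch (Exception e) {
                    // Log and continue so that one broken reporter does not affect the rest.
                    System.err.println("Error while registering metric: " + e.getMessage());
                }
            }
            return notified;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        List<String> seen = new ArrayList<>();
        registry.addReporter((metric, name) -> {
            throw new IllegalStateException("broken reporter");
        });
        registry.addReporter((metric, name) -> seen.add(name));
        int notified = registry.register(new Object(), "numRecordsIn");
        System.out.println("notified=" + notified + ", seen=" + seen);
    }
}
```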
public abstract class AbstractPrometheusReporter implements MetricReporter {
	private void addMetric(Metric metric, List<String> dimensionValues, Collector collector) {
		if (metric instanceof Gauge) {
			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
		} else if (metric instanceof Counter) {
			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
		} else if (metric instanceof Meter) {
			((io.prometheus.client.Gauge) collector).setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
		} else if (metric instanceof Histogram) {
			((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues);
		} else {
			log.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
				metric.getClass().getName());
		}
	}
}

public abstract class SimpleCollector<Child> extends Collector {
	public <T extends Collector> T setChild(Child child, String... labelValues) {
		if (labelValues.length != this.labelNames.size()) {
			throw new IllegalArgumentException("Incorrect number of labels.");
		} else {
			// Sets the metric's initial value. The data is stored in SimpleCollector#children, a
			// ConcurrentMap<List<String>, Child>: the key is the list of label values and the value is
			// the child object. A List's hashCode() and equals() are computed from all of its elements,
			// so a list can be used as a map key.
			this.children.put(Arrays.asList(labelValues), child);
			return (T) this;
		}
	}
}
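
Two properties of this storage scheme are worth seeing in action: a `List<String>` of label values works as a map key because equal lists have equal hash codes, and the map stores a reference to the metric, so later updates are visible without registering again. The sketch below is a hypothetical miniature, using `DoubleSupplier` in place of the Prometheus client's child type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.DoubleSupplier;

// Hypothetical miniature of a collector that keeps one child per combination of label values.
class ChildrenMapSketch {
    private final ConcurrentMap<List<String>, DoubleSupplier> children = new ConcurrentHashMap<>();

    // Stores the child under its label values; the same values replace the previous child.
    void setChild(DoubleSupplier child, String... labelValues) {
        children.put(Arrays.asList(labelValues), child);
    }

    // Looks the child up with a freshly built list and reads its current value.
    double read(String... labelValues) {
        return children.get(Arrays.asList(labelValues)).getAsDouble();
    }

    int size() {
        return children.size();
    }

    public static void main(String[] args) {
        ChildrenMapSketch collector = new ChildrenMapSketch();
        AtomicLong recordsIn = new AtomicLong();

        // Registered once, while the value is still 0.
        collector.setChild(() -> (double) recordsIn.get(), "myJob", "task-0");

        // Updated later by the job; the collector sees the change through the reference.
        recordsIn.addAndGet(5);
        System.out.println(collector.read("myJob", "task-0"));
    }
}
```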

2.4 Sending Metrics to Third-Party Storage

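PrometheusReporter:
Once the Metrics are stored in the Map-based container, the reporter's job is done. It simply waits for Prometheus to pull the Metrics periodically. This is the pull mode.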
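
The pull mode can be demonstrated end to end with JDK classes only. In the sketch below, the JDK's `HttpServer` plays the reporter's endpoint and `java.net.http.HttpClient` plays the scraper. The metric name `records_in`, the `/metrics` path, and the one-line text payload are assumptions made for the illustration, not the output of Flink's reporter.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the pull mode: the value is read only when a scrape request arrives.
class PullSketch {

    // Starts an endpoint on a free local port that renders the counter on demand.
    static HttpServer serve(AtomicLong counter) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/metrics", exchange -> {
                byte[] body = ("records_in " + counter.get() + "\n").getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                exchange.getResponseBody().write(body);
                exchange.close();
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Plays the role of the scraper: a single HTTP GET against the endpoint.
    static String scrape(HttpServer server) {
        URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/metrics");
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        try {
            return client.send(HttpRequest.newBuilder(uri).GET().build(),
                    HttpResponse.BodyHandlers.ofString()).body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        AtomicLong recordsIn = new AtomicLong();
        HttpServer server = serve(recordsIn);
        recordsIn.addAndGet(42);
        System.out.print(scrape(server));
        server.stop(0);
    }
}
```

Nothing is sent until the scraper asks, which is why this kind of reporter needs no scheduling on the Flink side.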

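PrometheusPushGatewayReporter:

Flink reporters fall into two kinds: those that implement the Scheduled interface and those that do not. A reporter that implements Scheduled periodically sends Metrics to third-party storage (driven by a scheduled thread pool), which is the push mode.
See the comments in the source code below: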

public class MetricRegistryImpl implements MetricRegistry {
	public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) {
		this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
		this.scopeFormats = config.getScopeFormats();
		this.globalDelimiter = config.getDelimiter();
		this.delimiters = new ArrayList<>(10);
		this.terminationFuture = new CompletableFuture<>();
		this.isShutdown = false;

		// second, instantiate any custom configured reporters
		this.reporters = new ArrayList<>(4);

		this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

		this.queryService = null;
		this.metricQueryServiceRpcService = null;

		if (reporterConfigurations.isEmpty()) {
			// no reporters defined
			// by default, don't report anything
			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
		} else {
			for (ReporterSetup reporterSetup : reporterConfigurations) {
				final String namedReporter = reporterSetup.getName();

				try {
					Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
					TimeUnit timeunit = TimeUnit.SECONDS;
					long period = 10;

					if (configuredPeriod.isPresent()) {
						try {
							String[] interval = configuredPeriod.get().split(" ");
							period = Long.parseLong(interval[0]);
							timeunit = TimeUnit.valueOf(interval[1]);
						}
						catch (Exception e) {
							LOG.error("Cannot parse report interval from config: " + configuredPeriod +
									" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
									"Using default reporting interval.");
						}
					}

					final MetricReporter reporterInstance = reporterSetup.getReporter();
					final String className = reporterInstance.getClass().getName();
					// If the reporter implements Scheduled, it reports Metrics to third-party storage periodically
					if (reporterInstance instanceof Scheduled) {
						LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
						// Schedule the periodic task that reports Metrics
						executor.scheduleWithFixedDelay(
								new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
					} else {
						LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
					}
					reporters.add(reporterInstance);

					String delimiterForReporter = reporterSetup.getDelimiter().orElse(String.valueOf(globalDelimiter));
					if (delimiterForReporter.length() != 1) {
						LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
						delimiterForReporter = String.valueOf(globalDelimiter);
					}
					this.delimiters.add(delimiterForReporter.charAt(0));
				}
				catch (Throwable t) {
					LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
				}
			}
		}
	}
}
public class MetricRegistryImpl implements MetricRegistry {	
	private static final class ReporterTask extends TimerTask {

		private final Scheduled reporter;

		private ReporterTask(Scheduled reporter) {
			this.reporter = reporter;
		}
		// The task that reports Metrics periodically
		@Override
		public void run() {
			try {
				reporter.report();
			} catch (Throwable t) {
				LOG.warn("Error while reporting metrics", t);
			}
		}
	}
}
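
The `catch (Throwable t)` in `ReporterTask.run()` matters more than it may look. According to the `ScheduledExecutorService` documentation, if any execution of a periodic task throws, all subsequent executions are suppressed, so a single failed report would silently end reporting. The sketch below contrasts a wrapped and an unwrapped task; the helper names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of why a periodic reporting task should catch everything it throws.
class SafeTaskSketch {

    // Wraps a task so that a failure is logged instead of propagating to the scheduler.
    static Runnable safe(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("Error while reporting metrics: " + t);
            }
        };
    }

    // Schedules the task every 10 ms and reports whether it started `expectedRuns` times in time.
    static boolean runsAtLeast(Runnable task, int expectedRuns, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        try {
            executor.scheduleWithFixedDelay(() -> {
                latch.countDown();
                task.run();
            }, 10, 10, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> {
            throw new IllegalStateException("push failed");
        };
        System.out.println("unwrapped task kept running: " + runsAtLeast(failing, 3, 500));
        System.out.println("wrapped task kept running:   " + runsAtLeast(safe(failing), 3, 5000));
    }
}
```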
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {
	// The actual reporting logic: call the PushGateway REST API to push Metrics into the PushGateway
	@Override
	public void report() {
		try {
			// CollectorRegistry.defaultRegistry is a registry of collectors (i.e. metrics) and holds references to them, so any change to a collector's value is also visible through CollectorRegistry.defaultRegistry
			pushGateway.pushAdd(CollectorRegistry.defaultRegistry, jobName);
		} catch (Exception e) {
			log.warn("Failed to push metrics to PushGateway with jobName {}.", jobName, e);
		}
	}
}
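
For comparison with the pull mode, here is a sketch of what a push looks like on the wire. A local JDK `HttpServer` acts as a stand-in for the PushGateway so the example runs without one. The `/metrics/job/<jobName>` path, the use of POST for an add-style push, and the content type are assumptions about the PushGateway's HTTP interface; the payload is made up.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the push mode against a local stand-in gateway (not a real PushGateway).
class PushSketch {

    // What the stand-in gateway has received so far: "METHOD path" -> request body.
    static final Map<String, String> RECEIVED = new ConcurrentHashMap<>();

    static HttpServer startStandInGateway() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/metrics/job/", exchange -> {
                String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
                RECEIVED.put(exchange.getRequestMethod() + " " + exchange.getRequestURI().getPath(), body);
                exchange.sendResponseHeaders(200, -1); // -1 means no response body
                exchange.close();
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sends the current metrics to the gateway and returns the HTTP status code.
    static int push(int port, String jobName, String payload) {
        URI uri = URI.create("http://127.0.0.1:" + port + "/metrics/job/" + jobName);
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Content-Type", "text/plain; version=0.0.4")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        try {
            return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HttpServer gateway = startStandInGateway();
        int status = push(gateway.getAddress().getPort(), "myJob", "records_in 42\n");
        System.out.println("status=" + status + ", received=" + RECEIVED);
        gateway.stop(0);
    }
}
```

Here the Flink side initiates every transfer, which is why this reporter has to be driven by the scheduled executor.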

That's it. By now you should have a general understanding of how Flink Metrics work.

References:
http://www.54tianzhisheng.cn/2019/11/23/flink-metrics/
https://www.jianshu.com/p/a5ed47cd320a
https://www.cnblogs.com/sanduzxcvbnm/p/13724351.html
