当前位置:   article > 正文

flink-metric原理与实战_flink自定义业务埋点

flink自定义业务埋点

一、背景介绍

     flink内部实现了一套metric数据收集库。同时flink自身系统有一些固定的metric数据,包括系统的一些指标,CPU、内存、 IO或者各个task运行的一些指标。具体包含那些指标可以查看官方文档: flink-metric。同时我们也可以利用系统的metric库在自己的代码中进行打点收集metrics数据。此外, flink提供了外部接口reporter,可以用来导出这些metrics数据。

 

二、Metric类别

 

  从图中可以看出,Metric这个接口有四个直接子类,分别是:

Gauge —— 最简单的度量指标,只是简单的返回一个值,比如当前实时读取kafka数据的条数 
Counter —— 计数器,在一些情况下,会比Gauge高效,比如通过一个AtomicLong变量来统计一个队列的长度; 
Meter —— 吞吐量的度量,也就是一系列事件发生的速率,例如TPS; 
Histogram —— 度量值的统计结果,如最大值、最小值、平均值,以及分布情况等。

 

三、Metric使用

     在官方的文档中有介绍, 需要继承 Richfunction 才能获得对应的metric对象,用法如下:

  1. public class MyMapper extends RichMapFunction<String, String> {
  2. private transient Counter counter;
  3. @Override
  4. public void open(Configuration config) {
  5. this.counter = getRuntimeContext()
  6. .getMetricGroup()
  7. .counter("myCounter");
  8. }
  9. @Override
  10. public String map(String value) throws Exception {
  11. this.counter.inc();
  12. return value;
  13. }
  14. }

    自定义监控指标会在Flink WebUI显示出来,如需在外部系统显示,需要自定义reporter类。

 

四、监控指标接入外部系统

          Flink 内置了很多 Reporter,对外部系统的技术选型可以参考,比如 JMX 是 java 自带的技术,不严格属于第三方。还有InfluxDB、Prometheus、Slf4j(直接打 log 里)等,调试时候很好用,可以直接看 logger,Flink 本身自带日志系统,会打到 Flink 框架包里面去。

         详见官方文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html

 

1.自定义reporter(参考自带的influxDB)

  1. public class InfluxdbReporter extends AbstractReporter<MeasurementInfo> implements Scheduled {
  2. private String database;
  3. private String retentionPolicy;
  4. private InfluxDB influxDB;
  5. public InfluxdbReporter() {
  6. super(new MeasurementInfoProvider());
  7. }
  8. @Override
  9. public void open(MetricConfig config) {
  10. String host = getString(config, HOST);
  11. int port = getInteger(config, PORT);
  12. if (!isValidHost(host) || !isValidPort(port)) {
  13. throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
  14. }
  15. String database = getString(config, DB);
  16. if (database == null) {
  17. throw new IllegalArgumentException("'" + DB.key() + "' configuration option is not set");
  18. }
  19. String url = String.format("http://%s:%d", host, port);
  20. String username = getString(config, USERNAME);
  21. String password = getString(config, PASSWORD);
  22. this.database = database;
  23. this.retentionPolicy = getString(config, RETENTION_POLICY);
  24. if (username != null && password != null) {
  25. influxDB = InfluxDBFactory.connect(url, username, password);
  26. } else {
  27. influxDB = InfluxDBFactory.connect(url);
  28. }
  29. log.info("Configured InfluxDBReporter with {host:{}, port:{}, db:{}, and retentionPolicy:{}}", host, port, database, retentionPolicy);
  30. }
  31. @Override
  32. public void close() {
  33. if (influxDB != null) {
  34. influxDB.close();
  35. influxDB = null;
  36. }
  37. }
  38. @Override
  39. public void report() {
  40. BatchPoints report = buildReport();
  41. if (report != null) {
  42. influxDB.write(report);
  43. }
  44. }
  45. @Nullable
  46. private BatchPoints buildReport() {
  47. Instant timestamp = Instant.now();
  48. BatchPoints.Builder report = BatchPoints.database(database);
  49. report.retentionPolicy(retentionPolicy);
  50. try {
  51. for (Map.Entry<Gauge<?>, MeasurementInfo> entry : gauges.entrySet()) {
  52. report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
  53. }
  54. for (Map.Entry<Counter, MeasurementInfo> entry : counters.entrySet()) {
  55. report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
  56. }
  57. for (Map.Entry<Histogram, MeasurementInfo> entry : histograms.entrySet()) {
  58. report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
  59. }
  60. for (Map.Entry<Meter, MeasurementInfo> entry : meters.entrySet()) {
  61. report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
  62. }
  63. }
  64. catch (ConcurrentModificationException | NoSuchElementException e) {
  65. // ignore - may happen when metrics are concurrently added or removed
  66. // report next time
  67. return null;
  68. }
  69. return report.build();
  70. }
  71. private static boolean isValidHost(String host) {
  72. return host != null && !host.isEmpty();
  73. }
  74. private static boolean isValidPort(int port) {
  75. return 0 < port && port <= 65535;
  76. }
  77. }

 

2.配置文件

  1. metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
  2. metrics.reporter.influxdb.host: 192.168.2.x
  3. metrics.reporter.influxdb.port: 8086
  4. metrics.reporter.influxdb.db: flink_metrics
  5. metrics.reporter.influxdb.username: gzdata
  6. metrics.reporter.influxdb.password: gzdata

 可以自定义配置,在配置文件中配置:

metrics.reporter.your_monitor.config.b: your_b_value

 然后在open方法中可以直接获取到。

 

输出到influxDB详见:https://blog.csdn.net/qq_23160237/article/details/98058632

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/200334?site
推荐阅读
相关标签
  

闽ICP备14008679号