当前位置:   article > 正文

Kafka集群监控_kafka jmx

kafka jmx

1、监控度量指标

        Kafka使用Yammer Metrics在服务器和Scala客户端中报告指标。Java客户端使用Kafka Metrics,它是一个内置的度量标准注册表,可最大程度地减少拉入客户端应用程序的传递依赖项。两者都通过JMX公开指标,并且可以配置为使用可插拔的统计报告器报告统计信息,以连接到您的监视系统。

具体的监控指标可以查看官方文档

1.1、JMX

(1)Kafka开启Jmx端口

[root@node4 bin]# vim /opt/kafka_2.12-1.0.2/bin/kafka-server-start.sh

所有kafka机器添加一个JMX_PORT ,并重启kafka 

(2)验证JMX开启

首先打印9581端口占用的进程信息,然后使用进程编号对应到Kafka的进程号,搞定。

        也可以查看Kafka启动日志,确定启动参数 -Dcom.sun.management.jmxremote.port=9581存在即可 

1.2、使用JConsole链接JMX端口

(1)win/mac,找到jconsole工具并打开, 在${JAVA_HOEM}/bin/Mac电脑可以直接命令行输入 jconsole

详细的监控指标 

OS监控项 

 

objectName指标项说明
java.lang:type=OperatingSystemFreePhysicalMemorySize空闲物理内存
java.lang:type=OperatingSystemSystemCpuLoad系统CPU利用率
java.lang:type=OperatingSystemProcessCpuLoad进程CPU利用率
java.lang:type=GarbageCollector, name=G1 Young GenerationCollectionCountGC次数

broker指标

objectName指标项说明
kafka.server:type=BrokerTopicMetrics, name=BytesInPerSecCount每秒输入的流量
kafka.server:type=BrokerTopicMetrics, name=BytesOutPerSecCount每秒输出的流量
kafka.server:type=BrokerTopicMetrics, name=BytesRejectedPerSecCount每秒扔掉的流量
kafka.server:type=BrokerTopicMetrics, name=MessagesInPerSecCount每秒的消息写入总量
kafka.server:type=BrokerTopicMetrics, name=FailedFetchRequestsPerSecCount当前机器每秒fetch请求失败的数量
kafka.server:type=BrokerTopicMetrics, name=FailedProduceRequestsPerSecCount当前机器每秒produce请求失败的数量
kafka.server:type=ReplicaManager,name=PartitionCountValue该broker上的partition的数量
kafka.server:type=ReplicaManager,name=LeaderCountValueLeader的replica的数量
kafka.network:type=RequestMetrics, name=TotalTimeMs,request=FetchConsumerCount一个请求FetchConsumer耗费的所有时间
kafka.network:type=RequestMetrics, name=TotalTimeMs,request=FetchFollowerCount一个请求FetchFollower耗费的所有时间
kafka.network:type=RequestMetrics, name=TotalTimeMs,request=ProduceCount一个请求Produce耗费的所有时间

producer以及topic指标

objectName指标项官网说明译文说明
kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化)incoming-byte-rateThe average number of incoming bytes received per second from all servers.producer每秒的平均写入流量
kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化)outgoing-byte-rateThe average number of outgoing bytes sent per second to all servers.producer每秒的输出流量
kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化)request-rateThe average number of requests sent per second to the broker.producer每秒发给broker的平均request次数
kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化)response-rateThe average number of responses received per second from the broker.producer每秒发给broker的平均response次数
kafka.producer:type=producer-metrics,client-id=console-producer(client-id会变化)request-latency-avgThe average time taken for a fetch request.一个fetch请求的平均时间
kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx(client-id和topic名称会变化)record-send-
rate
The average number of records sent per second for a topic.每秒从topic发送的平均记录数
kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx(client-id和topic名称会变化)record-retry-totalThe total number of retried record sends重试发送的消息总数量
kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx(client-id和topic名称会变化)record-error-totalThe total number of record sends that resulted in errors发送错误的消息总数量

consumer指标

objectName指标项官网说明说明
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1(client-id会变化)records-lag-maxNumber of messages the consumer lags behind the producer by. Published by the consumer, not broker.由consumer提交的消息消费lag
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1(client-id会变化)records-consumed-rateThe average number of records consumed per second每秒平均消费的消息数量

1.3、编程手段来获取监控指标

查看要监控哪个指标:

代码实现: 

pom依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>1.0.2</version>
  6. </dependency>
  7. </dependencies>

Java代码

  1. package com.lagou.kafka.demo.monitor;
  2. import javax.management.*;
  3. import javax.management.remote.JMXConnector;
  4. import javax.management.remote.JMXConnectorFactory;
  5. import javax.management.remote.JMXServiceURL;
  6. import java.io.IOException;
  7. import java.util.Iterator;
  8. import java.util.Set;
  9. public class JMXMonitorDemo {
  10. public static void main(String[] args) throws IOException, MalformedObjectNameException, AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException {
  11. // jmx的url字符串
  12. // 在集群中,有很多broker节点,每个broker上统计的数据不都是一样的,一般不一样
  13. // 在jconsole中查看到的数据和通过程序获取的数据,一定要保证获取的是同一个broker节点上的
  14. String jmxServiceURL = "service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi";
  15. JMXServiceURL jmxURL = null;
  16. JMXConnector jmxc = null;
  17. MBeanServerConnection jmxs = null;
  18. ObjectName mbeanObjName = null;
  19. Iterator sampleIter = null;
  20. Set sampleSet = null;
  21. // 创建JMXServiceURL对象,参数是
  22. jmxURL = new JMXServiceURL(jmxServiceURL);
  23. // 建立到指定URL服务器的连接
  24. jmxc = JMXConnectorFactory.connect(jmxURL);
  25. // 返回代表远程MBean服务器的MBeanServerConnection对象
  26. jmxs = jmxc.getMBeanServerConnection();
  27. // 根据传入的字符串,创建ObjectName对象
  28. // mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
  29. // mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=tp_eagle_01");
  30. mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
  31. // 获取指定ObjectName对应的MBeans
  32. sampleSet = jmxs.queryMBeans(null, mbeanObjName);
  33. // 迭代器
  34. sampleIter = sampleSet.iterator();
  35. if (sampleSet.isEmpty()) {
  36. } else {
  37. // 如果返回了,则打印信息
  38. while (sampleIter.hasNext()) {
  39. // Used to represent the object name of an MBean and its class name.
  40. // If the MBean is a Dynamic MBean the class name should be retrieved from the MBeanInfo it provides.
  41. // 用于表示MBean的ObjectName和ClassName
  42. ObjectInstance sampleObj = (ObjectInstance) sampleIter.next();
  43. ObjectName objectName = sampleObj.getObjectName();
  44. // 查看指定MBean指定属性的值
  45. String count = jmxs.getAttribute(objectName, "Count").toString();
  46. System.out.println(count);
  47. }
  48. }
  49. // 关闭
  50. jmxc.close();
  51. }
  52. }

2、监控工具 Kafka Eagle

我们可以使用Kafka-eagle管理Kafka集群

核心模块:

  • 面板可视化
  • 主题管理,包含创建主题、删除主题、主题列举、主题配置、主题查询等
  • 消费者应用:对不同消费者应用进行监控,包含Kafka API、Flink API、Spark API、StormAPI、Flume API、LogStash API等
  • 集群管理:包含对Kafka集群和Zookeeper集群的详情展示,其内容包含Kafka启动时间、Kafka端口号、Zookeeper Leader角色等。同时,还有多集群切换管理,Zookeeper Client操作入口
  • 集群监控:包含对Broker、Kafka核心指标、Zookeeper核心指标进行监控,并绘制历史趋势图
  • 告警功能:对消费者应用数据积压情况进行告警,以及对Kafka和Zookeeper监控度进行告警。同时,支持邮件、微信、钉钉告警通知
  • 系统管理:包含用户创建、用户角色分配、资源访问进行管理

架构:

  • 可视化:负责展示主题列表、集群健康、消费者应用等
  • 采集器:数据采集的来源包含Zookeeper、Kafka JMX & 内部Topic、Kafka API(Kafka 2.x以后版本)
  • 数据存储:目前Kafka Eagle存储采用MySQL或SQLite,数据库和表的创建均是自动完成的,按照官方文档进行配置好,启动Kafka Eagle就会自动创建,用来存储元数据和监控数据
  • 监控:负责见消费者应用消费情况、集群健康状态
  • 告警:对监控到的异常进行告警通知,支持邮件、微信、钉钉等方式
  • 权限管理:对访问用户进行权限管理,对于管理员、开发者、访问者等不同角色的用户,分配不用的访问权限

需要Kafka节点开启JMX。前面讲过了。

  1. # 下载编译好的包
  2. wget http://pkgs-linux.cvimer.com/kafka-eagle.zip
  3. # 配置kafka-eagle
  4. unzip kafka-eagle.zip
  5. cd kafka-eagle/kafka-eagle-web/target
  6. mkdir -p test
  7. cp kafka-eagle-web-2.0.1-bin.tar.gz test/
  8. tar xf kafka-eagle-web-2.0.1-bin.tar.gz
  9. cd kafka-eagle-web-2.0.1

需要配置环境变量:

conf下的配置文件:system-config.properties 

  1. ######################################
  2. # multi zookeeper & kafka cluster list
  3. ######################################
  4. # 集群的别名,用于在kafka-eagle中进行区分。
  5. # 可以配置监控多个集群,别名用逗号隔开
  6. # kafka.eagle.zk.cluster.alias=cluster1,cluster2,cluster3
  7. kafka.eagle.zk.cluster.alias=cluster1
  8. # cluster1.zk.list=10.1.201.17:2181,10.1.201.22:2181,10.1.201.23:2181
  9. # 配置当前集群的zookeeper地址,此处的值要与Kafka的server.properties中的zookeeper.connect的值一致
  10. # 此处的前缀就是集群的别名
  11. cluster1.zk.list=node2:2181,node3:2181,node4:2181/myKafka
  12. #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
  13. ######################################
  14. # zookeeper enable acl
  15. ######################################
  16. cluster1.zk.acl.enable=false
  17. cluster1.zk.acl.schema=digest
  18. cluster1.zk.acl.username=test
  19. cluster1.zk.acl.password=test123
  20. ######################################
  21. # broker size online list
  22. ######################################
  23. cluster1.kafka.eagle.broker.size=20
  24. ######################################
  25. # zookeeper客户端连接数限制
  26. ######################################
  27. kafka.zk.limit.size=25
  28. ######################################
  29. # kafka eagle网页端口号
  30. ######################################
  31. kafka.eagle.webui.port=8048
  32. ######################################
  33. # kafka 消费信息存储位置,用来兼容kafka低版本
  34. ######################################
  35. cluster1.kafka.eagle.offset.storage=kafka
  36. cluster2.kafka.eagle.offset.storage=zk
  37. ######################################
  38. # kafka metrics, 15 days by default
  39. ######################################
  40. kafka.eagle.metrics.charts=true
  41. kafka.eagle.metrics.retain=15
  42. ######################################
  43. # kafka sql topic records max
  44. ######################################
  45. kafka.eagle.sql.topic.records.max=5000
  46. kafka.eagle.sql.fix.error=true
  47. ######################################
  48. # 管理员删除kafka中topic的口令
  49. ######################################
  50. kafka.eagle.topic.token=keadmin
  51. ######################################
  52. # kafka 集群是否开启了认证模式,此处是cluster1集群的配置,禁用
  53. ######################################
  54. cluster1.kafka.eagle.sasl.enable=false
  55. cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
  56. cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
  57. cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
  58. cluster1.kafka.eagle.sasl.client.id=
  59. cluster1.kafka.eagle.sasl.cgroup.enable=false
  60. cluster1.kafka.eagle.sasl.cgroup.topics=
  61. ######################################
  62. # kafka ssl authenticate,示例配置
  63. ######################################
  64. cluster2.kafka.eagle.sasl.enable=false
  65. cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
  66. cluster2.kafka.eagle.sasl.mechanism=PLAIN
  67. cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
  68. cluster2.kafka.eagle.sasl.client.id=
  69. cluster2.kafka.eagle.sasl.cgroup.enable=false
  70. cluster2.kafka.eagle.sasl.cgroup.topics=
  71. ######################################
  72. # kafka ssl authenticate,示例配置
  73. ######################################
  74. cluster3.kafka.eagle.ssl.enable=false
  75. cluster3.kafka.eagle.ssl.protocol=SSL
  76. cluster3.kafka.eagle.ssl.truststore.location=
  77. cluster3.kafka.eagle.ssl.truststore.password=
  78. cluster3.kafka.eagle.ssl.keystore.location=
  79. cluster3.kafka.eagle.ssl.keystore.password=
  80. cluster3.kafka.eagle.ssl.key.password=
  81. cluster3.kafka.eagle.ssl.cgroup.enable=false
  82. cluster3.kafka.eagle.ssl.cgroup.topics=
  83. ######################################
  84. # 存储监控数据的数据库地址
  85. # kafka默认使用sqlite存储,需要指定和创建sqlite的目录
  86. # 如 /home/lagou/hadoop/kafka-eagle/db
  87. ######################################
  88. kafka.eagle.driver=org.sqlite.JDBC
  89. kafka.eagle.url=jdbc:sqlite:/home/lagou/hadoop/kafka-eagle/db/ke.db
  90. kafka.eagle.username=root
  91. kafka.eagle.password=www.kafka-eagle.org
  92. ######################################
  93. # 还可以使用MySLQ存储监控数据
  94. ######################################
  95. #kafka.eagle.driver=com.mysql.jdbc.Driver
  96. #kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
  97. #kafka.eagle.username=root
  98. #kafka.eagle.password=123456
  99. ######################################
  100. # kafka eagle 设置告警邮件服务器
  101. ######################################
  102. kafka.eagle.mail.enable=true
  103. kafka.eagle.mail.sa=kafka_lagou_alert
  104. kafka.eagle.mail.username=kafka_lagou_alert@163.com
  105. kafka.eagle.mail.password=Pas2W0rd
  106. kafka.eagle.mail.server.host=smtp.163.com
  107. kafka.eagle.mail.server.port=25
  • 也可以自行编译, https://github.com/smartloli/kafka-eagle
  • 创建Eagel的存储目录: mkdir -p /hadoop/kafka-eagle

# 启动kafka-eagle
./bin/ke.sh start

会提示我们登陆地址和账号密码

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/670600
推荐阅读
相关标签
  

闽ICP备14008679号