Kafka uses Yammer Metrics for metrics reporting in the broker and in the Scala clients. The Java clients use Kafka Metrics, a built-in metrics registry that minimizes the transitive dependencies pulled into client applications. Both expose metrics via JMX and can be configured to report statistics using pluggable reporters that hook into your monitoring system.
See the official documentation for the full list of metrics.
(1) Enable the JMX port on Kafka
[root@node4 bin]# vim /opt/kafka_2.12-1.0.2/bin/kafka-server-start.sh
Add a JMX_PORT setting on every Kafka machine, then restart Kafka. A sketch of the change follows.
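A minimal sketch of the edit, assuming port 9581 (the port used throughout this article; any free port works):

# near the top of kafka-server-start.sh
export JMX_PORT=9581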
(2) Verify that JMX is enabled
First print the information of the process occupying port 9581, then match that process number against Kafka's process ID.
You can also check the Kafka startup log and confirm that the startup parameter -Dcom.sun.management.jmxremote.port=9581 is present.
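One way to run the check, as a hedged sketch (ss/netstat availability varies by distribution; jps ships with the JDK):

# find the process listening on port 9581
ss -tlnp | grep 9581
# list JVM processes to confirm the PID belongs to Kafka
jps -l | grep -i kafka
# or check the broker's command line for the JMX flag
ps -ef | grep 'jmxremote.port=9581'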
(1) On Windows/Mac, find and open the jconsole tool under ${JAVA_HOME}/bin; on a Mac you can simply type jconsole on the command line.
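jconsole can also be pointed at the broker's JMX endpoint directly (host and port assumed from the setup above):

jconsole 192.168.100.103:9581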
Detailed monitoring metrics
OS metrics

| objectName | Metric | Description |
| --- | --- | --- |
| java.lang:type=OperatingSystem | FreePhysicalMemorySize | Free physical memory |
| java.lang:type=OperatingSystem | SystemCpuLoad | System CPU utilization |
| java.lang:type=OperatingSystem | ProcessCpuLoad | Process CPU utilization |
| java.lang:type=GarbageCollector,name=G1 Young Generation | CollectionCount | Number of GCs |
Broker metrics

| objectName | Metric | Description |
| --- | --- | --- |
| kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec | Count | Incoming traffic per second |
| kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec | Count | Outgoing traffic per second |
| kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec | Count | Rejected traffic per second |
| kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec | Count | Total messages written per second |
| kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec | Count | Failed fetch requests per second on this machine |
| kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec | Count | Failed produce requests per second on this machine |
| kafka.server:type=ReplicaManager,name=PartitionCount | Value | Number of partitions on this broker |
| kafka.server:type=ReplicaManager,name=LeaderCount | Value | Number of leader replicas |
| kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer | Count | Total time spent serving a FetchConsumer request |
| kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower | Count | Total time spent serving a FetchFollower request |
| kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce | Count | Total time spent serving a Produce request |

Note: on the *PerSec Meter MBeans, the Count attribute is a cumulative total since broker start; per-second values are exposed by the MeanRate, OneMinuteRate, FiveMinuteRate and FifteenMinuteRate attributes.
Producer and topic metrics

| objectName | Metric | Description (from the official docs) |
| --- | --- | --- |
| kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | incoming-byte-rate | The average number of incoming bytes received per second from all servers. |
| kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | outgoing-byte-rate | The average number of outgoing bytes sent per second to all servers. |
| kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | request-rate | The average number of requests sent per second to the broker. |
| kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | response-rate | The average number of responses received per second from the broker. |
| kafka.producer:type=producer-metrics,client-id=console-producer (client-id varies) | request-latency-avg | The average request latency in ms. |
| kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx (client-id and topic name vary) | record-send-rate | The average number of records sent per second for a topic. |
| kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx (client-id and topic name vary) | record-retry-total | The total number of retried record sends. |
| kafka.producer:type=producer-topic-metrics,client-id=console-producer,topic=testjmx (client-id and topic name vary) | record-error-total | The total number of record sends that resulted in errors. |
Consumer metrics

| objectName | Metric | Description (from the official docs) |
| --- | --- | --- |
| kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1 (client-id varies) | records-lag-max | Number of messages the consumer lags behind the producer by. Published by the consumer, not broker. |
| kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1 (client-id varies) | records-consumed-rate | The average number of records consumed per second. |
In jconsole, find the metric you want to monitor and note its objectName and attribute:
Code implementation:
Maven pom dependency
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.2</version>
    </dependency>
</dependencies>
Java code
package com.lagou.kafka.demo.monitor;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.Set;

public class JMXMonitorDemo {

    public static void main(String[] args) throws IOException, MalformedObjectNameException, AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException {

        // The JMX service URL string.
        // In a cluster each broker keeps its own statistics, and they generally differ,
        // so make sure the data you fetch programmatically and the data you look at
        // in jconsole come from the same broker node.
        String jmxServiceURL = "service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi";

        // Create the JMXServiceURL object from the URL string
        JMXServiceURL jmxURL = new JMXServiceURL(jmxServiceURL);
        // Open a connection to the server at the given URL
        JMXConnector jmxc = JMXConnectorFactory.connect(jmxURL);
        // Get the MBeanServerConnection representing the remote MBean server
        MBeanServerConnection jmxs = jmxc.getMBeanServerConnection();
        // Build the ObjectName from its string form
        // ObjectName mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
        // ObjectName mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=tp_eagle_01");
        ObjectName mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
        // Query the MBeans matching this ObjectName
        Set<ObjectInstance> sampleSet = jmxs.queryMBeans(null, mbeanObjName);

        // If anything matched, print the attribute value of each MBean
        for (ObjectInstance sampleObj : sampleSet) {
            // An ObjectInstance represents the object name of an MBean and its class name.
            // If the MBean is a Dynamic MBean the class name should be retrieved
            // from the MBeanInfo it provides.
            ObjectName objectName = sampleObj.getObjectName();
            // Read the given attribute of the MBean
            String count = jmxs.getAttribute(objectName, "Count").toString();
            System.out.println(count);
        }

        // Close the connection
        jmxc.close();
    }
}
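As noted under the broker metrics table, Count on these Meter MBeans is a cumulative total since broker start. A hedged variation of the program above (same host, port, and MBean assumed) reads the OneMinuteRate attribute instead, which gives a per-second moving average:

package com.lagou.kafka.demo.monitor;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JMXRateDemo {
    public static void main(String[] args) throws Exception {
        JMXServiceURL jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi");
        try (JMXConnector jmxc = JMXConnectorFactory.connect(jmxURL)) {
            MBeanServerConnection jmxs = jmxc.getMBeanServerConnection();
            ObjectName name = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
            // Yammer Meter MBeans expose Count (cumulative) plus MeanRate,
            // OneMinuteRate, FiveMinuteRate and FifteenMinuteRate attributes.
            Object rate = jmxs.getAttribute(name, "OneMinuteRate");
            System.out.println("BytesInPerSec one-minute rate: " + rate + " bytes/s");
        }
    }
}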
We can use Kafka Eagle to manage the Kafka cluster.
Core modules:
Architecture:
Kafka Eagle needs JMX enabled on the Kafka nodes, which was covered earlier.
# Download the pre-built package
wget http://pkgs-linux.cvimer.com/kafka-eagle.zip

# Set up kafka-eagle
unzip kafka-eagle.zip
cd kafka-eagle/kafka-eagle-web/target
mkdir -p test
cp kafka-eagle-web-2.0.1-bin.tar.gz test/
cd test
tar xf kafka-eagle-web-2.0.1-bin.tar.gz
cd kafka-eagle-web-2.0.1
Environment variables need to be configured, for example as sketched below.
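A hedged sketch, assuming the extraction path used above and a root install; Kafka Eagle's ke.sh expects KE_HOME to point at the unpacked directory. Append to /etc/profile (or ~/.bash_profile) and re-source it:

# adjust the path to wherever kafka-eagle-web-2.0.1 was extracted
export KE_HOME=/root/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1
export PATH=$PATH:$KE_HOME/bin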
The configuration file under conf: system-config.properties
######################################
# multi zookeeper & kafka cluster list
######################################
# Alias for the cluster, used to tell clusters apart inside kafka-eagle.
# Multiple clusters can be monitored; separate the aliases with commas
# kafka.eagle.zk.cluster.alias=cluster1,cluster2,cluster3
kafka.eagle.zk.cluster.alias=cluster1
# cluster1.zk.list=10.1.201.17:2181,10.1.201.22:2181,10.1.201.23:2181
# The zookeeper address of this cluster; the value must match
# zookeeper.connect in Kafka's server.properties.
# The prefix here is the cluster alias.
cluster1.zk.list=node2:2181,node3:2181,node4:2181/myKafka
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# limit on the number of zookeeper client connections
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle web UI port
######################################
kafka.eagle.webui.port=8048

######################################
# where kafka consumer offsets are stored,
# for compatibility with older kafka versions
######################################
cluster1.kafka.eagle.offset.storage=kafka
cluster2.kafka.eagle.offset.storage=zk

######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=true

######################################
# admin token for deleting kafka topics
######################################
kafka.eagle.topic.token=keadmin

######################################
# whether the kafka cluster has authentication enabled;
# this is the cluster1 configuration, disabled here
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka sasl authenticate, example configuration
######################################
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.kafka.eagle.sasl.client.id=
cluster2.kafka.eagle.sasl.cgroup.enable=false
cluster2.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka ssl authenticate, example configuration
######################################
cluster3.kafka.eagle.ssl.enable=false
cluster3.kafka.eagle.ssl.protocol=SSL
cluster3.kafka.eagle.ssl.truststore.location=
cluster3.kafka.eagle.ssl.truststore.password=
cluster3.kafka.eagle.ssl.keystore.location=
cluster3.kafka.eagle.ssl.keystore.password=
cluster3.kafka.eagle.ssl.key.password=
cluster3.kafka.eagle.ssl.cgroup.enable=false
cluster3.kafka.eagle.ssl.cgroup.topics=

######################################
# database for storing monitoring data
# kafka eagle uses sqlite by default; the sqlite directory
# must be specified and created beforehand,
# e.g. /home/lagou/hadoop/kafka-eagle/db
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/home/lagou/hadoop/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org

######################################
# MySQL can also be used to store the monitoring data
######################################
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=123456

######################################
# kafka eagle alert mail server settings
######################################
kafka.eagle.mail.enable=true
kafka.eagle.mail.sa=kafka_lagou_alert
kafka.eagle.mail.username=kafka_lagou_alert@163.com
kafka.eagle.mail.password=Pas2W0rd
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25
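The sqlite directory referenced above has to exist before the first start; a minimal sketch using the path from this config:

mkdir -p /home/lagou/hadoop/kafka-eagle/db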
# Start kafka-eagle
./bin/ke.sh start

On a successful start, ke.sh prints the web UI address and the login account/password.
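Other commonly documented ke.sh subcommands (assumed; check bin/ke.sh for your version):

./bin/ke.sh status
./bin/ke.sh stop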