赞
踩
上节我们完成了如下的内容:
Kafka使用Yammer Metrics在服务器和Scala客户端中报告指标,Java客户端使用Kafka Metrics,它是一种内置的度量标准注册表,可最大程度的减少拉入客户端应用程序的传递依赖项。
两者都通过JMX公开指标,并且可以配置为使用可插拔的统计报告器报告统计信息,以连接到你的监控系统中。
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=${服务器的IP,尽量写IP,不要hostname或者域名}"
接着我们启动Kafka:
kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties
在本机上启动 jconsole 服务,我们运行如下指令:(本机要有JDK)
启动窗口如下图所示:
我们输入Kafka的地址和端口:
连接成功之后页面如下图:
我们选择 MBean 选项卡:
可以看到对应的数据情况:
http://kafka.apache.org/10/documentation.html#monitoring
我们可以通过编程的方式来获取到Kafka的指标信息:
package icu.wzk.kafka; import javax.management.MBeanServerConnection; import javax.management.ObjectInstance; import javax.management.ObjectName; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; import java.util.Iterator; import java.util.Set; public class JMXMonitorDemo { public static void main(String[] args) throws Exception { String jmxServiceUrl = "service:jmx:rmi:///jndi/rmi://h121.wzk.icu:9999/jmxrmi"; JMXServiceURL jmxUrl = null; JMXConnector jmxc = null; MBeanServerConnection jmxs = null; ObjectName mbeanObjectName = null; Iterator sampleIter = null; Set sampleSet = null; // 创建JMXServiceURL 对象 jmxUrl = new JMXServiceURL(jmxServiceUrl); // 建立指定的URL服务器的连接 jmxc = JMXConnectorFactory.connect(jmxUrl); // 返回代表远程MBean服务器的MBeanServiceConnection对象 jmxs = jmxc.getMBeanServerConnection(); // 根据传入的字符串,创建ObjectName对象 mbeanObjectName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"); // 指定ObjectName对应的MBeans sampleSet = jmxs.queryMBeans(null, mbeanObjectName); // 迭代器 sampleIter = sampleSet.iterator(); if (!sampleSet.isEmpty()) { // 如果返回了 则打印信息 while (sampleIter.hasNext()) { ObjectInstance sampleObject = (ObjectInstance) sampleIter.next(); ObjectName objectName = sampleObject.getObjectName(); // 查看指定MBean指定属性的值 String count = jmxs.getAttribute(objectName, "Count").toString(); System.out.println("count: " + count); } } // 关闭 jmxc.close(); } }
控制台输出结果如下:
我们可以使用 kafka-eagle 管理 Kafka集群。
# Github 地址
# https://github.com/smartloli/EFAK
wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz
mv v3.0.1.tar.gz kafka-eagle-v3.0.1.tar.gz
tar -zxvf kafka-eagle-v3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1/
tar -zxvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1/ /opt/servers/
下载过程如下图所示:
整理好的项目如下所示:
cd /opt/servers/efak-web-3.0.1
修改配置文件
vim conf/system-config.properties
文件按照自己的需要修改,我这里修改了部分:
efak.zk.cluster.alias=cluster1
cluster1.zk.list=h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181
######################################
# kafka sqlite jdbc driver address
######################################
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org
# 我注释掉了MySQL
此时我们需要新建一个文件夹:
mkdir -p /hadoop/kafka-eagle/db/
vim /etc/profile
# efak
export KE_HOME=/opt/servers/efak-web-3.0.1
export PATH=$PATH:$KE_HOME/bin
./bin/ke.sh start
启动我们的服务,如下图所示:
http://h121.wzk.icu:8048
admin
123456
运行结果如下图所示:
打开之后,填写账号密码:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。