Contents:
- Metrics visible through JMX monitoring
- Using the jconsole tool
- Connecting directly to a local Kafka process
The Kafka versions discussed here are 0.8.1.x and 0.8.2.x. The two behave differently under JMX monitoring, and the difference shows up in the ObjectName.
The program therefore uses a boolean flag, newKafkaVersion, to tell them apart.
To determine the ObjectName you need, use the jconsole tool: locate the metric under the corresponding MBean and hover over it to see the ObjectName that the code has to query.
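As a concrete illustration (taken from the getTopicName method in the code below, with the topic name demo2 from the sample main method used as a placeholder), the per-topic MessagesInPerSec metric is addressed like this in the two versions:

0.8.2.x (newKafkaVersion = true):
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=demo2

0.8.1.x (newKafkaVersion = false):
"kafka.server":type="BrokerTopicMetrics",name="demo2-MessagesInPerSec"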
When starting Kafka, the JMX port must be enabled: JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
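Once the broker is up, it is worth confirming that the JMX endpoint is reachable before running the code below. Assuming the broker runs locally with the port set above, jconsole can connect to it directly:

jconsole localhost:9999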
JmxMgr.class
package monitor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by hidden on 2016/12/8.
 */
public class JmxMgr {
    private static Logger log = LoggerFactory.getLogger(JmxMgr.class);
    private static List<JmxConnection> conns = new ArrayList<>();

    public static boolean init(List<String> ipPortList, boolean newKafkaVersion) {
        for (String ipPort : ipPortList) {
            log.info("init jmxConnection [{}]", ipPort);
            JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);
            boolean bRet = conn.init();
            if (!bRet) {
                log.error("init jmxConnection error");
                return false;
            }
            conns.add(conn);
        }
        return true;
    }

    public static long getMsgInCountPerSec(String topicName) {
        long val = 0;
        for (JmxConnection conn : conns) {
            long temp = conn.getMsgInCountPerSec(topicName);
            val += temp;
        }
        return val;
    }

    public static double getMsgInTpsPerSec(String topicName) {
        double val = 0;
        for (JmxConnection conn : conns) {
            double temp = conn.getMsgInTpsPerSec(topicName);
            val += temp;
        }
        return val;
    }

    public static Map<Integer, Long> getEndOffset(String topicName) {
        Map<Integer, Long> map = new HashMap<>();
        for (JmxConnection conn : conns) {
            Map<Integer, Long> tmp = conn.getTopicEndOffset(topicName);
            if (tmp == null) {
                log.warn("get topic endoffset return null, topic {}", topicName);
                continue;
            }
            for (Integer parId : tmp.keySet()) { // keep the largest offset reported for each partition
                if (!map.containsKey(parId) || tmp.get(parId) > map.get(parId)) {
                    map.put(parId, tmp.get(parId));
                }
            }
        }
        return map;
    }

    public static void main(String[] args) {
        List<String> ipPortList = new ArrayList<>();
        ipPortList.add("localhost:9999");
        //ipPortList.add("xx.101.130.2:9999");
        JmxMgr.init(ipPortList, true);
        String topicName = "demo2";
        System.out.println(getMsgInCountPerSec(topicName));
        System.out.println(getMsgInTpsPerSec(topicName));
        System.out.println(getEndOffset(topicName));
    }
}
JmxConnection.class
package monitor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Created by hidden on 2016/12/8.
 */
public class JmxConnection {
    private static Logger log = LoggerFactory.getLogger(JmxConnection.class);
    private MBeanServerConnection conn;
    private String jmxURL;
    private String ipAndPort = "localhost:9999";
    private int port = 9999;
    private boolean newKafkaVersion = false;

    public JmxConnection(Boolean newKafkaVersion, String ipAndPort) {
        this.newKafkaVersion = newKafkaVersion;
        this.ipAndPort = ipAndPort;
    }

    public boolean init() {
        jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
        log.info("init jmx, jmxUrl: {}, and begin to connect it", jmxURL);
        try {
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
            JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
            conn = connector.getMBeanServerConnection();
            if (conn == null) {
                log.error("get connection return null!");
                return false;
            }
        } catch (MalformedURLException e) {
            e.printStackTrace();
            return false;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public String getTopicName(String topicName) {
        String s;
        if (newKafkaVersion) {
            s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;
        } else {
            s = "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"" + topicName + "-MessagesInPerSec\"";
        }
        return s;
    }
    /**
     * @param topicName: topic name, default_channel_kafka_zzh_demo
     * @return the inbound message count of this broker; to get the total count for a topic,
     *         sum the values returned by every broker in the cluster
     */
    public long getMsgInCountPerSec(String topicName) {
        String objectName = getTopicName(topicName);
        Object val = getAttribute(objectName, "Count");
        String debugInfo = "jmxUrl:" + jmxURL + ",objectName=" + objectName;
        if (val != null) {
            log.info("{}, Count:{}", debugInfo, (long) val);
            return (long) val;
        }
        return 0;
    }
    /**
     * @param topicName: topic name, default_channel_kafka_zzh_demo
     * @return the inbound TPS of this broker; as with the message count, the total TPS of a topic
     *         is the sum of this topic's TPS across every broker in the cluster
     */
    public double getMsgInTpsPerSec(String topicName) {
        String objectName = getTopicName(topicName);
        Object val = getAttribute(objectName, "OneMinuteRate");
        if (val != null) {
            double dVal = ((Double) val).doubleValue();
            return dVal;
        }
        return 0;
    }

    private Object getAttribute(String objName, String objAttr) {
        ObjectName objectName = null;
        try {
            objectName = new ObjectName(objName);
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
            return null;
        }
        return getAttribute(objectName, objAttr);
    }

    private Object getAttribute(ObjectName objName, String objAttr) {
        if (conn == null) {
            log.error("jmx connection is null");
            return null;
        }
        try {
            return conn.getAttribute(objName, objAttr);
        } catch (MBeanException e) {
            e.printStackTrace();
            return null;
        } catch (AttributeNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (InstanceNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (ReflectionException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * @param topicName
     * @return the logSize (i.e., end offset) of each partition of topicName
     */
    public Map<Integer, Long> getTopicEndOffset(String topicName) {
        Set<ObjectName> objs = getEndOffsetObjects(topicName);
        if (objs == null) {
            return null;
        }
        Map<Integer, Long> map = new HashMap<>();
        for (ObjectName objName : objs) {
            int partId = getParId(objName);
            Object val = getAttribute(objName, "Value");
            if (val != null) {
                map.put(partId, (Long) val);
            }
        }
        return map;
    }

    private int getParId(ObjectName objName) {
        if (newKafkaVersion) {
            String s = objName.getKeyProperty("partition");
            return Integer.parseInt(s);
        } else {
            String s = objName.getKeyProperty("name");
            int to = s.lastIndexOf("-LogEndOffset");
            String s1 = s.substring(0, to);
            int from = s1.lastIndexOf("-") + 1;
            String ss = s.substring(from, to);
            return Integer.parseInt(ss);
        }
    }

    private Set<ObjectName> getEndOffsetObjects(String topicName) {
        String objectName;
        if (newKafkaVersion) {
            objectName = "kafka.log:type=Log,name=LogEndOffset,topic=" + topicName + ",partition=*";
        } else {
            objectName = "\"kafka.log\":type=\"Log\",name=\"" + topicName + "-*-LogEndOffset\"";
        }
        ObjectName objName = null;
        Set<ObjectName> objectNames = null;
        try {
            objName = new ObjectName(objectName);
            objectNames = conn.queryNames(objName, null);
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
        return objectNames;
    }
}
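One caveat about the code above: init() creates a JMXConnector but never keeps a reference to it, so the underlying RMI connection is never closed. In a long-running monitor this can accumulate connections, the same kind of problem described below for Kafka Web Console. Below is a minimal sketch of how JmxConnection could hold on to the connector and expose a close method; the connector field and the close() method are my additions, not part of the original class.

    private JMXConnector connector; // assumed new field alongside conn

    public boolean init() {
        jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
        try {
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
            connector = JMXConnectorFactory.connect(serviceURL, null); // keep the connector for later cleanup
            conn = connector.getMBeanServerConnection();
            return conn != null;
        } catch (IOException e) { // MalformedURLException is a subclass of IOException
            log.error("connect to {} failed", jmxURL, e);
            return false;
        }
    }

    // Release the RMI connection when the monitor shuts down.
    public void close() {
        if (connector != null) {
            try {
                connector.close();
            } catch (IOException e) {
                log.warn("close jmx connector error", e);
            }
        }
    }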
(Reprinted from the 蓝色天堂 blog; original post: http://hadoop1989.com/2015/09/22/Kafka-Monitor_Compare)
In an earlier post I introduced the Kafka Web Console monitoring tool. After running it in production for a while, I found that it opens a large number of connections to Kafka producers, consumers, and ZooKeeper, eventually causing network congestion. Other users have hit the same bug, so open-source tools should be adopted with care. Since the bug has not been fixed, I had to look into other Kafka monitoring software.
Some research turned up three mainstream Kafka monitoring programs:
- Kafka Web Console
- Kafka Manager
- KafkaOffsetMonitor
The three tools are introduced in turn below.
With Kafka Web Console, you can monitor:
After the program starts, it periodically reads the log size of every partition in the Kafka cluster, but the connections are not released properly afterwards; over time a large number of socket connections build up and congest the network.
Kafka Manager, the Kafka cluster management tool open-sourced by Yahoo:
KafkaOffsetMonitor can monitor in real time:
Summary: based on my own use, the pros and cons of the three monitoring programs are as follows:
If you only need monitoring, KafkaOffsetMonitor is recommended; if you care more about Kafka cluster management, Kafka Manager is recommended.
Since all of them are open-source programs, stability is somewhat lacking, so find out which bugs are currently known and test thoroughly before adopting one, to avoid problems like the one with Kafka Web Console.
Main references:
How to monitor Kafka with JMX:
Comparison of three Kafka monitoring tools:
https://www.jianshu.com/p/97ad87a933e1