April 2020 (Python as a Savior, Part 1)
If you remember, in my last article I decided not to use AppDynamics. I was a bit anxious because I didn't have any historical monitoring system where I could compare Apache Kafka metrics over the previous 6 months.
Luckily, I came across the Python JMX Query module (jmxquery). Kudos to David Gildeh.
On the module's home page, he gives a Kafka JMX query example:
jmxConnection = JMXConnection("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")

jmxQuery = [JMXQuery("kafka.cluster:type=*,name=*,topic=*,partition=*",
                     metric_name="kafka_cluster_{type}_{name}",
                     metric_labels={"topic": "{topic}", "partition": "{partition}"})]

metrics = jmxConnection.query(jmxQuery)
for metric in metrics:
    print(f"{metric.metric_name}<{metric.metric_labels}> == {metric.value}")
Honestly, I was really surprised and eager to know how it worked. The same day, I pushed my regular tasks into the backlog and started working on a POC with this module. After a couple of hours, I had it working from my computer and was able to pull metrics from Apache Kafka.
At that time, I was trying to figure out all the pieces for this type of monitoring:
1. A Python JMX script running from cron.
2. The Python script dumps metrics into /kafka/kafka-logs/ with a .log extension.
3. Splunk reads the metrics and indexes them.
4. Dashboards are built in Splunk.
Why the above method was great for me:
- Python 3 was already there, so very little installation work was needed.
- Splunk was already in place.
- Splunk dashboards were already in use, and the whole DevOps team was comfortable with them.
Now, I had to figure out a couple more things:
- How to write better Python code?
- How to manage input, i.e. which metrics to pull from Kafka?
- How to manage output, i.e. how to create JSON-based output that Splunk can index easily?
- How to pull metrics a bit faster?
Note: all iterations of the code used the same input file, kafka-input.txt:
java.lang:type=GarbageCollector,name=*
java.lang:type=Memory
java.lang:type=Threading
java.lang:type=ClassLoading
kafka.controller:type=KafkaController,name=OfflinePartitionsCount
kafka.controller:type=KafkaController,name=ActiveControllerCount
kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
kafka.log:type=Log,name=Size,topic=*,partition=*
I managed to write the first version of the code. (This code repository is no longer maintained, as the code was migrated to 116davinder/kafka-cluster-ansible.)
#!/usr/bin/env python3
# usage: python3 kafka-jmx-metric-collector.py localhost 9999 roles/jmxMonitor/files/input.txt /tmp/ kafka-dev
# This script is supposed to export all Kafka metrics from one node and write them to a file
# from where tools like Splunk can read them.

from jmxquery import *
from socket import gethostname
from datetime import datetime
import json
import sys


class KafkaJmx:
    def __init__(self, kAddr, kPort, inputFile, logDir, env):
        self.kAddr = kAddr
        self.kPort = kPort
        self.kJmxAddr = "service:jmx:rmi:///jndi/rmi://" + str(self.kAddr) + ":" + str(self.kPort) + "/jmxrmi"
        self.cTimeNow = datetime.now()
        self.jmxConnection = JMXConnection(self.kJmxAddr)
        self.inputFile = inputFile
        self.logDir = logDir
        self.env = env
        self.domainNameList = ['java.lang', 'kafka.controller', 'kafka.log',
                               'kafka.network', 'kafka.server', 'kafka.utils']

    def getMetric(self):
        with open(self.inputFile) as file:
            for query in file:
                metrics = self.jmxConnection.query([JMXQuery(query.strip())], timeout=1000000)
                for metric in metrics:
                    domainName = metric.to_query_string().split(":")[0]
                    queryName = metric.to_query_string().split(":")[1]
                    queryValue = metric.value
                    _queryDict = {
                        "@timestamp": str(self.cTimeNow),
                        "domainName": str(domainName),
                        "environment": str(self.env),
                        "queryName": str(queryName),
                        "queryValue": queryValue
                    }
                    # one log file per JMX domain, one JSON object per line
                    with open(self.logDir + domainName + ".log", 'a+') as logFile:
                        logFile.write("\n")
                        logFile.write(json.dumps(_queryDict))

    def cleanUpFiles(self):
        # truncate all per-domain log files before each run
        for domainName in self.domainNameList:
            open(self.logDir + domainName + ".log", 'w').close()


def main():
    hostname = sys.argv[1]
    port = sys.argv[2]
    inputFile = sys.argv[3]
    logDir = sys.argv[4]
    env = sys.argv[5]

    z = KafkaJmx(hostname, port, inputFile, logDir, env)
    z.cleanUpFiles()
    z.getMetric()


main()
Two important things to consider in the above Python code:
1. For each domain type, such as "kafka.controller", we generated a separate metric log file, because Splunk searches run much faster when metrics are separated this way (see the small sketch after this list).
2. I overwrote all metrics on every run because I didn't want to implement log4j-style log rotation for a small script. Once Splunk has read the metrics, they are of no further use on the system, so simply overwriting them each time gave me easy clean-up.
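Concretely, the per-domain separation is nothing fancier than taking the MBean domain in front of the ":" in the query string and using it as the file name. A stripped-down sketch of what getMetric() does above (the paths here are only illustrative):

# stripped-down illustration of the per-domain routing in getMetric()
query_string = "kafka.controller:type=KafkaController,name=ActiveControllerCount"
domain_name = query_string.split(":")[0]                   # -> "kafka.controller"
log_path = "/kafka/kafka-logs/" + domain_name + ".log"     # one log file per domain
print(log_path)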
Python is certainly very simple and easy to use.
Creating the JSON output was super fun: I build a Python dict, which maps naturally onto JSON, and later dump that dict as JSON.
_queryDict = {
    "@timestamp": str(self.cTimeNow),
    "domainName": str(domainName),
    "environment": str(self.env),
    "queryName": str(queryName),
    "queryValue": queryValue
}
logFile.write(json.dumps(_queryDict))
Sample Output:
{"@timestamp": "2020-04-01 18:16:35.180486", "
Once it started working, I added a new role, jmxMonitor, to the Ansible code.
---
- name: creating folder for jmx monitor
  file:
    path: "{{ kafkaInstallDir }}/jmxMonitor"
    state: directory

- name: copying script and input files
  copy:
    src: "{{ item }}"
    dest: "{{ kafkaInstallDir }}/jmxMonitor/{{ item }}"
  loop:
    - kafka-jmx-metric-collector.py
    - kafka-input.txt

- name: kafka metric collector cron
  cron:
    name: "kafka metric collector cron task"
    minute: "*"
    hour: "*"
    weekday: "*"
    user: root
    job: 'find /bin/ -name "python3*m" -print0 -exec {} {{ kafkaInstallDir }}/jmxMonitor/kafka-jmx-metric-collector.py {{ ansible_fqdn }} {{ kafkaJmxPort }} {{ kafkaInstallDir }}/jmxMonitor/kafka-input.txt {{ kafkaLogDir }}/ {{ kafkaClusterName }} \;'
Later, I researched a little more and added system metrics as well: CPU, memory, and disk.
# additional methods on the KafkaJmx class; they rely on the psutil package
# (import psutil at the top of the script, installed via pip3 install psutil)

def getStorageMetric(self):
    _sMM = psutil.disk_usage("/kafka")
    _sMetric = {
        "@timestamp": str(self.cTimeNow),
        "domainName": "disk",
        "environment": self.env,
        "totalInGB": _sMM.total // (2**30),
        "usedInGB": _sMM.used // (2**30),
        "freeInGB": _sMM.free // (2**30),
        "usedPercent": _sMM.percent
    }
    with open(self.logDir + "disk.log", 'w') as logFile:
        logFile.write(json.dumps(_sMetric))

def getCpuMetric(self):
    _cMetric = {
        "@timestamp": str(self.cTimeNow),
        "domainName": "cpu",
        "environment": self.env,
        "usedCpuPercent": psutil.cpu_percent()
    }
    with open(self.logDir + "cpu.log", 'w') as logFile:
        logFile.write(json.dumps(_cMetric))

def getMemoryMetric(self):
    _memStats = psutil.virtual_memory()
    _swapMemStats = psutil.swap_memory()
    _rMetric = {
        "@timestamp": str(self.cTimeNow),
        "domainName": "memory",
        "environment": self.env,
        "totalMem": _memStats.total // (2**30),
        "availableMem": _memStats.available // (2**30),
        "percentUsedMem": _memStats.percent,
        "usedMem": _memStats.used // (2**30),
        "buffers": _memStats.buffers // (2**30),
        "totalSwap": _swapMemStats.total // (2**30),
        "usedSwap": _swapMemStats.used // (2**30),
        "freeSwap": _swapMemStats.free // (2**30),
        "percentUsedSwap": _swapMemStats.percent
    }
    with open(self.logDir + "memory.log", 'w') as logFile:
        logFile.write(json.dumps(_rMetric))
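The repeated "// (2**30)" is simply integer division that converts psutil's byte counts into whole gigabytes. A quick standalone sanity check of the same psutil calls (using "/" instead of "/kafka" so it runs on any machine):

import psutil

disk = psutil.disk_usage("/")            # the real script uses "/kafka"
mem = psutil.virtual_memory()

print("disk total (GB):", disk.total // (2**30))
print("disk used (%)  :", disk.percent)
print("mem total (GB) :", mem.total // (2**30))
print("mem used (%)   :", mem.percent)
print("cpu used (%)   :", psutil.cpu_percent())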
Now I had all the metrics required for Apache Kafka monitoring. It was time to get my hands dirty with Splunk.
Splunk dashboard source code: 116davinder/kafka-cluster-ansible/splunk-dashboards/apache-kafka.xml
Dashboard sample: 116davinder/kafka-cluster-ansible/splunk-dashboards/Apache_Kafka_Splunk_7_3_3.png
Splunk supports filters, so one dashboard was enough for all 10+ environments.
Finally, I was a little concerned that extracting and processing more than 50 different metrics might not finish within the 1-minute cron interval. To fix this, I turned to Python's threading module.
I never thought threading would be so simple to implement: you import the threading package and start a thread on a function.
import threading

_metric_thread = threading.Thread(target=z.getMetric).start()
_cpu_metric_thread = threading.Thread(target=z.getCpuMetric).start()
_memory_metric_thread = threading.Thread(target=z.getMemoryMetric).start()
_storage_metric_thread = threading.Thread(target=z.getStorageMetric).start()
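One small thing to keep in mind with the snippet above: Thread.start() returns None, so those variables don't actually hold the thread objects. If you ever need the script to wait for every collector to finish before exiting (for example, to be sure all log files are fully written within the cron minute), a variant like the following works; this is my own sketch, not part of the original script:

import threading

# assumes z is the KafkaJmx instance created in main(), as in the script above
threads = [
    threading.Thread(target=z.getMetric),
    threading.Thread(target=z.getCpuMetric),
    threading.Thread(target=z.getMemoryMetric),
    threading.Thread(target=z.getStorageMetric),
]
for t in threads:
    t.start()
for t in threads:
    t.join()    # block until each collector thread has finished writing its log file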
Later, I added a couple of minor fixes; the final version can be found here:
116davinder/kafka-cluster-ansible/roles/jmxMonitor/files/kafka-jmx-metric-collector.py
The journey will continue in the next topic (Extend Custom Monitoring to Kafka Mirror Maker v1)!