赞
踩
KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。
对于Kafka的平台化,一直缺少一个成熟的解决方案,之前比较流行的kafka监控方案,如kafka-manager提供了集群管理与topic管理等等功能。但是对于生产者、消费者的监控,以及Kafka的新生态,如Connect,KSQL还缺少响应的支持。Confluent Control Center功能要完整一些,但却是非开源收费的。
对于Kafka的使用,一直都是一个让人头疼的问题,由于实时系统的强运维特性,我们不得不投入大量的时间用于集群的维护、kafka的运维,比如:
- 人工创建topic,特别费力
- 相关kafka运维,监控孤岛化、
- 现有消费监控工具监控不准确
- 无法拿到Kafka 集群的summay信息
- 无法快速知晓集群健康状态
- 无法知晓业务对team kafka使用情况
- kafka管理,监控工具稀少,没有一个好的工具我们直接可以使用
- 无法快速查询topic消息
- Home-> 查看平台管理的Kafka Cluster集群信息及监控信息。
- Topic-> 用户可以在此模块查看自己的Topic,发起申请新建Topic,同时可以对Topic进行生产消费测试。
- Monitor-> 用户可以在此模块中可以查看Topic的生产以及消费情况,同时可以针对消费延迟情况设置预警信息。
- Connect-> 实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。
- KSQL-> 实现用户快速创建自己的KSQL Job,并对自己的Job进行维护
- Approve-> 此模块主要用于当普通用户申请创建Topic,管理员进行审批操作。
- Setting-> 此模块主要功能为管理员维护User、Team以及kafka cluster信息
- Kafka Manager-> 此模块用于管理员对集群的正常维护操作。
系统截图:
安装需要依赖 mysql、es、email server
组件 | 是否必须 | 功能 |
---|---|---|
mysql | 必须 | 配置信息存在mysql |
elasticsearch(7.0+) | 可选 | 各种监控信息的存储 |
email server | 可选 | Apply, approval, warning e-mail alert |
在MySQL中执行sql建表
-- Dumping database structure for kafka_center CREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */; USE `kafka_center`; -- Dumping structure for table kafka_center.alert_group CREATE TABLE IF NOT EXISTS `alert_group` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `threshold` int(11) DEFAULT NULL, `dispause` int(11) DEFAULT NULL, `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_date` datetime DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `disable_alerta` tinyint(1) DEFAULT 0, `enable` tinyint(1) NOT NULL DEFAULT 1, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.cluster_info CREATE TABLE IF NOT EXISTS `cluster_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL, `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `enable` int(11) DEFAULT NULL, `broker_size` int(4) DEFAULT 0, `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '', `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.ksql_info CREATE TABLE IF NOT EXISTS `ksql_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) DEFAULT NULL, `cluster_name` varchar(255) DEFAULT NULL, `ksql_url` varchar(255) DEFAULT NULL, `ksql_serverId` varchar(255) DEFAULT NULL, `version` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- Data exporting was unselected. -- Dumping structure for table kafka_center.task_info CREATE TABLE IF NOT EXISTS `task_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '', `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `partition` int(11) DEFAULT NULL, `replication` int(11) DEFAULT NULL, `message_rate` int(50) DEFAULT NULL, `ttl` int(11) DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, `approved` int(11) DEFAULT NULL, `approved_id` int(11) DEFAULT NULL, `approved_time` datetime DEFAULT NULL, `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.team_info CREATE TABLE IF NOT EXISTS `team_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.topic_collection CREATE TABLE IF NOT EXISTS `topic_collection` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `user_id` int(11) NOT NULL, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.topic_info CREATE TABLE IF NOT EXISTS `topic_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `partition` int(11) DEFAULT NULL, `replication` int(11) DEFAULT NULL, `ttl` bigint(11) DEFAULT NULL, `config` varchar(512) COLLATE utf8_bin DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.user_info CREATE TABLE IF NOT EXISTS `user_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `real_name` varchar(255) COLLATE utf8_bin DEFAULT '', `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100', `create_time` datetime DEFAULT NULL, `password` varchar(255) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.user_team CREATE TABLE IF NOT EXISTS `user_team` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
相关配置位于application.properties
可对端口 日志等信息做一些修改:
server.port=8080 debug=false # 设置session timeout为6小时 server.servlet.session.timeout=21600 spring.security.user.name=admin spring.security.user.password=admin spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8 spring.datasource.username=root spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.minimum-idle=5 spring.datasource.hikari.maximum-pool-size=15 spring.datasource.hikari.pool-name=KafkaCenterHikariCP spring.datasource.hikari.max-lifetime =30000 spring.datasource.hikari.connection-test-query=SELECT 1 management.health.defaults.enabled=false public.url=http://localhost:8080 connect.url=http://localhost:8000/#/ system.topic.ttl.h=16 monitor.enable=true monitor.collect.period.minutes=5 monitor.elasticsearch.hosts=localhost:9200 monitor.elasticsearch.index=kafka_center_monitor #是否启用收集线程指定集群收集 monitor.collector.include.enable=false #收集线程指定location,必须属于remote.locations之中 monitor.collector.include.location=dev collect.topic.enable=true collect.topic.period.minutes=10 # remote的功能是为了提高lag查询和收集,解决跨location网络延迟问题 remote.query.enable=false remote.hosts=gqc@localhost2:8080 remote.locations=dev,gqc #发送consumer group的lag发送给alert service alert.enable=false alert.dispause=2 alert.service= alert.threshold=1000 alter.env=other #是否开启邮件功能,true:启用,false:禁用 mail.enable=false spring.mail.host= spring.mail.username=KafkaCenter@xaecbd.com # oauth2 generic.enabled=false generic.name=oauth2 Login generic.auth_url= generic.token_url= generic.redirect_utl= generic.api_url= generic.client_id= generic.client_secret= generic.scopes=
推荐使用docker
docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0
不用docker
$ git clone https://github.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenter\KafkaCenter-Core\target
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar
访问http://localhost:8080 管理员用户与密码默认:admin / admin
用户可以在此模块完成Topic查看,已经申请新建Topic,同时可以对Topic进行生产消费测试。
用户可以在此模块中可以查看Topic的生成以及消费情况,同时可以针对消费延迟情况设置预警信息。
此模块用于维护预警信息。用户可以看到自己所有预警信息,管理员可以看到所有人的预警信息。
实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。
实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。
此模块主要用于当普通用户申请创建Topic 或者Job时,管理员进行审批操作。
此模块主要功能为管理员维护User、Team以及kafka cluster信息
此模块用于管理员对集群的正常维护操作。
这里是一些基本的统计信息
集群与topic列表
这里是一些topic的管理功能
操作范围:用户所属Team的所有Topic
- Topic -> Topic List -> Detail 查看Topic的详细信息
- Topic -> Topic List -> Mock 对Topic进行生产测试
Important: admin不能申请task,普通用户必须先让管理员新建team后,将用户加入指定team后,才可以申请task。
操作范围:用户所属Team的所有Task
Topic -> My Task -> Detail 查看申请的Task信息
Topic -> My Task -> Delete 删除被拒绝或待审批的Task
Topic -> My Task -> Edit 修改被拒绝的Task
Topic -> My Task -> Create Topic Task 创建Task
- 按照表单各字段要求填写信息
- 点击确认,提交申请
审批结果:
1. 审批通过:Topic将会被创建在管理员指定的集群
2. 审批拒绝:用户收到邮件,返回到My Task,点击对应Task后面的Edit,针对审批意见进行修改Topic命名规则:
- 只能包含:数字、大小写字母、下划线、中划线、点;长度大于等于3小于等于100。
- 不推荐:下划线开头;
可对所有Topic进行消费测试
监控模块
生产者监控
消费者监控
消息积压
报警功能
这里是一些Connect的操作
可以进行KQL的查询操作
这里主要是管理员做一些审核操作
- Approve->check 审批用户的Task
- 根据用户选择的location指定cluster
- 检查用户设置的partition和replication大小是否合理,如不合理做出调整
- 检查其他字段是否合理,如需要拒绝该申请,点击Reject并填写意见。
KafkaCenter还是一个非常不错的kafka管理工具,可以满足大部分需求。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。