当前位置:   article > 正文

CentOS7下安装使用kafka及其监控组件_centos系统上 kafka监控工具

centos系统上 kafka监控工具





































机器数 = 2 * (峰值生产速度 * 副本数 / 100) + 1



# tar -zxvf kafka_2.11-


  1. # cd kafka_2.11-
  2. # vim server.properties


  1. broker.id=0 # broker的id,全局唯一的整数
  2. delete.topic.enable=true # 使能话题删除
  3. ############################# Socket Server Settings #############################
  4. num.network.threads=3
  5. num.io.threads=8
  6. socket.send.buffer.bytes=102400
  7. socket.receive.buffer.bytes=102400
  8. socket.request.max.bytes=104857600
  9. ############################# Log Basics #############################
  10. log.dirs=/home/szc/kafka_2.11- # 数据目录
  11. num.partitions=1
  12. num.recovery.threads.per.data.dir=1
  13. ############################# Internal Topic Settings #############################
  14. offsets.topic.replication.factor=1
  15. transaction.state.log.replication.factor=1
  16. transaction.state.log.min.isr=1
  17. ############################# Log Flush Policy #############################
  18. ############################# Log Retention Policy #############################
  19. log.retention.hours=168
  20. #log.retention.bytes=1073741824
  21. log.segment.bytes=1073741824
  22. log.retention.check.interval.ms=300000
  23. ############################# Zookeeper #############################
  24. zookeeper.connect= # ZooKeeper地址
  25. zookeeper.connection.timeout.ms=6000
  26. ############################# Group Coordinator Settings #############################
  27. group.initial.rebalance.delay.ms=0


  1. # cd ../bin
  2. # ./kafka-server-start.sh -daemon ../config/server.properties


  1. [root@localhost bin]# ps -ef | grep kafka
  2. root 79436 1 99 23:58 pts/5 00:00:02 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/home/szc/kafka_2.11- -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/home/szc/kafka_2.11- -Dlog4j.configuration=file:./../config/log4j.properties -cp :/home/szc/kafka_2.11- kafka.Kafka ../config/server.properties
  3. root 79516 77781 0 23:58 pts/5 00:00:00 grep --color=auto kafka


# ./kafka-server-stop.sh --daemon


  1. # ps -ef | grep kafka
  2. root 79546 77781 0 23:58 pts/5 00:00:00 grep --color=auto kafka





  1. # ./kafka-topics.sh --create --zookeeper --topic test0 --partitions 2 --replication-factor 1
  2. Created topic "test0".


  1. [root@localhost bin]# ll ../log
  2. total 16
  3. ...
  4. drwxr-xr-x. 2 root root 141 May 1 16:36 test0-0
  5. drwxr-xr-x. 2 root root 141 May 1 16:36 test0-1


  1. # ./kafka-topics.sh --list --zookeeper
  2. test0


  1. # ./kafka-topics.sh --delete --zookeeper --topic test0
  2. Topic test0 is marked for deletion.
  3. Note: This will have no impact if delete.topic.enable is not set to true.


  1. # ./kafka-topics.sh --list --zookeeper
  2. #


  1. # ./kafka-topics.sh --describe --topic test0 --zookeeper
  2. Topic:test0 PartitionCount:2 ReplicationFactor:1 Configs:
  3. Topic: test0 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  4. Topic: test0 Partition: 1 Leader: 0 Replicas: 0 Isr: 0



  1. # ./kafka-console-producer.sh --topic test0 --broker-list
  2. >


# ./kafka-console-consumer.sh --topic test0 --zookeeper



  1. >szc
  2. >anyang
  3. >


  1. szc
  2. anyang


  1. # ./kafka-console-consumer.sh --topic test0 --zookeeper --from-beginning
  2. Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
  3. anyang
  4. szc


  1. # ./kafka-console-consumer.sh --topic test0 --bootstrap-server
  2. henan


  1. # ll ../log
  2. total 20
  3. ...
  4. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-0
  5. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-1
  6. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-10
  7. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-11
  8. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-12
  9. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-13
  10. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-14
  11. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-15
  12. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-16
  13. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-17
  14. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-18
  15. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-19
  16. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-2
  17. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-20
  18. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-21
  19. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-22
  20. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-23
  21. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-24
  22. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-25
  23. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-26
  24. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-27
  25. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-28
  26. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-29
  27. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-3
  28. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-30
  29. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-31
  30. drwxr-xr-x. 2 root root 141 May 1 17:03 __consumer_offsets-32
  31. ....






  1. # firewall-cmd --add-port=9092/tcp --permanent
  2. # firewall-cmd --reload


  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version></version>
  5. </dependency>


  1. Properties properties = new Properties();
  2. // broker集群地址
  3. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
  4. // 序列化方法
  5. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  6. "org.apache.kafka.common.serialization.StringSerializer");
  7. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  8. "org.apache.kafka.common.serialization.StringSerializer");
  9. // 创建producer对象
  10. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);


  1. for (int i = 0; i < 10; i++) {
  2. producer.send(new ProducerRecord<String, String>("test0",
  3. "msg " + i + " from windows10"), new Callback() {
  4. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  5. if (e != null) {
  6. e.printStackTrace();
  7. }
  8. }
  9. });
  10. producer.flush(); // 记得刷写
  11. }


  1. # ./kafka-console-consumer.sh --bootstrap-server --topic test0
  2. msg 0 from windows10
  3. msg 1 from windows10
  4. msg 2 from windows10
  5. msg 3 from windows10
  6. msg 4 from windows10
  7. msg 5 from windows10
  8. msg 6 from windows10
  9. msg 7 from windows10
  10. msg 8 from windows10
  11. msg 9 from windows10


  1. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  2. if (e != null) {
  3. e.printStackTrace();
  4. }
  5. if (recordMetadata != null) {
  6. System.out.println(recordMetadata.toString());
  7. }
  8. }


  1. test0-1@17
  2. test0-0@16
  3. test0-1@18
  4. test0-0@17
  5. test0-1@19
  6. test0-0@18
  7. test0-1@20
  8. test0-0@19
  9. test0-1@21
  10. test0-0@20


  1. test0-1@22
  2. test0-0@21
  3. test0-1@23
  4. test0-0@22
  5. test0-1@24
  6. test0-0@23
  7. test0-1@25
  8. test0-0@24
  9. test0-1@26
  10. test0-0@25



  1. for (int i = 0; i < 10; i++) {
  2. Future<RecordMetadata> fu = producer.send(new ProducerRecord<String, String>("test0",
  3. "msg " + i + " from windows10.."), new Callback() {
  4. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  5. if (e != null) {
  6. e.printStackTrace();
  7. }
  8. if (recordMetadata != null) {
  9. System.out.println(recordMetadata.toString());
  10. }
  11. }
  12. });
  13. try {
  14. RecordMetadata recordMetadata = fu.get(); // 获取发送结果
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. } catch (ExecutionException e) {
  18. e.printStackTrace();
  19. }
  20. }





  1. public class MyPartitioner implements Partitioner {
  2. public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
  3. return o.hashCode() % cluster.availablePartitionsForTopic(s).size(); // 轮询
  4. }
  5. public void close() {
  6. }
  7. public void configure(Map<String, ?> map) {
  8. }
  9. }


  1. Properties properties = new Properties();
  2. ...
  3. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "partitioner.MyPartitioner");
  4. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);



  1. Properties properties = new Properties();
  2. // broker集群地址
  3. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
  4. // 键值反序列化方法
  5. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  6. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  7. // 设置偏移量自动提交和提交间隔
  8. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  9. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  10. // 消费者组名
  11. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group0");
  12. // 创建消费者
  13. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);




  1. boolean exit = false;
  2. while (!exit) {
  3. ConsumerRecords<String, String> records = consumer.poll(100); // 接收消息,最长空转100毫秒
  4. if (records.isEmpty()) { // 消息为空,则跳过本次循环
  5. continue;
  6. }
  7. for (ConsumerRecord<String, String> record : records) { // 遍历接收到的消息
  8. System.out.println(record.toString());
  9. }
  10. }


  1. ConsumerRecord(topic = test0, partition = 1, offset = 32, CreateTime = 1588473193330, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
  2. ConsumerRecord(topic = test0, partition = 1, offset = 33, CreateTime = 1588473193348, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
  3. ConsumerRecord(topic = test0, partition = 1, offset = 34, CreateTime = 1588473193353, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
  4. ConsumerRecord(topic = test0, partition = 1, offset = 35, CreateTime = 1588473193357, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
  5. ConsumerRecord(topic = test0, partition = 1, offset = 36, CreateTime = 1588473193361, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
  6. ConsumerRecord(topic = test0, partition = 0, offset = 31, CreateTime = 1588473193347, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
  7. ConsumerRecord(topic = test0, partition = 0, offset = 32, CreateTime = 1588473193351, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
  8. ConsumerRecord(topic = test0, partition = 0, offset = 33, CreateTime = 1588473193355, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
  9. ConsumerRecord(topic = test0, partition = 0, offset = 34, CreateTime = 1588473193359, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
  10. ConsumerRecord(topic = test0, partition = 0, offset = 35, CreateTime = 1588473193363, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)


  1. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
  2. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


  1. ConsumerRecord(topic = test0, partition = 1, offset = 0, CreateTime = 1588325037161, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test0)
  2. ConsumerRecord(topic = test0, partition = 1, offset = 1, CreateTime = 1588325149520, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test2)
  3. ConsumerRecord(topic = test0, partition = 1, offset = 2, CreateTime = 1588432968113, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10)
  4. ConsumerRecord(topic = test0, partition = 1, offset = 3, CreateTime = 1588432968120, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10)
  5. ConsumerRecord(topic = test0, partition = 1, offset = 4, CreateTime = 1588432968126, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10)
  6. ConsumerRecord(topic = test0, partition = 1, offset = 5, CreateTime = 1588432968133, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10)
  7. ConsumerRecord(topic = test0, partition = 1, offset = 6, CreateTime = 1588432968139, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10)
  8. ConsumerRecord(topic = test0, partition = 1, offset = 7, CreateTime = 1588468271890, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
  9. ConsumerRecord(topic = test0, partition = 1, offset = 8, CreateTime = 1588468271915, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
  10. ConsumerRecord(topic = test0, partition = 1, offset = 9, CreateTime = 1588468271920, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
  11. ConsumerRecord(topic = test0, partition = 1, offset = 10, CreateTime = 1588468271925, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
  12. ConsumerRecord(topic = test0, partition = 1, offset = 11, CreateTime = 1588468271929, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
  13. ConsumerRecord(topic = test0, partition = 1, offset = 12, CreateTime = 1588468408402, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
  14. ConsumerRecord(topic = test0, partition = 1, offset = 13, CreateTime = 1588468408407, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
  15. ConsumerRecord(topic = test0, partition = 1, offset = 14, CreateTime = 1588468408410, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
  16. ConsumerRecord(topic = test0, partition = 1, offset = 15, CreateTime = 1588468408415, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
  17. ConsumerRecord(topic = test0, partition = 1, offset = 16, CreateTime = 1588468408421, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
  18. ConsumerRecord(topic = test0, partition = 1, offset = 17, CreateTime = 1588468418037, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
  19. ConsumerRecord(topic = test0, partition = 1, offset = 18, CreateTime = 1588468418056, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
  20. ConsumerRecord(topic = test0, partition = 1, offset = 19, CreateTime = 1588468418061, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
  21. ConsumerRecord(topic = test0, partition = 1, offset = 20, CreateTime = 1588468418065, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
  22. ConsumerRecord(topic = test0, partition = 1, offset = 21, CreateTime = 1588468418070, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
  23. ConsumerRecord(topic = test0, partition = 1, offset = 22, CreateTime = 1588468568065, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
  24. ConsumerRecord(topic = test0, partition = 1, offset = 23, CreateTime = 1588468568084, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
  25. ConsumerRecord(topic = test0, partition = 1, offset = 24, CreateTime = 1588468568088, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
  26. ConsumerRecord(topic = test0, partition = 1, offset = 25, CreateTime = 1588468568093, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
  27. ConsumerRecord(topic = test0, partition = 1, offset = 26, CreateTime = 1588468568097, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
  28. ConsumerRecord(topic = test0, partition = 1, offset = 27, CreateTime = 1588472294168, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
  29. ConsumerRecord(topic = test0, partition = 1, offset = 28, CreateTime = 1588472294189, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
  30. ConsumerRecord(topic = test0, partition = 1, offset = 29, CreateTime = 1588472294192, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
  31. ConsumerRecord(topic = test0, partition = 1, offset = 30, CreateTime = 1588472294196, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
  32. ConsumerRecord(topic = test0, partition = 1, offset = 31, CreateTime = 1588472294203, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
  33. ConsumerRecord(topic = test0, partition = 1, offset = 32, CreateTime = 1588473193330, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
  34. ConsumerRecord(topic = test0, partition = 1, offset = 33, CreateTime = 1588473193348, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
  35. ConsumerRecord(topic = test0, partition = 1, offset = 34, CreateTime = 1588473193353, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
  36. ConsumerRecord(topic = test0, partition = 1, offset = 35, CreateTime = 1588473193357, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
  37. ConsumerRecord(topic = test0, partition = 1, offset = 36, CreateTime = 1588473193361, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
  38. ConsumerRecord(topic = test0, partition = 0, offset = 0, CreateTime = 1588325131644, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = test1)
  39. ConsumerRecord(topic = test0, partition = 0, offset = 1, CreateTime = 1588432968084, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10)
  40. ConsumerRecord(topic = test0, partition = 0, offset = 2, CreateTime = 1588432968116, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10)
  41. ConsumerRecord(topic = test0, partition = 0, offset = 3, CreateTime = 1588432968123, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10)
  42. ConsumerRecord(topic = test0, partition = 0, offset = 4, CreateTime = 1588432968130, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10)
  43. ConsumerRecord(topic = test0, partition = 0, offset = 5, CreateTime = 1588432968136, serialized key size = -1, serialized value size = 20, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10)
  44. ConsumerRecord(topic = test0, partition = 0, offset = 6, CreateTime = 1588468271864, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
  45. ConsumerRecord(topic = test0, partition = 0, offset = 7, CreateTime = 1588468271913, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
  46. ConsumerRecord(topic = test0, partition = 0, offset = 8, CreateTime = 1588468271918, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
  47. ConsumerRecord(topic = test0, partition = 0, offset = 9, CreateTime = 1588468271922, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
  48. ConsumerRecord(topic = test0, partition = 0, offset = 10, CreateTime = 1588468271927, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
  49. ConsumerRecord(topic = test0, partition = 0, offset = 11, CreateTime = 1588468408386, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 0 from windows10..)
  50. ConsumerRecord(topic = test0, partition = 0, offset = 12, CreateTime = 1588468408405, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 2 from windows10..)
  51. ConsumerRecord(topic = test0, partition = 0, offset = 13, CreateTime = 1588468408409, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 4 from windows10..)
  52. ConsumerRecord(topic = test0, partition = 0, offset = 14, CreateTime = 1588468408413, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 6 from windows10..)
  53. ConsumerRecord(topic = test0, partition = 0, offset = 15, CreateTime = 1588468408417, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 8 from windows10..)
  54. ConsumerRecord(topic = test0, partition = 0, offset = 16, CreateTime = 1588468418054, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
  55. ConsumerRecord(topic = test0, partition = 0, offset = 17, CreateTime = 1588468418058, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
  56. ConsumerRecord(topic = test0, partition = 0, offset = 18, CreateTime = 1588468418063, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
  57. ConsumerRecord(topic = test0, partition = 0, offset = 19, CreateTime = 1588468418068, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
  58. ConsumerRecord(topic = test0, partition = 0, offset = 20, CreateTime = 1588468418072, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
  59. ConsumerRecord(topic = test0, partition = 0, offset = 21, CreateTime = 1588468568082, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
  60. ConsumerRecord(topic = test0, partition = 0, offset = 22, CreateTime = 1588468568086, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
  61. ConsumerRecord(topic = test0, partition = 0, offset = 23, CreateTime = 1588468568091, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
  62. ConsumerRecord(topic = test0, partition = 0, offset = 24, CreateTime = 1588468568095, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
  63. ConsumerRecord(topic = test0, partition = 0, offset = 25, CreateTime = 1588468568100, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
  64. ConsumerRecord(topic = test0, partition = 0, offset = 26, CreateTime = 1588472294186, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
  65. ConsumerRecord(topic = test0, partition = 0, offset = 27, CreateTime = 1588472294190, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
  66. ConsumerRecord(topic = test0, partition = 0, offset = 28, CreateTime = 1588472294194, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
  67. ConsumerRecord(topic = test0, partition = 0, offset = 29, CreateTime = 1588472294200, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
  68. ConsumerRecord(topic = test0, partition = 0, offset = 30, CreateTime = 1588472294205, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)
  69. ConsumerRecord(topic = test0, partition = 0, offset = 31, CreateTime = 1588473193347, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 1 from windows10..)
  70. ConsumerRecord(topic = test0, partition = 0, offset = 32, CreateTime = 1588473193351, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 3 from windows10..)
  71. ConsumerRecord(topic = test0, partition = 0, offset = 33, CreateTime = 1588473193355, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 5 from windows10..)
  72. ConsumerRecord(topic = test0, partition = 0, offset = 34, CreateTime = 1588473193359, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 7 from windows10..)
  73. ConsumerRecord(topic = test0, partition = 0, offset = 35, CreateTime = 1588473193363, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = msg 9 from windows10..)


properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


  1. consumer.commitAsync(new OffsetCommitCallback() {
  2. public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
  3. Exception exception) {
  4. for (TopicPartition topicPartition : offsets.keySet()) {
  5. System.out.println("partition:");
  6. System.out.println(topicPartition.toString());
  7. System.out.println("offset and metadata");
  8. System.out.println(offsets.get(topicPartition).toString());
  9. }
  10. }
  11. });




  1. ...
  2. partition:
  3. test0-1
  4. offset and metadata
  5. OffsetAndMetadata{offset=43, metadata=''}
  6. partition:
  7. test0-0
  8. offset and metadata
  9. OffsetAndMetadata{offset=41, metadata=''}
  10. ...
  11. partition:
  12. test0-1
  13. offset and metadata
  14. OffsetAndMetadata{offset=46, metadata=''}
  15. partition:
  16. test0-0
  17. offset and metadata
  18. OffsetAndMetadata{offset=45, metadata=''}
  19. partition:
  20. test0-1
  21. offset and metadata
  22. OffsetAndMetadata{offset=47, metadata=''}
  23. partition:
  24. test0-0
  25. offset and metadata
  26. OffsetAndMetadata{offset=46, metadata=''}


  1. consumer.subscribe(Collections.singletonList("test0"), new ConsumerRebalanceListener() {
  2. public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  3. // 重分配前调用,提交偏移
  4. }
  5. public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  6. // 重分配后调用,对分区进行偏移量寻址
  7. }
  8. });




  1. public class TimeInterceptor implements ProducerInterceptor<String, String> {
  2. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  3. // 每发送一个消息,调用此方法
  4. return new ProducerRecord<String, String>(record.topic(), record.partition(),
  5. record.key(), System.currentTimeMillis() + ", " + record.value()); // 把时间戳拼接到消息前面
  6. }
  7. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  8. }
  9. public void close() {
  10. }
  11. public void configure(Map<String, ?> configs) {
  12. }
  13. }


  1. package interceptor;
  2. import org.apache.kafka.clients.producer.ProducerInterceptor;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import java.util.Map;
  6. public class CountInterceptor implements ProducerInterceptor<String, String> {
  7. private int mSuccessCount = 0, mFailCount = 0;
  8. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  9. return record;
  10. }
  11. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  12. // 收到一个ack后,调用此方法,进行统计
  13. if (metadata == null) {
  14. mFailCount++;
  15. } else {
  16. mSuccessCount++;
  17. }
  18. }
  19. public void close() {
  20. // 关闭生产者后,调用此方法
  21. System.out.println("Fail count:" + mFailCount);
  22. System.out.println("Success count:" + mSuccessCount);
  23. }
  24. public void configure(Map<String, ?> configs) {
  25. }
  26. }


  1. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
  2. Arrays.asList(TimeInterceptor.class.getCanonicalName(),
  3. CountInterceptor.class.getCanonicalName())); // 注意按顺序


  1. Fail count:0
  2. Success count:10


  1. 1588489595360, msg 0 from windows10..
  2. 1588489595472, msg 1 from windows10..
  3. 1588489595476, msg 2 from windows10..
  4. 1588489595589, msg 3 from windows10..
  5. 1588489595612, msg 4 from windows10..
  6. 1588489595616, msg 5 from windows10..
  7. 1588489595621, msg 6 from windows10..
  8. 1588489595624, msg 7 from windows10..
  9. 1588489595630, msg 8 from windows10..
  10. 1588489595633, msg 9 from windows10..





  1. if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  2. # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
  3. export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
  4. export JMX_PORT="9999"
  5. fi


  1. # tar -zxvf kafka-eagle-bin-1.3.7.tar.gz
  2. # mkdir /home/szc/kafka_eagle
  3. # tar -zxvf kafka-eagle-bin-1.3.7/kafka-eagle-web-1.3.7-bin.tar.gz -C /home/szc/kafka_eagle


  1. # ll kafka_eagle/
  2. total 0
  3. drwxr-xr-x. 8 root root 74 May 3 15:36 kafka-eagle-web-1.3.7


  1. ######################################
  2. # multi zookeeper&kafka cluster list
  3. ######################################
  4. kafka.eagle.zk.cluster.alias=cluster1 # 可有多个集群
  5. cluster1.zk.list= # ZooKeeper地址
  6. ######################################
  7. # zk client thread limit
  8. ######################################
  9. kafka.zk.limit.size=25
  10. ######################################
  11. # kafka eagle webui port
  12. ######################################
  13. kafka.eagle.webui.port=8048 # eagle的webui端口
  14. ######################################
  15. # kafka offset storage
  16. ######################################
  17. cluster1.kafka.eagle.offset.storage=kafka # kafka0.10以上最好改成kafka
  18. ######################################
  19. # enable kafka metrics
  20. ######################################
  21. kafka.eagle.metrics.charts=true # 使能图形显示
  22. kafka.eagle.sql.fix.error=false
  23. ######################################
  24. # kafka sql topic records max
  25. ######################################
  26. kafka.eagle.sql.topic.records.max=5000
  27. ######################################
  28. # alarm email configure 邮件相关,直接忽略
  29. ######################################
  30. kafka.eagle.mail.enable=false
  31. kafka.eagle.mail.sa=alert_sa@163.com
  32. kafka.eagle.mail.username=alert_sa@163.com
  33. kafka.eagle.mail.password=mqslimczkdqabbbh
  34. kafka.eagle.mail.server.host=smtp.163.com
  35. kafka.eagle.mail.server.port=25
  36. ######################################
  37. # alarm im configure 钉钉、微信,忽略
  38. ######################################
  39. #kafka.eagle.im.dingding.enable=true
  40. #kafka.eagle.im.dingding.url=https://oapi.dingtalk.com/robot/send?access_token=
  41. #kafka.eagle.im.wechat.enable=true
  42. #kafka.eagle.im.wechat.token=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=xxx&corpsecret=xxx
  43. #kafka.eagle.im.wechat.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=
  44. #kafka.eagle.im.wechat.touser=
  45. #kafka.eagle.im.wechat.toparty=
  46. #kafka.eagle.im.wechat.totag=
  47. #kafka.eagle.im.wechat.agentid=
  48. ######################################
  49. # delete kafka topic token
  50. ######################################
  51. kafka.eagle.topic.token=keadmin
  52. ######################################
  53. # kafka sasl authenticate
  54. ######################################
  55. cluster1.kafka.eagle.sasl.enable=false
  56. cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
  57. cluster1.kafka.eagle.sasl.mechanism=PLAIN
  58. cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";
  59. ######################################
  60. # kafka jdbc driver address jdbc地址,修改成自己的,数据库不存在会自行创建
  61. ######################################
  62. kafka.eagle.driver=com.mysql.jdbc.Driver
  63. kafka.eagle.url=jdbc:mysql://
  64. kafka.eagle.username=root
  65. kafka.eagle.password=root

3、在eagle目录下的bin目录中,修改ke.sh,添加一行创建目录的命令mkdir -p ${KE_HOME}/kms/webapps/ke/WEB-INF/classes/

  1. ....
  2. rm -rf ${KE_HOME}/kms/webapps/ke/WEB-INF/classes/*.properties
  3. mkdir -p ${KE_HOME}/kms/webapps/ke/WEB-INF/classes/
  4. cp ${KE_HOME}/conf/*.properties ${KE_HOME}/kms/webapps/ke/WEB-INF/classes/
  5. ...


  1. export KE_HOME=/home/szc/kafka_eagle/kafka-eagle-web-1.3.7
  2. export PATH=$PATH:$KE_PATH/bin


  1. # source /etc/profile
  2. # firewall-cmd --add-port=8048/tcp --permanent
  3. # firewall-cmd --reload


  1. # jar
  2. Usage: jar {ctxui}[vfmn0PMe] [jar-file] [manifest-file] [entry-point] [-C dir] files ...
  3. Options:
  4. -c create new archive
  5. -t list table of contents for archive
  6. -x extract named (or all) files from archive
  7. -u update existing archive
  8. -v generate verbose output on standard output
  9. -f specify archive file name
  10. -m include manifest information from specified manifest file
  11. -n perform Pack200 normalization after creating a new archive
  12. -e specify application entry point for stand-alone application
  13. bundled into an executable jar file
  14. -0 store only; use no ZIP compression
  15. -P preserve leading '/' (absolute path) and ".." (parent directory) components from file names
  16. -M do not create a manifest file for the entries
  17. -i generate index information for the specified jar files
  18. -C change to the specified directory and include the following file
  19. If any file is a directory then it is processed recursively.
  20. The manifest file name, the archive file name and the entry point name are
  21. specified in the same order as the 'm', 'f' and 'e' flags.
  22. Example 1: to archive two class files into an archive called classes.jar:
  23. jar cvf classes.jar Foo.class Bar.class
  24. Example 2: use an existing manifest file 'mymanifest' and archive all the
  25. files in the foo/ directory into 'classes.jar':
  26. jar cvfm classes.jar mymanifest -C foo/ .


  1. # kafka_eagle/kafka-eagle-web-1.3.7/bin/ke.sh start
  2. ....
  3. *******************************************************************
  4. * Kafka Eagle system monitor port successful...
  5. *******************************************************************
  6. [2020-05-03 16:54:25] INFO: Status Code[0]
  7. [2020-05-03 16:54:25] INFO: [Job done!]
  8. Welcome to
  9. __ __ ___ ____ __ __ ___ ______ ___ ______ __ ______
  10. / //_/ / | / __/ / //_/ / | / ____/ / | / ____/ / / / ____/
  11. / ,< / /| | / /_ / ,< / /| | / __/ / /| | / / __ / / / __/
  12. / /| | / ___ | / __/ / /| | / ___ | / /___ / ___ |/ /_/ / / /___ / /___
  13. /_/ |_| /_/ |_|/_/ /_/ |_| /_/ |_| /_____/ /_/ |_|\____/ /_____//_____/
  14. Version 1.3.7
  15. *******************************************************************
  16. * Kafka Eagle Service has started success.
  17. * Welcome, Now you can visit ''
  18. * Account:admin ,Password:123456
  19. *******************************************************************
  20. * <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
  21. * <Usage> https://www.kafka-eagle.org/ </Usage>
  22. *******************************************************************




[root@localhost szc]# unzip kafka-manager-


  1. [root@localhost szc]# cd kafka-manager-
  2. [root@localhost kafka-manager-]# vim conf/application.conf




  1. [root@localhost kafka-manager-]# bin/kafka-manager -Dhttp.port=7456 > start.log 2>&1 &
  2. [1] 38560

> start.log表示输出日志追加到start.log中,2>&1表示错误信息写到控制台,由于前面的文件重定向,这里也会把错误信息追加到日志文件中,最后一个&表示后台运行此进程



  1. [root@localhost kafka-manager-]# firewall-cmd --add-port=7456/tcp --permanent
  2. success
  3. [root@localhost kafka-manager-]# firewall-cmd --reload
  4. success

5、在windows浏览器里访问192.168.57.141:7456,点击上面的Cluster-> Add Cluster






[root@localhost kafka-manager-]# kill 38560



  1. [root@localhost kafka_2.11-]# bin/kafka-producer-perf-test.sh --topic test0 --record-size 100 --num-records 100000 --throughput 1000 --producer-props bootstrap.servers=
  2. 5002 records sent, 1000.0 records/sec (0.10 MB/sec), 2.6 ms avg latency, 157.0 max latency.
  3. 5005 records sent, 1001.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 7.0 max latency.
  4. 5002 records sent, 1000.4 records/sec (0.10 MB/sec), 0.6 ms avg latency, 4.0 max latency.
  5. 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.6 ms avg latency, 8.0 max latency.
  6. 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 4.0 max latency.
  7. 5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 9.0 max latency.
  8. 5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 5.0 max latency.
  9. 5004 records sent, 1000.4 records/sec (0.10 MB/sec), 1.0 ms avg latency, 68.0 max latency.
  10. ....
  11. 100000 records sent, 999.970001 records/sec (0.10 MB/sec), 0.68 ms avg latency, 157.00 ms max latency, 1 ms 50th, 1 ms 95th, 3 ms 99th, 38 ms 99.9th.

最后一条是总结,总共发了100000记录,每秒 999.970001条,吞吐量0.1MB/s,平均延迟0.68ms,最大延迟157ms


  1. [root@localhost kafka_2.11-]# bin/kafka-consumer-perf-test.sh --zookeeper --topic test0 --fetch-size 1000 --messages 10000000 --threads 1
  2. start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
  3. 2020-05-25 19:53:25:047, 2020-05-25 19:53:29:393, 9.5367, 2.1944, 100000, 23009.6641






# mkdir logs


  1. # mkdir job
  2. # vim kafka_flume.conf


  1. a1.sources = r1
  2. a1.channels = c1
  3. a1.sinks = k1
  4. # 输入类型为netcat,来自localhost:6666
  5. a1.sources.r1.type = netcat
  6. a1.sources.r1.bind = localhost
  7. a1.sources.r1.port = 6666
  8. # 内存输入
  9. a1.channels.c1.type = memory
  10. a1.channels.c1.capacity = 1000
  11. a1.channels.c1.transactionCapacity = 100
  12. # kafka输出
  13. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  14. a1.sinks.k1.kafka.topic = test0 # 话题
  15. a1.sinks.k1.kafka.bootstrap.servers = # kafka服务器地址
  16. a1.sinks.k1.kafka.flumeBatchSize = 20
  17. a1.sinks.k1.kafka.producer.acks = 1
  18. a1.sinks.k1.kafka.producer.linger.ms = 1
  19. # 绑定
  20. a1.sources.r1.channels = c1
  21. a1.sinks.k1.channel = c1


  1. # cd /usr/local
  2. # wget https://ncu.dl.sourceforge.net/project/netcat/netcat/0.7.1/netcat-0.7.1.tar.gz
  3. # tar -zxvf netcat-0.7.1.tar.gz
  4. # mv netcat-0.7.1 netcat
  5. # cd netcat/
  6. # ./configure
  7. # make
  8. # make install


# rm -f /usr/bin/nc


  1. $ nc -nv -w 1 -z 1-500
  2. 22 (ssh) open
  3. 111 (sunrpc) open


# /home/szc/kafka_2.11- --bootstrap-server --topic test0


# ./bin/flume-ng agent -c conf/ -f job/kafka_flume.conf -n a1


  1. # nc localhost 6666
  2. sads
  3. OK
  4. szc
  5. OK


  1. # /home/szc/kafka_2.11- --bootstrap-server --topic test0
  2. sads
  3. szc



  1. <dependency>
  2. <groupId>org.apache.flume</groupId>
  3. <artifactId>flume-ng-core</artifactId>
  4. <version>1.8.0</version>
  5. </dependency>


  1. public class FlumeInterceptor implements Interceptor {
  2. private ArrayList<Event> mHeaderEvents = new ArrayList<Event>();
  3. public void initialize() {
  4. }
  5. public Event intercept(Event event) { // 主要的拦截逻辑
  6. String body = new String(event.getBody());
  7. Map<String, String> headers = event.getHeaders();
  8. String topic = body.contains("szc") ? "test0" : "test1";
  9. headers.put("topic", topic); // 根据消息内容,在头信息中添加话题
  10. return event;
  11. }
  12. public List<Event> intercept(List<Event> list) {
  13. mHeaderEvents.clear();
  14. for (Event event : list) {
  15. mHeaderEvents.add(intercept(event));
  16. }
  17. return mHeaderEvents;
  18. }
  19. public void close() {
  20. }
  21. public static class Builder implements Interceptor.Builder {
  22. // 构造器,配置文件中要传入此类的全类名
  23. public Interceptor build() {
  24. return new FlumeInterceptor();
  25. }
  26. public void configure(Context context) {
  27. }
  28. }
  29. }




  1. a1.sources = r1
  2. ....
  3. # 拦截器设置
  4. a1.sources.r1.interceptors = i1
  5. a1.sources.r1.interceptors.i1.type = interceptor.FlumeInterceptor$Builder
  6. ....


# ./bin/flume-ng agent -c conf/ -f job/kafka_flume.conf -n a1


  1. # nc localhost 6666
  2. szc
  3. OK
  4. szcc
  5. OK
  6. 31244h
  7. OK


  1. # /home/szc/kafka_2.11- --bootstrap-server --topic test0
  2. szc
  3. szcc
  4. # /home/szc/kafka_2.11- --bootstrap-server --topic test1
  5. 31244h



