当前位置:   article > 正文

25+ Kafka 常用管理命令和脚本


公众号关注 「奇妙的 Linux 世界」

设为「星标」,每天带你玩转 Linux !



启动 Kafka

-daemon 参数可以让 Kafka 在后台运行。

kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

指定 JMX 端口启动

JMX 的全称为 Java Management Extensions。顾名思义,是管理 Java 的一种扩展,通过 JMX 可以方便我们监控 Kafka 的内存,线程,CPU 的使用情况,以及生产和消费消息的指标。

JMX_PORT=9999 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

停止 Kafka



创建 Topic

kafka-topics.sh --create  --bootstrap-server --replication-factor 3 --partitions 3 --topic <topic-name>

列出所有 Topic

kafka-topics.sh  --bootstrap-server --list

查看指定 Topic

kafka-topics.sh --bootstrap-server --describe --topic <topic-name>

删除指定 Topic

kafka-topics.sh --bootstrap-server --delete --topic <topic-name>

扩展 Topic 的 Partition 数量

artition 数量只能扩大不能缩小。

kafka-topics.sh --bootstrap-server --topic app --alter --partitions 30

扩展 topic 每个 partition 的副本数量

replication factor 可以扩大也可以缩小,最多不能超过 broker 数量。先创建一个文件名为 increace-factor.json,这里要扩展的是 mysql-audit-log 这个 topic 的 partition 到 15 个:0,1,2 为 broker id。

  1. {"version":1,
  2. "partitions":[
  3. {"topic":"mysql-audit-log","partition":0,"replicas":[0,1,2]},
  4. {"topic":"mysql-audit-log","partition":1,"replicas":[0,1,2]},
  5. {"topic":"mysql-audit-log","partition":2,"replicas":[0,1,2]},
  6. {"topic":"mysql-audit-log","partition":3,"replicas":[0,1,2]},
  7. {"topic":"mysql-audit-log","partition":4,"replicas":[0,1,2]},
  8. {"topic":"mysql-audit-log","partition":5,"replicas":[0,1,2]},
  9. {"topic":"mysql-audit-log","partition":6,"replicas":[0,1,2]},
  10. {"topic":"mysql-audit-log","partition":7,"replicas":[0,1,2]},
  11. {"topic":"mysql-audit-log","partition":8,"replicas":[0,1,2]},
  12. {"topic":"mysql-audit-log","partition":9,"replicas":[0,1,2]},
  13. {"topic":"mysql-audit-log","partition":10,"replicas":[0,1,2]},
  14. {"topic":"mysql-audit-log","partition":11,"replicas":[0,1,2]},
  15. {"topic":"mysql-audit-log","partition":12,"replicas":[0,1,2]},
  16. {"topic":"mysql-audit-log","partition":13,"replicas":[0,1,2]},
  17. {"topic":"mysql-audit-log","partition":14,"replicas":[0,1,2]}
  18. ]}
kafka-reassign-partitions.sh --zookeeper --reassignment-json-file  increace-factor.json --execute

查看 Topic 数据大小

  1. #方法一
  2. kafka-log-dirs.sh \
  3.   --bootstrap-server \
  4.   --topic-list mytopic \
  5.   --describe \
  6.   | grep -oP '(?<=size":)\d+'  \
  7.   | awk '{ sum += $1 } END { print sum }'
  9. #返回结果,单位 Byte
  10. 648
  11. #方法二,需要安装 jq
  12. kafka-log-dirs.sh \
  13.     --bootstrap-server \
  14.     --topic-list mytopic \
  15.     --describe \
  16.   | grep '^{' \
  17.   | jq '[ ..|.size? | numbers ] | add'
  18. #返回结果,单位 Byte
  19. 648

消费者组 Consumer Group

列出所有的 Consumer Group

kafka-consumer-groups.sh --bootstrap-server --list

查看指定 Consumer Group 详情

  • GROUP:消费者 group

  • TOPIC:话题 id

  • PARTITION:分区 id

  • CURRENT-OFFSET:当前已消费的条数


  • LAG:未消费的条数

  • CONSUMER-ID:消费者 id

  • HOST:消费者 ip 地址

  • CLIENT-ID:客户端 id

  1. #这里查看的是 logstash_mysql 这个消费者 group 的消费情况
  2. kafka-consumer-groups.sh --bootstrap-server --describe --group logstash_mysql
  3. #返回结果
  4. GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                      HOST            CLIENT-ID
  5. logstash_mysql  mysql-audit-log 11         1312115         1312857         742             logstash-5-0545a8a7-f7bd-430c-b619-7a2b206addd2  /    logstash-5
  6. logstash_mysql  mysql-audit-log 1          1312593         1313345         752             logstash-0-d86bd51a-d010-45de-aa6f-f6da8542b779  /    logstash-0
  7. logstash_mysql  mysql-audit-log 2          1309548         1310317         769             logstash-1-496340ea-935d-444d-a184-51d42e225054  /    logstash-1
  8. logstash_mysql  mysql-audit-log 12         1313083         1313194         111             logstash-6-806b20cb-a9af-49c1-b37d-ccb33a646ab2  /    logstash-6
  9. logstash_mysql  mysql-audit-log 6          1310984         1311192         208             logstash-13-8d474bf6-e8d0-4b8a-b319-cf5e2e6cc078 /    logstash-13
  10. logstash_mysql  mysql-audit-log 9          1312998         1313768         770             logstash-3-29863fb0-6708-4fb1-9e28-bd81c30ce8ef  /    logstash-3
  11. logstash_mysql  mysql-audit-log 4          1315150         1315276         126             logstash-11-6d66a188-85b7-476b-bd89-5423ef48cd01 /    logstash-11
  12. logstash_mysql  mysql-audit-log 0          22770935522     22770935650     128             logstash-0-7be475d6-a49e-4ff9-bf83-6b83f6067306  /    logstash-0
  13. logstash_mysql  mysql-audit-log 8          1309956         1310103         147             logstash-2-3c313c6f-8c98-4333-8bad-2f9696457d7d  /    logstash-2
  14. logstash_mysql  mysql-audit-log 13         1314659         1314775         116             logstash-7-e98fd14e-e7f6-45e5-8ccf-2442058f0bc9  /    logstash-7
  15. logstash_mysql  mysql-audit-log 14         1313145         1313250         105             logstash-8-2c3345a8-f8f1-4f08-a18e-333dff2f0d65  /    logstash-8
  16. logstash_mysql  mysql-audit-log 5          1314037         1314297         260             logstash-12-ce018227-9e59-4137-a23f-5ccc0c7d4f6a /    logstash-12
  17. logstash_mysql  mysql-audit-log 10         1312883         1312962         79              logstash-4-9eb84ae4-3351-4083-9b1f-288910a6c3b8  /    logstash-4
  18. logstash_mysql  mysql-audit-log 7          1312476         1313200         724             logstash-14-680c982e-5cf3-406b-810a-4d5c96b5bdee /    logstash-14
  19. logstash_mysql  mysql-audit-log 3          1313227         1313328         101             logstash-10-e212dc18-a2bb-42d9-9d0b-095a93841efc /    logstash-10

删除指定 Consumer Group

kafka-topics.sh --bootstrap-server --delete --topic pgw-nginx



  1. kafka-console-producer.sh --broker-list --topic mytopic
  2. >this is my topic
生产消息指定 Key

key.separator=, 指定以逗号作为 key 和 value 的分隔符。

  1. kafka-console-producer.sh --broker-list kafka1:9092 --topic cr7-topic --property parse.key=true --property key.separator=,
  2. >mykey,{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}



从头开始消费是可以消费到之前的消息的,通过 --from-beginning 指定:

  1. kafka-console-consumer.sh --bootstrap-server --topic mytopic --from-beginning
  2. this is my topic

--offset latest 指定从尾部开始消费,另外还需要指定 partition,可以指定多个:

kafka-console-consumer.sh --bootstrap-server --topic mytopic  --offset latest  --partition 0 1 2


  1. kafka-console-consumer.sh --bootstrap-server --topic mytopic  --offset latest  --partition 0 1 2 --max-messages 2
  2. bobo
  3. 1111
  4. Processed a total of 2 messages

--consumer-property group.id=<消费者组名>执行消费者组进行消费:

kafka-console-consumer.sh --bootstrap-server  kafka1:9092 --topic test_partition --consumer-property  group.id=test_group --from-beginning


  1. kafka-dump-log.sh --files cr7-topic-0/00000000000000000000.log  -deep-iteration --print-data-log
  3.  #输出结果
  4. | offset: 1080 CreateTime: 1615020877664 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 1 payload: {"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
  5. | offset: 1081 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 5 payload: {"orderAmount":1000,"orderId":5,"productId":105,"productNum":5}
  6. | offset: 1082 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 7 payload: {"orderAmount":1000,"orderId":7,"productId":107,"productNum":7}
  7. | offset: 1083 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 8 payload: {"orderAmount":1000,"orderId":8,"productId":108,"productNum":8}
  8. | offset: 1084 CreateTime: 1615020877677 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 11 payload: {"orderAmount":1000,"orderId":11,"productId":111,"productNum":11}
  9. | offset: 1085 CreateTime: 1615020877677 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 15 payload: {"orderAmount":1000,"orderId":15,"productId":115,"productNum":15}
  10. | offset: 1086 CreateTime: 1615020877678 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 17 payload: {"orderAmount":1000,"orderId":17,"productId":117,"productNum":17}
  11. | offset: 1087 CreateTime: 1615020877678 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 21 payload: {"orderAmount":1000,"orderId":21,"productId":121,"productNum":21}

查看 Topic 中当前消息总数

Kafka 自带的命令没有直接提供这样的功能,要使用 Kafka 提供的工具类 GetOffsetShell 来计算给定 Topic 每个分区当前最早位移和最新位移,差值就是每个分区的当前的消息总数,将该 Topic 所有分区的消息总数累加就能得到该 Topic 总的消息数。

首先查询 Topic 中每个分区 offset 的最小值(起始位置),使用 --time -2 参数。一个分区的起始位置并不是每时每刻都为 0 ,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加。

  1. kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 -topic test-topic  --time -2
  2. #前面是分区号,后面是 offset
  3. test-topic:0:0
  4. test-topic:1:0

然后使用--time -1 参数查询 Topic 各个分区的 offset 的最大值。

  1. kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --time -1 --topic test-topic
  2. #输出结果
  3. test-topic:0:5500000
  4. test-topic:1:5500000

对于本例来说,test-topic 中当前总的消息数为 (5500000 - 0) + (5500000 - 0),等于 1100 万条。如果只是要获取 Topic 中总的消息数(包括已经从 Kafka 删除的消息),那么只需要将 Topic 中每个 Partition 的 Offset 累加即可。


重置消费者 Offset

  1. #查看消费者组消费情况
  2. #目前的 0 分区 CURRENT-OFFSET 是 42 分区 CURRENT-OFFSET 是 6
  3. kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-consumer-group
  4. #返回结果
  5. Consumer group 'my-consumer-group' has no active members.
  6. GROUP             TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
  7. my-consumer-group transaction-topic-msg 2          6               6               0               -               -               -
  8. my-consumer-group transaction-topic-msg 1          0               0               0               -               -               -
  9. my-consumer-group transaction-topic-msg 0          4               4               0               -               -               -         -
  10. #重置消费者组 offset 为 3,重置是所有分区一起重置
  11. kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group my-consumer-group --reset-offsets --execute --to-offset 3 --topic transaction-topic-msg
  12. #返回结果
  13. [2021-06-25 10:44:51,848] WARN New offset (3) is higher than latest offset for topic partition transaction-topic-msg-1. Value will be set to 0 (kafka.admin.ConsumerGroupCommand$)
  14. GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
  15. my-consumer-group              transaction-topic-msg          0          3              
  16. my-consumer-group              transaction-topic-msg          1          0              
  17. my-consumer-group              transaction-topic-msg          2          3              
  18. #可以看到 0 分区和 2 分区的 CURRENT-OFFSET 都变为 3 了
  19. kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-consumer-group
  20. #返回结果
  21. Consumer group 'my-consumer-group' has no active members.
  22. GROUP             TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
  23. my-consumer-group transaction-topic-msg 2          3               6               3               -               -               -
  24. my-consumer-group transaction-topic-msg 1          0               0               0               -               -               -
  25. my-consumer-group transaction-topic-msg 0          3               4               1               -               -               -
  26. #可以重新消费到之前的数据
  27. kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic transaction-topic-msg  --group my-consumer-group 
  28. #返回结果
  29. message-111111
  30. message-333333



  • --num-records 10000000: 向指定主题发送了 1 千万条消息。

  • --record-size 1024: 每条消息的大小为 1024KB。

  • --throughput -1: 不限制吞吐量。

  • --producer-props: 指定生产者参数。

    • acks=-1: 这要求 ISR 列表里跟 leader 保持同步的那些 follower 都要把消息同步过去,才能认为这条消息是写入成功。

    • linger.ms=2000: batch.size 和 linger.ms 是对 kafka producer 性能影响比较大的两个参数。batch.size 是 producer 批量发送的基本单位,默认是 16384Bytes,即 16kB;lingger.ms 是 sender 线程在检查 batch 是否 ready 时候,判断有没有过期的参数,默认大小是 0ms。

    • compression.type=lz4: 使用 lz4 压缩算法。

  1. [root@kafka1 ~]# kafka-producer-perf-test.sh --topic test_producer_perf --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka1:9092 acks=-1 linger.ms=2000 compression.type=lz4
  2. #输出结果
  3. 705600 records sent, 141063.6 records/sec (137.76 MB/sec), 54.8 ms avg latency, 557.0 ms max latency.
  4. 1204178 records sent, 240739.3 records/sec (235.10 MB/sec), 44.1 ms avg latency, 402.0 ms max latency.
  5. 1370938 records sent, 274187.6 records/sec (267.76 MB/sec), 27.9 ms avg latency, 311.0 ms max latency.
  6. 1464605 records sent, 292628.4 records/sec (285.77 MB/sec), 19.2 ms avg latency, 139.0 ms max latency.
  7. 1477239 records sent, 295447.8 records/sec (288.52 MB/sec), 31.8 ms avg latency, 290.0 ms max latency.
  8. 1446682 records sent, 289336.4 records/sec (282.56 MB/sec), 26.4 ms avg latency, 281.0 ms max latency.
  9. 1555098 records sent, 311019.6 records/sec (303.73 MB/sec), 37.6 ms avg latency, 344.0 ms max latency.
  10. 10000000 records sent, 263894.020162 records/sec (257.71 MB/sec), 32.60 ms avg latency, 557.00 ms max latency, 12 ms 50th, 140 ms 95th, 262 ms 99th, 396 ms 99.9th.

我们应该关心延时的概率分布情况,仅仅知道一个平均值是没有意义的。这就是这里计算分位数的原因。通常我们关注到 99th 分位就可以了。比如在上面的输出中,99th 值是 262 ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 262 ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。


  1. [root@kafka1 ~]# kafka-consumer-perf-test.sh --broker-list kafka1:9092 --messages 10000000 --topic test_producer_perf
  2. #输出结果
  3. start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
  4. 2021-03-09 10:34:18:4472021-03-09 10:34:33:9489765.6250629.999710000000645119.66971615257259068-1615257243567-0.0000-0.0062

虽然输出格式有所差别,但该脚本也会打印出消费者的吞吐量数据。比如本例中的 629.9997MB/s。有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低。



如果你想要知道动态 Broker 参数都有哪些,一种方式是在 Kafka 官网中查看 Broker 端参数列表,另一种方式是直接运行无参数的 kafka-configs 脚本,该脚本的说明文档会告诉你当前动态 Broker 参数都有哪些。

  1. [root@kafka1 ~]# kafka-configs.sh 
  2. This tool helps to manipulate and describe entity config for a topic, client, user or broker
  3. Option                                 Description                            
  4. ------                                 -----------                            
  5. --add-config <String>                  Key Value pairs of configs to add.     
  6.                                          Square brackets can be used to group 
  7.                                          values which contain commas: 'k1=v1, 
  8.                                          k2=[v1,v2,v2],k3=v3'. The following  
  9.                                          #Topic 动态参数
  10.                                          is a list of valid configurations:   
  11.                                          For entity-type 'topics':            
  12.                                         cleanup.policy                        
  13.                                         compression.type                      
  14.                                         delete.retention.ms                   
  15.                                         file.delete.delay.ms                  
  16.                                         flush.messages                        
  17.                                         flush.ms                              
  18.                                         follower.replication.throttled.       
  19.                                          replicas                             
  20.                                         index.interval.bytes                  
  21.                                         leader.replication.throttled.replicas 
  22.                                         max.compaction.lag.ms                 
  23.                                         max.message.bytes                     
  24.                                         message.downconversion.enable         
  25.                                         message.format.version                
  26.                                         message.timestamp.difference.max.ms   
  27.                                         message.timestamp.type                
  28.                                         min.cleanable.dirty.ratio             
  29.                                         min.compaction.lag.ms                 
  30.                                         min.insync.replicas                   
  31.                                         preallocate                           
  32.                                         retention.bytes                       
  33.                                         retention.ms                          
  34.                                         segment.bytes                         
  35.                                         segment.index.bytes                   
  36.                                         segment.jitter.ms                     
  37.                                         segment.ms                            
  38.                                         unclean.leader.election.enable        
  39.                                         #Broker 动态参数
  40.                                        For entity-type 'brokers':             
  41.                                         advertised.listeners                  
  42.                                         background.threads                    
  43.                                         compression.type                      
  44.                                         follower.replication.throttled.rate   
  45.                                         leader.replication.throttled.rate     
  46.                                         listener.security.protocol.map        
  47.                                         listeners                             
  48.                                         log.cleaner.backoff.ms                
  49.                                         log.cleaner.dedupe.buffer.size        
  50.                                         log.cleaner.delete.retention.ms       
  51.                                         log.cleaner.io.buffer.load.factor     
  52.                                         log.cleaner.io.buffer.size            
  53.                                         log.cleaner.io.max.bytes.per.second   
  54.                                         log.cleaner.max.compaction.lag.ms     
  55.                                         log.cleaner.min.cleanable.ratio       
  56.                                         log.cleaner.min.compaction.lag.ms     
  57.                                         log.cleaner.threads                   
  58.                                         log.cleanup.policy                    
  59.                                         log.flush.interval.messages           
  60.                                         log.flush.interval.ms                 
  61.                                         log.index.interval.bytes              
  62.                                         log.index.size.max.bytes              
  63.                                         log.message.downconversion.enable     
  64.                                         log.message.timestamp.difference.max. 
  65.                                          ms                                   
  66.                                         log.message.timestamp.type            
  67.                                         log.preallocate                       
  68.                                         log.retention.bytes                   
  69.                                         log.retention.ms                      
  70.                                         log.roll.jitter.ms                    
  71.                                         log.roll.ms                           
  72.                                         log.segment.bytes                     
  73.                                         log.segment.delete.delay.ms           
  74.                                         max.connection.creation.rate          
  75.                                         max.connections                       
  76.                                         max.connections.per.ip                
  77.                                         max.connections.per.ip.overrides      
  78.                                         message.max.bytes                     
  79.                                         metric.reporters                      
  80.                                         min.insync.replicas                   
  81.                                         num.io.threads                        
  82.                                         num.network.threads                   
  83.                                         num.recovery.threads.per.data.dir     
  84.                                         num.replica.fetchers                  
  85.                                         principal.builder.class               
  86.                                         replica.alter.log.dirs.io.max.bytes.  
  87.                                          per.second                           
  88.                                         sasl.enabled.mechanisms               
  89.                                         sasl.jaas.config                      
  90.                                         sasl.kerberos.kinit.cmd               
  91.                                         sasl.kerberos.min.time.before.relogin 
  92.                                         sasl.kerberos.principal.to.local.rules
  93.                                         sasl.kerberos.service.name            
  94.                                         sasl.kerberos.ticket.renew.jitter     
  95.                                         sasl.kerberos.ticket.renew.window.    
  96.                                          factor                               
  97.                                         sasl.login.refresh.buffer.seconds     
  98.                                         sasl.login.refresh.min.period.seconds 
  99.                                         sasl.login.refresh.window.factor      
  100.                                         sasl.login.refresh.window.jitter      
  101.                                         sasl.mechanism.inter.broker.protocol  
  102.                                         ssl.cipher.suites                     
  103.                                         ssl.client.auth                       
  104.                                         ssl.enabled.protocols                 
  105.                                         ssl.endpoint.identification.algorithm 
  106.                                         ssl.engine.factory.class              
  107.                                         ssl.key.password                      
  108.                                         ssl.keymanager.algorithm              
  109.                                         ssl.keystore.certificate.chain        
  110.                                         ssl.keystore.key                      
  111.                                         ssl.keystore.location                 
  112.                                         ssl.keystore.password                 
  113.                                         ssl.keystore.type                     
  114.                                         ssl.protocol                          
  115.                                         ssl.provider                          
  116.                                         ssl.secure.random.implementation      
  117.                                         ssl.trustmanager.algorithm            
  118.                                         ssl.truststore.certificates           
  119.                                         ssl.truststore.location               
  120.                                         ssl.truststore.password               
  121.                                         ssl.truststore.type                   
  122.                                         unclean.leader.election.enable        
  123.                                        For entity-type 'users':               
  124.                                         SCRAM-SHA-256                         
  125.                                         SCRAM-SHA-512                         
  126.                                         consumer_byte_rate                    
  127.                                         controller_mutation_rate              
  128.                                         producer_byte_rate                    
  129.                                         request_percentage                    
  130.                                        For entity-type 'clients':             
  131.                                         consumer_byte_rate                    
  132.                                         controller_mutation_rate              
  133.                                         producer_byte_rate                    
  134.                                         request_percentage                    
  135.                                        Entity types 'users' and 'clients' may 
  136.                                          be specified together to update      
  137.                                          config for clients of a specific     
  138.                                          user.
修改 Broker 动态参数

修改动态参数无需重启 Broker,动态 Broker 参数的使用场景非常广泛,通常包括但不限于以下几种:

  • 动态调整 Broker 端各种线程池大小,实时应对突发流量。

  • 动态调整 Broker 端连接信息或安全配置信息。

  • 动态更新 SSL Keystore 有效期。

  • 动态调整 Broker 端 Compact 操作性能。

  • 实时变更 JMX 指标收集器 (JMX Metrics Reporter)。

Kafka Broker Config 的参数有以下 3 种类型:

  • read-only:被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。

  • per-broker:被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。

  • cluster-wide:被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。

在集群层面设置全局值,即设置 cluster-wide 范围值,将 unclean.leader.election.enable 参数在集群层面设置为 true。

  1. kafka-configs.sh --bootstrap-server \
  2. --entity-type brokers --entity-default --alter \
  3. --add-config unclean.leader.election.enable=true
  4. #返回结果
  5. Completed updating default config for brokers in the cluster.

如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default。现在,我们使用下面的命令来查看一下刚才的配置是否成功。

  1. kafka-configs.sh --bootstrap-server \
  2. --entity-type brokers --entity-default --describe
  3. #返回结果
  4. Default configs for brokers in the cluster are:
  5.   unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}

在 Zookeeper 上查看 /config/brokers/ 节点可以查看 cluster-wide 的动态参数设置。

  1. [zk: (CONNECTED) ] > get /config/brokers/<default>
  2. {"version":1,"config":{"unclean.leader.election.enable":"true"}}
  3. cZxid = 17179869570
  4. ctime = 1631246402937
  5. mZxid = 17179869570
  6. mtime = 1631246402937
  7. pZxid = 17179869570
  8. cversion = 0
  9. dataVersion = 0
  10. aclVersion = 0
  11. ephemeralOwner = 0
  12. dataLength = 64
  13. numChildren = 0

设置 per-broker 范围参数。我们还是以 unclean.leader.election.enable 参数为例,我现在为 ID 为 1 的 Broker 设置一个不同的值。命令如下:

  1. kafka-configs.sh --bootstrap-server --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
  2. #返回结果
  3. Completed updating config for broker 1.

我们使用下列命令查看 Broker ID 为 1 的节点动态参数,可以看到 DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false,表示我们刚才对 per-broker 参数的调整生效了。

  1. kafka-configs.sh --bootstrap-server --entity-type brokers --entity-name 1 --describe
  2. #返回结果
  3. Dynamic configs for broker 1 are:
  4.   unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, STATIC_BROKER_CONFIG:unclean.leader.election.enable=false, DEFAULT_CONFIG:unclean.leader.election.enable=false}

在 Zookeeper 上查看 /config/brokers/1 节点可以查看 Broker ID 为 1 的节点的动态参数设置。

  1. [zk: (CONNECTED) ] > get /config/brokers/1
  2. {"version":1,"config":{"unclean.leader.election.enable":"false"}}
  3. cZxid = 17179869574
  4. ctime = 1631246495120
  5. mZxid = 17179869574
  6. mtime = 1631246495120
  7. pZxid = 17179869574
  8. cversion = 0
  9. dataVersion = 0
  10. aclVersion = 0
  11. ephemeralOwner = 0
  12. dataLength = 65
  13. numChildren = 0[zk: (CONNECTED) ] > get /config/brokers/<default>[zk: (CONNECTED) ] > get /config/brokers/1

删除 cluster-wide 范围动态参数。

  1. kafka-configs.sh --bootstrap-server \
  2. --entity-type brokers --entity-default --alter \
  3. --delete-config unclean.leader.election.enable
  4. #返回结果
  5. Completed updating default config for brokers in the cluster.

删除 per-broker 范围参数。

  1. kafka-configs.sh --bootstrap-server \
  2. --entity-type brokers --entity-name 1 --alter \
  3. --delete-config unclean.leader.election.enable
  4. #返回结果
  5. Completed updating config for broker 1.
修改 Topic 动态参数

设置 Topic test-topic 的 retention.ms 为 10000。

  1. kafka-configs.sh --bootstrap-server \
  2. --entity-type topics --entity-name test-topic --alter \
  3. --add-config retention.ms=10000

查看设置的 Topic 动态参数。

  1. kafka-configs.sh --bootstrap-server \
  2. --entity-type topics --entity-name test-topic --describe
  3. #返回结果
  4. Dynamic configs for topic test-topic are:
  5.   retention.ms=10000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=10000}

在 Zookeeper 上可以查看 /config/topics/ 来查看 Topic 动态参数。

  1. [zk: (CONNECTED) ] > get /config/topics/test-topic
  2. {"version":1,"config":{"retention.ms":"10000"}}
  3. cZxid = 17179869460
  4. ctime = 1631245744105
  5. mZxid = 17179869619
  6. mtime = 1631250116481
  7. pZxid = 17179869460
  8. cversion = 0
  9. dataVersion = 10
  10. aclVersion = 0
  11. ephemeralOwner = 0
  12. dataLength = 47
  13. numChildren = 0[zk: (CONNECTED) ] > get /config/topics/test-topic

删除 Topic 动态参数。

  1. kafka-configs.sh --bootstrap-server \
  2. --entity-type topics --entity-name test-topic --alter \
  3. --delete-config retention.ms

Kafka 集群一键启动/停止脚本


  1. #/etc/profile 文件
  2. export KAFKA_HOME=/usr/local/kafka
  3. export PATH=$PATH:$KAFKA_HOME/bin

一键启动/停止脚本,查看状态需要安装 jps 工具。

  1. #! /bin/bash
  2. # 填写 Kafka Broker 节点地址
  3. hosts=(kafka1 kafka2 kafka3)
  4. # 打印启动分布式脚本信息
  5. mill=`date "+%N"`
  6. tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`
  7. echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.
  8. # 执行分布式开启命令
  9. function start()
  10. {
  11.         for i in ${hosts[@]}
  12.                 do
  13.                         smill=`date "+%N"`
  14.                         stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
  15.                         ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" &
  16.                         sleep 1
  17.                 done
  18. }
  19. # 执行分布式关闭命令
  20. function stop()
  21. {
  22.         for i in ${hosts[@]}
  23.                 do
  24.                         smill=`date "+%N"`
  25.                         stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
  26.                         ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" &
  27.                         sleep 1
  28.                 done
  29. }
  30. # 查看 Kafka Broker 节点状态
  31. function status()
  32. {
  33.         for i in ${hosts[@]}
  34.                 do
  35.                         smill=`date "+%N"`
  36.                         stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
  37.                         ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" &
  38.                         sleep 1
  39.                 done
  40. }
  41. # 判断输入的 Kafka 命令参数是否有效
  42. case "$1" in
  43.     start)
  44.         start
  45.         ;;
  46.     stop)
  47.         stop
  48.         ;;
  49.     status)
  50.         status
  51.         ;;
  52.     *)
  53.         echo "Usage: $0 {start|stop|status}"
  54.         RETVAL=1
  55. esac


  • Kafka 动态配置了解下?(https://time.geekbang.org/column/article/113504)

  • 常见工具脚本大汇总(https://time.geekbang.org/column/article/116111)

  • Kafka 并不难学!

本文转载自:「Se7en的架构笔记」,原文:https://url.hi-linux.com/iRhUr,版权归原作者所有。欢迎投稿,投稿邮箱: editor@hi-linux.com。


最近,我们建立了一个技术交流微信群。目前群里已加入了不少行业内的大神,有兴趣的同学可以加入和我们一起交流技术,在 「奇妙的 Linux 世界」 公众号直接回复 「加群」 邀请你入群。





万字干货,分布式数据库 HBase 中文入门指南




