
Kafka Shell Scripts: A Detailed Usage Guide

Kafka is powerful, and it ships with a large number of shell scripts. I did some research online and am recording what I learned here as study notes.

1. Script Overview

The bin directory under the Kafka installation directory contains many shell scripts useful for operations. They are listed below:

Script                                   Description
connect-distributed.sh                   run Kafka Connect in distributed mode
connect-standalone.sh                    run Kafka Connect in standalone mode
kafka-acls.sh                            TODO
kafka-broker-api-versions.sh             TODO
kafka-configs.sh                         configuration management
kafka-console-consumer.sh                Kafka consumer console
kafka-console-producer.sh                Kafka producer console
kafka-consumer-groups.sh                 consumer group information
kafka-consumer-perf-test.sh              consumer performance test
kafka-delegation-tokens.sh               TODO
kafka-delete-records.sh                  delete records below a given offset (low watermark)
kafka-log-dirs.sh                        message log directory information
kafka-mirror-maker.sh                    tool for replicating a Kafka cluster across data centers
kafka-preferred-replica-election.sh      trigger a preferred replica election
kafka-producer-perf-test.sh              producer performance test
kafka-reassign-partitions.sh             partition reassignment
kafka-replay-log-producer.sh             TODO
kafka-replica-verification.sh            verify replication progress
kafka-run-class.sh                       TODO
kafka-server-start.sh                    start a Kafka broker
kafka-server-stop.sh                     stop the Kafka broker(s)
kafka-simple-consumer-shell.sh           deprecated; use kafka-console-consumer.sh instead
kafka-streams-application-reset.sh       TODO
kafka-topics.sh                          topic management
kafka-verifiable-consumer.sh             verifiable consumer
kafka-verifiable-producer.sh             verifiable producer
trogdor.sh                               TODO
zookeeper-security-migration.sh          TODO
zookeeper-server-start.sh                start a ZooKeeper server
zookeeper-server-stop.sh                 stop the ZooKeeper server
zookeeper-shell.sh                       ZooKeeper client shell

Detailed Usage

The following sections explain how to use each script in detail.

connect-distributed.sh & connect-standalone.sh

Kafka Connect was added in 0.9. It imports data from other systems into Kafka and exports data from Kafka into other systems, so it can serve as an ETL pipeline for real-time data synchronization, real-time analytics, and so on.
It has two modes: Standalone and Distributed.
Standalone mode is mainly for development and testing; distributed mode is for production.
Its usage is fairly involved; a detailed tutorial (in Chinese) is recommended: https://3gods.com/bigdata/Kafka-Connect-Details.html
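For reference, a minimal standalone run, assuming the sample properties files that ship with the Kafka distribution:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
# connect-standalone.properties configures the worker (bootstrap servers, converters, offset storage);
# the sample file connectors copy lines from test.txt into a topic and back out to test.sink.txt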

kafka-broker-api-versions.sh

Usage: bin/kafka-broker-api-versions.sh --bootstrap-server 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094. This lists the API versions supported by each broker in the cluster.

kafka-configs.sh

Configuration management script. Its usage falls into two categories: describe and alter.
describe usage:
View the configuration of every topic: bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics
Partial output:

 

Configs for topic 'afei' are
Configs for topic 'TOPIC-TEST-AFEI' are retention.ms=600000
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

View a broker's configuration: bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 0

Note: 0 here is the broker.id; since entity-type is brokers, entity-name is interpreted as a broker.id.

alter usage:
Add a configuration entry to a topic: bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI --add-config retention.ms=600000
Remove a configuration entry from a topic: bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI --delete-config max.message.bytes

The full set of properties this script can manage is listed in the description of the --add-config option when you run bin/kafka-configs.sh with no arguments.
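Since Kafka 1.1 some broker properties can also be changed dynamically, without a broker restart. A sketch, assuming a dynamically updatable property such as log.cleaner.threads:

# add a dynamic per-broker config for broker.id 0
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name 0 --add-config log.cleaner.threads=2
# remove it again, falling back to the statically configured value
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name 0 --delete-config log.cleaner.threads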


kafka-console-consumer.sh

Usage: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic afei [--group groupName] [--partition targetPartition]

This command accepts many additional parameters:
--key-deserializer: the key deserializer; defaults to org.apache.kafka.common.serialization.StringDeserializer
--value-deserializer: the value deserializer; defaults to org.apache.kafka.common.serialization.StringDeserializer
--from-beginning: consume from the earliest message; by default consumption starts from the latest message.
--offset: start consuming from the given offset. If you set this parameter you must also pass --partition, otherwise you get the error: The partition is required when offset is specified.
--timeout-ms: if the consumer receives no message within this interval, it exits and throws kafka.consumer.ConsumerTimeoutException.
--whitelist: a whitelist of topics to consume, mutually exclusive with --topic. For example: --whitelist "afei.*" (topics starting with afei), --whitelist "afei" (just the topic afei), "afei|fly" (either of the topics afei or fly). The --blacklist parameter works analogously. An example combining --partition and --offset follows this list.
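A sketch of consuming from an explicit position, assuming topic afei has messages at offset 5 of partition 0:

# read partition 0 starting at offset 5; exit after 10 seconds without messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic afei --partition 0 --offset 5 --timeout-ms 10000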

kafka-console-producer.sh

Usage: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic afei. To connect to a cluster, use the broker-list format HOST1:PORT1,HOST2:PORT2,HOST3:PORT3.

This command accepts many additional parameters:
--key-serializer: the key serializer; defaults to org.apache.kafka.common.serialization.StringSerializer
--value-serializer: the value serializer; defaults to org.apache.kafka.common.serialization.StringSerializer
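By default each line you type is sent as the message value with a null key. A sketch for sending keyed messages, using the console producer's parse.key and key.separator reader properties:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic afei --property parse.key=true --property key.separator=:
# then type lines such as:  order-1001:created
# the text before ':' becomes the message key, the text after it the value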

kafka-consumer-groups.sh

List all consumer groups: bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Inspect one consumer group: bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group AfeiGroup --describe. The output looks like this:

 

Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
afei 0 8 8 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
afei 4 10 10 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
afei 1 8 8 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
afei 3 6 6 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
afei 2 9 9 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
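Besides inspection, this script can also rewind or reset a group's offsets (available since Kafka 0.11). A sketch; swap --execute for --dry-run to preview the result first:

# reset the group's offsets on topic afei back to the earliest available messages
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group AfeiGroup --topic afei --reset-offsets --to-earliest --execute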
kafka-consumer-perf-test.sh

perf is short for performance, so this is the consumer performance test script.
Usage: bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --group testGroup --topic afei --messages 1024
The output looks like this:

 

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2018-07-02 22:49:10:068, 2018-07-02 22:49:12:077, 0.0001, 0.0001, 41, 20.4082, 19, 1990, 0.0001, 20.6030
kafka-delete-records.sh

Usage: bin/kafka-delete-records.sh --bootstrap-server 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --offset-json-file offset.json, where offset.json contains:

 

{
    "partitions": [{
        "topic": "afei",
        "partition": 3,
        "offset": 10
    }],
    "version": 1
}

The result of running it is shown below; it deletes all records in partition 3 of topic afei with offsets lower than 10 (the record at offset 10 itself is not deleted):

 

Executing records delete operation
Records delete operation completed:
partition: afei-3 low_watermark: 10
kafka-log-dirs.sh

Usage: bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list afei[,topicName2,topicNameN]. If --topic-list is omitted, the output covers every Kafka log directory and every topic in it. With --topic-list the output looks like the following; it shows that the message logs live under /data/kafka-logs and that the topic afei has 3 partitions:

 

{
    "version": 1,
    "brokers": [{
        "broker": 0,
        "logDirs": [{
            "logDir": "/data/kafka-logs",
            "error": null,
            "partitions": [{
                "partition": "afei-1",
                "size": 567,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "afei-2",
                "size": 639,
                "offsetLag": 0,
                "isFuture": false
            }, {
                "partition": "afei-0",
                "size": 561,
                "offsetLag": 0,
                "isFuture": false
            }]
        }]
    }]
}
kafka-preferred-replica-election.sh

Usage: bin/kafka-preferred-replica-election.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --path-to-json-file afei-preferred.json (without --path-to-json-file, the preferred replica election is run for all topics). The JSON file contains:

 

{
    "partitions": [{
        "topic": "afei",
        "partition": 0
    }, {
        "topic": "afei",
        "partition": 1
    }, {
        "topic": "afei",
        "partition": 2
    }]
}

Scenario: when a topic is created, Kafka tries to spread its partitions evenly across all brokers, and likewise its replicas across different brokers. The full set of replicas of a partition is called its "assigned replicas", and the first replica in that list is the "preferred replica"; for a freshly created topic the preferred replica is usually the leader. The leader replica handles all reads and writes, while the other replicas are cold standbys that accept neither. Over time, though, brokers get shut down deliberately or crash unexpectedly, leader elections move leadership around, and the cluster load becomes unbalanced. We then want to rebalance topic leadership by having each partition elect its preferred replica as leader again.

Kafka has a parameter, auto.leader.rebalance.enable, that does this automatically, and it defaults to true; a background thread periodically checks and triggers leader rebalancing. Leaving it enabled is nevertheless not recommended, because the automatic election may kick in during business peak hours and hurt traffic.
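If you take that advice, disabling it is a single line in the broker configuration; a sketch for config/server.properties:

# server.properties: disable automatic preferred-leader rebalancing
auto.leader.rebalance.enable=false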

Verification:
The steps are simple: create a topic with 3 partitions and 3 replicas, then kill one of the brokers. The topic then looks like this; note that the broker with broker.id 0 is the leader of two partitions:

 

Topic:afei PartitionCount:3 ReplicationFactor:3 Configs:
Topic: afei Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: afei Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
Topic: afei Partition: 2 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2

After running kafka-preferred-replica-election.sh, the topic looks like this, with the leaders spread evenly across the 3 brokers:

 

Topic:afei PartitionCount:3 ReplicationFactor:3 Configs:
Topic: afei Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: afei Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
Topic: afei Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
kafka-producer-perf-test.sh

perf is short for performance, so this is the producer performance test script; a usage sketch follows.
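A usage sketch, assuming the option set of the 1.x releases (--num-records, --record-size, --throughput, --producer-props):

# send 1024 records of 100 bytes each with no throughput cap (-1)
bin/kafka-producer-perf-test.sh --topic afei --num-records 1024 --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092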

kafka-reassign-partitions.sh

Scenario: move partitions of some topics off their current brokers onto others, for example newly added brokers. Suppose there is a topic named ORDER-DETAIL living on the broker with broker.id 2:

 

Topic:ORDER-DETAIL PartitionCount:1 ReplicationFactor:1 Configs:
Topic: ORDER-DETAIL Partition: 0 Leader: 2 Replicas: 2 Isr: 2

To move it to the broker with broker.id 1, run: bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --topics-to-move-json-file move.json --broker-list "1" --generate

The --generate flag only produces a proposed reassignment plan; nothing is executed yet. The command output is:

 

Current partition replica assignment
{"version":1,"partitions":[{"topic":"ORDER-DETAIL","partition":0,"replicas":[2],"log_dirs":["any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"ORDER-DETAIL","partition":0,"replicas":[1],"log_dirs":["any"]}]}

Save the second JSON snippet into a new file, say move_final.json (if you know how to write this JSON by hand, you can skip the --generate step entirely), then run: bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --reassignment-json-file move_final.json --execute. This time the command carries --execute, which actually performs the partition reassignment.
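The same script can then check on the progress of the move; a sketch reusing the same plan file:

# report whether each reassignment in move_final.json is still in progress or completed
bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --reassignment-json-file move_final.json --verify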

This command can also be used to add replicas to a topic. Say a topic named ORDER-DETAIL has 3 partitions but only 1 replica; to raise the replica count to 2 for high availability, write a replica.json file like this:

 

{
    "version": 1,
    "partitions": [{
        "topic": "ORDER-DETAIL",
        "partition": 0,
        "replicas": [1, 2]
    }, {
        "topic": "ORDER-DETAIL",
        "partition": 1,
        "replicas": [0, 2]
    }, {
        "topic": "ORDER-DETAIL",
        "partition": 2,
        "replicas": [1, 0]
    }]
}

Then run: bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --reassignment-json-file replica.json --execute (note that --execute is required here as well, otherwise nothing is applied).

kafka-replica-verification.sh

Usage: bin/kafka-replica-verification.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 [--topic-white-list afei]. The --topic-white-list parameter selects the topics to check. The output looks like this; a line of the form max lag is 0 for ... means replication for the topic has no lag at all:

 

2018-07-03 15:04:46,889: verification process is started.
2018-07-03 15:05:16,811: max lag is 0 for partition multi-1 at offset 0 among 5 partitions
2018-07-03 15:05:46,812: max lag is 0 for partition multi-1 at offset 0 among 5 partitions
... ...
kafka-server-start.sh

Usage: bin/kafka-server-start.sh -daemon config/server.properties, which starts the broker with the given configuration file in daemon mode.

kafka-server-stop.sh

Usage: bin/kafka-server-stop.sh. Note that this command kills every Kafka broker on the current server. It may, however, fail with: No kafka server to stop

Why: look at the content of kafka-server-stop.sh below. The script is very simple: it collects the IDs of all processes whose command line contains kafka.Kafka. But Kafka starts with a long list of jar dependencies, so the ps output for the Kafka process is very long, and that output is subject to a length limit related to PAGE_SIZE (check its value with getconf PAGE_SIZE). As a result kafka\.Kafka may fall outside the visible part of the ps output, and the script cannot find and kill the broker:

 

SIGNAL=${SIGNAL:-TERM}
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')

if [ -z "$PIDS" ]; then
  echo "No kafka server to stop"
  exit 1
else
  kill -s $SIGNAL $PIDS
fi

To make kafka-server-stop.sh work reliably, one suggested fix is to look up the process by the deployment directory (the parent of the bin directory) instead, and kill whatever matches:

 

cd `dirname $0`
BIN_DIR=`pwd`
cd ..
DEPLOY_DIR=`pwd`

SIGNAL=${SIGNAL:-TERM}
# match the Java process by its deployment directory instead of the (possibly truncated) class name
PIDS=$(ps ax | grep -i "${DEPLOY_DIR}" | grep java | grep -v grep | awk '{print $1}')

if [ -z "$PIDS" ]; then
  echo "No kafka server to stop"
  exit 1
else
  kill -s $SIGNAL $PIDS
fi
kafka-simple-consumer-shell.sh

Deprecated. Usage: bin/kafka-simple-consumer-shell.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei

kafka-topics.sh

Create a topic: bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic afei --partitions 3 --replication-factor 1
Delete a topic: bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Alter a topic: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic afei --partitions 5. When altering a topic you can only increase the partition count.
Describe topics: bin/kafka-topics.sh --zookeeper localhost:2181 --describe [--topic afei]. With --topic topicName, only that topic's details are shown. You can additionally pass either --unavailable-partitions or --under-replicated-partitions to filter the output.


kafka-verifiable-consumer.sh

Usage: bin/kafka-verifiable-consumer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei --group-id groupName
This script consumes messages from the given topic and emits consumer events as structured output, e.g. offset commits.

kafka-verifiable-producer.sh

Usage: bin/kafka-verifiable-producer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei [--max-messages 64]. It is advisable to pass --max-messages, otherwise the script keeps sending messages indefinitely.
This script continuously sends messages to the given topic, with --max-messages capping the number sent. Every message it sends produces an acknowledgment line in the output, which is the biggest difference from kafka-console-producer.sh:

 

[mwopr@jtcrtvdra35 kafka_2.12-1.1.0]$ bin/kafka-verifiable-producer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei --max-messages 9
{"timestamp":1530515959900,"name":"startup_complete"}
{"timestamp":1530515960310,"name":"producer_send_success","key":null,"value":"1","offset":5,"topic":"afei","partition":0}
{"timestamp":1530515960315,"name":"producer_send_success","key":null,"value":"4","offset":6,"topic":"afei","partition":0}
{"timestamp":1530515960315,"name":"producer_send_success","key":null,"value":"7","offset":7,"topic":"afei","partition":0}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"0","offset":5,"topic":"afei","partition":1}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"3","offset":6,"topic":"afei","partition":1}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"6","offset":7,"topic":"afei","partition":1}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"2","offset":6,"topic":"afei","partition":2}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"5","offset":7,"topic":"afei","partition":2}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"8","offset":8,"topic":"afei","partition":2}
{"timestamp":1530515960333,"name":"shutdown_complete"}
{"timestamp":1530515960334,"name":"tool_data","sent":9,"acked":9,"target_throughput":-1,"avg_throughput":20.689655172413794}

The topic afei has 3 partitions, and kafka-verifiable-producer.sh sent 9 messages; the output shows that 3 messages went to each partition. Setting --max-messages to a large value is also a handy way to stress test a newly built Kafka cluster, as sketched below.
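A stress-test sketch; the --throughput option is assumed here from the tool's 1.x option set (messages per second, -1 meaning unthrottled):

# push 100000 messages as fast as the cluster will take them
bin/kafka-verifiable-producer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei --max-messages 100000 --throughput -1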

zookeeper-shell.sh

Usage: bin/zookeeper-shell.sh localhost:2181[/path]. If the ZooKeeper ensemble for the Kafka cluster is configured with a chroot path, you must append /path, e.g. bin/zookeeper-shell.sh localhost:2181/mykafka. Once connected, you can inspect the znodes Kafka writes to ZooKeeper, for example listing the brokers and their details:

 

ls /brokers/ids
[0]
get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://izwz91rhzhed2c54e6yx87z:9092"],"jmx_port":-1,"host":"izwz91rhzhed2c54e6yx87z","timestamp":"1530435834272","port":9092,"version":4}
cZxid = 0x2d3
ctime = Sun Jul 01 17:03:54 CST 2018
mZxid = 0x2d3
mtime = Sun Jul 01 17:03:54 CST 2018
pZxid = 0x2d3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1642cb09421006c
dataLength = 216
numChildren = 0
Closing Thoughts

Among the operational scripts above, some take --zookeeper, some take --broker-list, and some take --bootstrap-server.
This is really a historical artifact. broker-list is a plain list of broker addresses, while bootstrap-server denotes a connection entry point from which the full broker list can be fetched (as analyzed earlier in [4. Kafka producers & consumers]); bootstrap-server is the more refined naming. And then there are scripts that connect through ZooKeeper: early Kafka stored a lot of information in ZooKeeper, whose role was gradually reduced in later releases. These three parameters effectively mark three eras of Kafka. Put charitably, they let you watch Kafka's design philosophy evolve; put bluntly, it is a confusing mess (per 厮大大's explanation). Ha.


Examples

Example 1:

Create a topic:

./kafka-topics.sh --create --zookeeper centos_1:2181 --replication-factor 1 --partitions 1 --topic wangcheng_test

Check that the topic was created:

./kafka-topics.sh --list --zookeeper centos_1:2181 wangcheng_test

Produce messages:

./kafka-console-producer.sh --broker-list centos_1:9092 --topic wangcheng_test

Consume messages (this can be done from any machine in the cluster):

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic wangcheng_test --from-beginning

If all of the above works, the cluster is set up correctly.

Creating a topic with multiple replicas

1. Create a topic with 3 replicas:
./kafka-topics.sh --create --zookeeper centos_1:2181 --replication-factor 3 --partitions 1 --topic topic_1
2. Check the status of topic_1:
./kafka-topics.sh --describe --zookeeper centos_2:2181 --topic topic_1
Topic:topic_1   PartitionCount:1        ReplicationFactor:3     Configs:
Topic: topic_1  Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
 

You can see the topic currently has 1 partition, a replication factor of 3, and broker 2 as the leader.
Send a message to the Kafka broker on centos_2:

./kafka-console-producer.sh --broker-list centos_2:9092 --topic topic_1
topic_1_test
 

Now kill the Kafka process on centos_2, then start a consumer on centos_2:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
topic_1_test
 

Check the status of topic_1 again:

./kafka-topics.sh --describe --zookeeper centos_2:2181 --topic topic_1
Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_1 Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1,3

The leader has changed to 1 and the ISR shrank to 1,3.
Restart broker 2 and check the status again:

./kafka-topics.sh --describe --zookeeper centos_2:2181 --topic topic_1
Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_1 Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1,3,2

The ISR is back to 1,3,2.

Next, try killing the broker process on centos_1 first, sending a message, then restarting it and observing what happens:

[root@centos_1 bin]# jps -l
11525 kafka.Kafka
14341 sun.tools.jps.Jps
7532 org.apache.zookeeper.server.quorum.QuorumPeerMain
[root@centos_1 bin]# kill 11525
[root@centos_1 bin]# jps -l
14357 sun.tools.jps.Jps
7532 org.apache.zookeeper.server.quorum.QuorumPeerMain
[root@centos_1 bin]# ./kafka-topics.sh --describe --zookeeper centos_2:2181 --topic topic_1
Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 3,2
[root@centos_1 bin]# ./kafka-console-producer.sh --broker-list centos_2:9092 --topic topic_1
>2345678
[root@centos_2 bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
topic_1_test
2345678
[root@centos_1 bin]# ./kafka-server-start.sh ../config/server.properties &
[root@centos_1 bin]# ./kafka-topics.sh --describe --zookeeper centos_2:2181 --topic topic_1
Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 3,2,1

The behavior above shows that a restarted broker re-fetches the replica data and rejoins the ISR.

 


 
