赞
踩
目录
(1)启动 broker
一般需要在后台启动,因此加上"-daemon"参数。
bin/kafka-server-start.sh -daemon config/server.properties
(2)关闭 broker
bin/kafka-server-stop.sh
对kafka-topic.sh相关的操作通常是指定 --zookeeper 参数。然而从 Kafka 2.2 版本开始,社区推荐用 --bootstrap-server 参数替换 --zookeeper 参数,并且显式地将后者标记为“已过期”。(2.2以上也兼容 --zookeeper ,但如果是2.2以前的版本,就只能使用 --zookeeper 参数。)
社区推荐使用 --bootstrap-server 而非 --zookeeper 的原因主要有两个。
- 使用 --zookeeper 会绕过 Kafka 的安全体系。这就是说,即使你为 Kafka 集群设置了安全认证,限制了主题的创建,如果你使用 --zookeeper 的命令,依然能成功创建任意主题,不受认证体系的约束。这显然是 Kafka 集群的运维人员不希望看到的。
- 使用 --bootstrap-server 与集群进行交互,越来越成为使用 Kafka 的标准姿势。换句话说,以后会有越来越少的命令和 API 需要与 ZooKeeper 进行连接。这样,我们只需要一套连接信息,就能与 Kafka 进行全方位的交互,不用像以前一样,必须同时维护 ZooKeeper 和 Broker 的连接信息。
kafka2.2以前的版本
bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --list
--zookeeper参数后接zk的地址和端口,如果在部署kafka集群时指定了zk的路径,还需要加上特定的路径。例如,kafka集群配置的zk地址为:zookeeper.connect=broker-test1:2181,broker-test2:2181,broker-test3:2181/kafka 那么列出集群topic的命令如下:
./kafka-topics.sh --zookeeper broker-test1:2181/kafka --list
kafka2.2开始的版本
bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --list
kafka2.2以前的版本
bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --create --topic <topic_name> --partitions <partition_num> --replication-factor <replication_factor_num>
例如,创建一个名为test的topic,指定分区数为3,副本数为2:
bin/kafka-topics.sh --zookeeper broker-test1:2181/kafka --create --topic test --partitions 3 --replication-factor 2
kafka2.2开始的版本
bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --create --topic <topic_name> --partitions <partition_num> --replication-factor <replication_factor_num>
kafka2.2以前的版本
bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --describe --topic <topic_name>
例如,查看名为xhs的topic信息:
bin/kafka-topics.sh --zookeeper broker-test1:2181/kafka --describe --topic xhs
kafka2.2开始的版本
bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --describe --topic <topic_name>
如果 describe 命令不指定具体的主题名称,那么 Kafka 默认会返回所有“可见”主题的详细数据给你。
这里的“可见”,是指发起这个命令的用户能够看到的 Kafka 主题。2.2版本开始创建主题时,使用 --zookeeper 和 --bootstrap-server 的区别是一样的。如果指定了 --bootstrap-server,那么这条命令就会受到安全认证体系的约束,即对命令发起者进行权限验证,然后返回它能看到的主题。否则,如果指定 --zookeeper 参数,那么默认会返回集群中所有的主题详细数据。基于这些原因,建议在2.2版本开始最好统一使用 --bootstrap-server 连接参数。
其实就是增加分区,目前 Kafka 不允许减少某个主题的分区数。可以使用 kafka-topics 脚本,结合 --alter 参数来增加某个主题的分区数。
kafka2.2以前的版本
bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --alter --topic <topic_name> --partitions <新分区数>
例如,修改名为test-zl的topic分区数为3:
bin/kafka-topics.sh --zookeeper broker-test1:2181/kafka --alter --topic test-zl --partitions 3
kafka2.2开始的版本
bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --alter --topic <topic_name> --partitions <新分区数>
⚠️这里要注意的是,你指定的分区数一定要比原有分区数大,否则 Kafka 会抛出 InvalidPartitionsException 异常。
kafka2.2以前的版本
bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --delete --topic <topic_name>
例如,删除名为test-zl的topic:
bin/kafka-topics.sh --zookeeper broker-test1:2181/kafka --delete --topic test-zl
kafka2.2开始的版本
bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --delete --topic <topic_name>
Kafka删除操作是异步的,执行完这条命令不代表主题立即就被删除了。它仅仅是被标记成“已删除”状态而已。Kafka 会在后台默默地开启主题删除操作。因此,通常都需要等待一段时间才会看到效果。
在主题创建之后,我们可以使用 kafka-configs 脚本修改对应的参数。(修改topic级别参数)
设置常规的主题级别参数,还是使用 --zookeeper。
假设我们要设置主题级别参数 max.message.bytes,那么命令如下:
bin/kafka-configs.sh --zookeeper {zk_host}:{zk_port}[/{path}] --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
bin/kafka-console-producer.sh --broker-list {broker_host}:{kafka_port} --topic <topic_name>
例如,在下面这段命令中,我们指定生产者参数 acks 为 -1,同时启用 LZ4 的压缩算法:
bin/kafka-console-producer.sh --broker-list broker-test1:9092 --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4
从头开始消费
bin/kafka-console-consumer.sh --bootstrap-server {broker_host}:{kafka_port} --topic <topic_name> --from-beginning
--from-beginning参数表示从该主题最早的位移开始消费,例如消费名为testzl的topic数据:
bin/kafka-console-consumer.sh --bootstrap-server broker-test1:9092 --topic testzl --from-beginning
指定消费组
bin/kafka-console-consumer.sh --bootstrap-server {broker_host}:{kafka_port} --topic <topic_name> --group <group_name>
--group参数可以指定消费者组,例如消费组group1从名为test-topic的topic消费数据:
bin/kafka-console-consumer.sh --bootstrap-server broker-test1:9092 --topic test-topic --group group1
⚠️注意:在这段命令中,我们指定了 group 信息。如果没有指定的话,每次运行 Console Consumer,它都会自动生成一个新的消费者组来消费。久而久之,你会发现你的集群中有大量的以 console-consumer 开头的消费者组。通常情况下,最好还是加上 group。
bin/kafka-consumer-groups.sh --bootstrap-server broker-test1:9092 --describe --group group1
对于 __consumer_offsets 而言,由于它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。下面的命令可以帮助我们直接查看消费者组提交的位移数据。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
除了查看位移提交数据,我们还可以直接读取该主题消息,查看消费者组的状态信息。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
对于内部主题 __transaction_state 而言,方法是相同的。你只需要指定 kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter 即可。
bin/kafka-consumer-groups.sh --bootstrap-server {broker_host}:{kafka_port} --describe --group <group_name>
重置位移可以大致从两个维度来进行。
1.位移维度。
2.时间维度。
DateTime 策略
重点记录一下比较常用的DateTime策略。DateTime 策略直接指定 --to-datetime
例如,把kafka topic TestRawLog 的 group.id: "group1" offset 重置到20210619 23:50
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group1 --topic TestRawLog --reset-offsets --to-datetime 2021-06-19T23:50:00.000+0800 --execute
最后一个参数--excute如果不加,只是打印位移调整方案,不实际执行;加上参数--excute执行真正的位移调整。
⚠️注意:需要考虑到时差问题。默认设定时间是UTC时间,并非北京时间,北京时区是东八区,领先UTC八个小时。因此需要加上8小时时差。即:+0800
- $ bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file <path>/preferred-leader-plan.json
- # {"partitions":[{"topic":"test-topic","partition":0}]}
- # 生成分配策略
- $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
- # 执行分配策略
- $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file cluster-reassignment.json --execute
- # 验证分配
- $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file cluster-reassignment.json --verify
- # 可通过编写分配策略,增加副本因子 略
$ bin/kafka-producer-perf-test.sh --topic test_topic --num-records 50000000 --throughput -1 --record-size 200 --producer-props bootstrap.servers=localhost:9092 acks=1 linger.ms=50
$ bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 500000000 --topic test_topic
必须参数为--group, 不指定--topic,默认为所有topic。
$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /dfs5/kafka/data/secLog-2/00000000000110325000.log --print-data-log --deep-iteration > secLog.log
- # 获取当前最大位移
- $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9200 --topic test --time -1
- # 当前获取最早位移
- $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9200 --topic test --time -2
- # 以上两个数相减,即可得出 topic 当前在集群的消息总数
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。