当前位置:   article > 正文

Kafka命令介绍和使用以及案例_kafka应用案例

kafka应用案例

资料

所有脚本 

  1. [root@localhost kafka]# ls bin/ -al
  2. 总用量 136
  3. drwxr-xr-x. 3 root root 4096 310 2019 .
  4. drwxr-xr-x. 9 root root 134 816 11:14 ..
  5. -rwxr-xr-x. 1 root root 1421 310 2019 connect-distributed.sh
  6. -rwxr-xr-x. 1 root root 1418 310 2019 connect-standalone.sh
  7. -rwxr-xr-x. 1 root root 861 310 2019 kafka-acls.sh
  8. -rwxr-xr-x. 1 root root 873 310 2019 kafka-broker-api-versions.sh
  9. -rwxr-xr-x. 1 root root 864 310 2019 kafka-configs.sh
  10. -rwxr-xr-x. 1 root root 945 310 2019 kafka-console-consumer.sh
  11. -rwxr-xr-x. 1 root root 944 310 2019 kafka-console-producer.sh
  12. -rwxr-xr-x. 1 root root 871 310 2019 kafka-consumer-groups.sh
  13. -rwxr-xr-x. 1 root root 948 310 2019 kafka-consumer-perf-test.sh
  14. -rwxr-xr-x. 1 root root 871 310 2019 kafka-delegation-tokens.sh
  15. -rwxr-xr-x. 1 root root 869 310 2019 kafka-delete-records.sh
  16. -rwxr-xr-x. 1 root root 866 310 2019 kafka-dump-log.sh
  17. -rwxr-xr-x. 1 root root 863 310 2019 kafka-log-dirs.sh
  18. -rwxr-xr-x. 1 root root 862 310 2019 kafka-mirror-maker.sh
  19. -rwxr-xr-x. 1 root root 886 310 2019 kafka-preferred-replica-election.sh
  20. -rwxr-xr-x. 1 root root 959 310 2019 kafka-producer-perf-test.sh
  21. -rwxr-xr-x. 1 root root 874 310 2019 kafka-reassign-partitions.sh
  22. -rwxr-xr-x. 1 root root 874 310 2019 kafka-replica-verification.sh
  23. -rwxr-xr-x. 1 root root 9289 310 2019 kafka-run-class.sh
  24. -rwxr-xr-x. 1 root root 1376 310 2019 kafka-server-start.sh
  25. -rwxr-xr-x. 1 root root 997 310 2019 kafka-server-stop.sh
  26. -rwxr-xr-x. 1 root root 945 310 2019 kafka-streams-application-reset.sh
  27. -rwxr-xr-x. 1 root root 863 310 2019 kafka-topics.sh
  28. -rwxr-xr-x. 1 root root 958 310 2019 kafka-verifiable-consumer.sh
  29. -rwxr-xr-x. 1 root root 958 310 2019 kafka-verifiable-producer.sh
  30. -rwxr-xr-x. 1 root root 1722 310 2019 trogdor.sh
  31. drwxr-xr-x. 2 root root 4096 310 2019 windows
  32. -rwxr-xr-x. 1 root root 867 310 2019 zookeeper-security-migration.sh
  33. -rwxr-xr-x. 1 root root 1393 310 2019 zookeeper-server-start.sh
  34. -rwxr-xr-x. 1 root root 1001 310 2019 zookeeper-server-stop.sh
  35. -rwxr-xr-x. 1 root root 968 310 2019 zookeeper-shell.sh

1. kafka版本

kafka没有提供命令或api直接差kafka版本。可以用如下方法:

  1. [root@localhost kafka]# find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
  2. kafka_2.11-2.2.0.jar

结果:
就可以看到kafka的具体版本了。
其中,2.11为scala版本,2.2.0为kafka版本。

export PATH=$ZOOKEEPER_HOME/bin:$PATH:/opt/kafa/kafka/bin

集群管理

  1. # 启动zookeeper
  2. zookeeper-server-start.sh config/zookeeper.properties &
  3. # 停止zookeeper
  4. zookeeper-server-stop.sh
  5. # 前台启动broker Ctrl + C 关闭
  6. kafka-server-start.sh <path>/server.properties
  7. # 后台启动broker
  8. kafka-server-start.sh -daemon <path>/server.properties
  9. # 关闭broker
  10. kafka-server-stop.sh

topic相关

kafka-topics.sh 脚本主要负责 topic 相关的操作。它的具体实现是通过kafka-run-class 来调用 TopicCommand 类,并根据参数执行指定的功能。

TopicCommand.createTopic() 方法负责创建 Topic,其核心逻辑是确定新建 Topic 中有多少个分区及每个分区中的副本如何分配,既支持使用 replica-assignment 参数手动分配,也支持使用 partitions 参数和 replication-factor 参数指定分区个数和副本个数进行自动分配。之后该方法会将副本分配结果写入到 ZooKeeper 中。
 

注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。一定要注意两者参数值所指向的集群地址是不同的。

形式一:

使用 replica-assignment 参数手动指定 Topic Partition Replica 与 Kafka Broker 之间的存储映射关系。

  1. [root@localhost kafka]# kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test6 --replica-assignment 1:0,1:0,1:0
  2. [root@localhost kafka]# ls kafka-logs/test6-
  3. test6-0/ test6-1/ test6-2/
  • 格式:broker-id:副本数量,broker-id:副本数量,broker-id:副本数量
  • 注意:broker-id是broker-server的id号;副本数量不能大于broker的数量(我这里就一台broker);分区数量是用“,” 逗号分隔
  •  注意:1:0,1:0,1:0 中的数字均为 broker.id;3个分区(逗号分隔);每个分区有一个副本(副本所在的 broker 以冒号分割)。
  • 此形式在最新的 2.3 版本中会报 Aborted due to timeout 异常,建议使用形式二。

形式二,使用 partitions 和 replication-factor 参数自动分配存储映射关系

1. 创建topic(4个分区,1个副本[前提是副本数量小于等于broker数量,我目前就一个broker])

注意:Topic 名称中一定不要同时出现下划线 (’_’) 和小数点 (’.’)。
WARNING: Due to limitations in metric names, topics with a period (’.’) or underscore(’_’) could collide. To avoid issues ot os best to use either, but not both.

错误示范: 

  1. [root@localhost kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
  2. Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
  3. [2022-08-16 15:59:09,119] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
  4. (kafka.admin.TopicCommand$)

正确示范:

  1. [root@localhost kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic test
  2. Created topic test.
  3. [root@localhost kafka]# ls kafka-logs/
  4. cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint test-0 test-1 test-2 test-3

kafka版本 >= 2.2

  1. [root@localhost kafka]# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test1
  2. [root@localhost kafka]# ls kafka-logs/
  3. cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint test-0 test-1 test1-0 test-2 test-3
  •  --create:指定创建topic动作
  • --topic:指定新建topic的名称
  • --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
  • --partitions:指定当前创建的kafka分区数量,默认为1个
  • --replication-factor:指定每个分区的复制因子个数,默认1个

创建 Topic 时也可指定参数:

[root@localhost kafka]#  kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test8 --partitions 3 --replication-factor 1 --config cleanup.policy=compact --config retention.ms=500

2 # 查看所有topic的详细信息 

  1. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
  2. Topic:test PartitionCount:4 ReplicationFactor:1 Configs:
  3. Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  4. Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
  5. Topic: test Partition: 2 Leader: 1 Replicas: 1 Isr: 1
  6. Topic: test Partition: 3 Leader: 1 Replicas: 1 Isr: 1
  7. Topic:test1 PartitionCount:1 ReplicationFactor:1 Configs:
  8. Topic: test1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  9. #查询指定topic的详细信息
  10. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test
  11. Topic:test PartitionCount:4 ReplicationFactor:1 Configs:
  12. Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  13. Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
  14. Topic: test Partition: 2 Leader: 1 Replicas: 1 Isr: 1
  15. Topic: test Partition: 3 Leader: 1 Replicas: 1 Isr: 1
  16. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
  17. Topic:test1 PartitionCount:1 ReplicationFactor:1 Configs:
  18. Topic: test1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  •     PartitionCount:partition 个数。
  •  ReplicationFactor:副本个数。
  •  Partition:partition 编号,从 0 开始递增。
  •     Leader:当前 partition 起作用的 breaker.id。
  •    Replicas: 当前副本数据所在的 breaker.id,是一个列表,排在最前面的其作用。
  •    Isr:当前 kakfa 集群中可用的 breaker.id 列表

3. 查询topic列表

  1. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
  2. test
  3. test1
  4. #查询topic列表(支持0.9版本+
  5. [root@localhost kafka]# kafka-topics.sh --list --bootstrap-server localhost:9092
  6. test
  7. test1

  4. # 分区扩容,注意:分区数量只能增加,不能减少;不能用来修改副本个数。(请使用 kafka-reassign-partitions.sh 脚本增加副本数)

  1. #kafka版本 < 2.2
  2. [root@localhost kafka]# kafka-topics.sh --zookeeper localhost:2181 --alter --topic test1 --partitions 2
  3. WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
  4. Adding partitions succeeded!
  5. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
  6. Topic:test1 PartitionCount:2 ReplicationFactor:1 Configs:
  7. Topic: test1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  8. Topic: test1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
  9. # kafka版本 >= 2.2
  10. [root@localhost kafka]# kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test1 --partitions 3
  11. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
  12. Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs:
  13. Topic: test1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  14. Topic: test1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
  15. Topic: test1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1

5.# 删除topic

若 delete.topic.enable=true
 直接彻底删除该 Topic。
若 delete.topic.enable=false
 如果当前 Topic 没有使用过即没有传输过信息:可以彻底删除。
 如果当前 Topic 有使用过即有过传输过信息:并没有真正删除 Topic 只是把这个 Topic 标记为删除(marked for deletion),重启 Kafka Server 后删除。

:delete.topic.enable=true 配置信息位于配置文件 config/server.properties 中(较新的版本中无显式配置,默认为 true)。

  1. [root@localhost kafka]# kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1
  2. Topic test1 is marked for deletion.
  3. Note: This will have no impact if delete.topic.enable is not set to true.
  4. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
  5. Error while executing topic command : Topics in [] does not exist
  6. [2022-08-16 16:08:55,601] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
  7. at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
  8. at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
  9. at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
  10. at kafka.admin.TopicCommand.main(TopicCommand.scala)

6.增加配置

kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic topicName --config flush.messages=1

 7.删除配置

kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic topicName --delete-config flush.messages

Topic 级别配置属性 

当如下所示的属性配置到 Topic 上时,将会覆盖 server.properties 上对应的属性。

属性名值类型默认值有效值服务器默认属性描述
cleanup.policylistdeletedelete
compact
log.cleanup.policy过期或达到上限日志的清理策略。
delete:删除
compact:压缩
compression.typestringproduceruncompressed
snappy
lz4
gzip
producer
compression.type指定给该topic最终的压缩类型
delete.retention.mslong86400000[0,…]log.cleaner.delete.retention.ms压缩的日志保留的最长时间,也是客户端消费消息的最长时间。
与 log.retention.minutes 的区别在于:一个控制未压缩的数据,一个控制压缩后的数据。
file.delete.delay.mslong60000[0,…]log.segment.delete.delay.ms从文件系统中删除前所等待的时间
flush.messageslong9223372036854775807[0,…]log.flush.interval.messages在消息刷到磁盘之前,日志分区收集的消息数
flush.mslong9223372036854775807[0,…]log.flush.interval.ms消息在刷到磁盘之前,保存在内存中的最长时间,单位是ms
index.interval.bytesint4096[0,…]log.index.interval.bytes执行 fetch 操作后,扫描最近的 offset 运行空间的大小。
设置越大,代表扫描速度越快,但是也更耗内存。
(一般情况下不需要设置此参数)
message.max.bytesint1000012[0,…]message.max.byteslog中能够容纳消息的最大字节数
min.cleanable.dirty.ratiodouble0.5[0,…,1]log.cleaner.min.cleanable.ratio日志清理的频率控制,占该log的百分比。
越大意味着更高效的清理,同时会存在空间浪费问题
retention.byteslong-1log.retention.bytes
topic每个分区的最大文件大小。
一个 topic 的大小限制 = 分区数 * log.retention.bytes。
-1 表示没有大小限制。
retention.msint604800000[-1,…]log.retention.minutes
日志文件保留的分钟数。
数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据
segment.bytesint1073741824[14,…]log.segment.bytes每个 segment 的大小 (默认为1G)
segment.index.bytesint10485760[0,…]log.index.size.max.bytes对于segment日志的索引文件大小限制(默认为10M)

生产者相关

生产消息有存入到分区中,有三种方式。【1.指定分区;2.带有key;3.无key】;

kafka-console-producer.sh 脚本通过调用 kafka.tools.ConsoleProducer 类加载命令行参数的方式,在控制台生产消息的脚本。

三个分区一个副本的topic 

  1. [root@localhost kafka]# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test1
  2. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
  3. Topic:test1 PartitionCount:3 ReplicationFactor:1 Configs:
  4. Topic: test1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
  5. Topic: test1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
  6. Topic: test1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1

1.无key型消息

默认情况下,所生产的消息是没有 key 的,命令如下:

执行上述命令后,就会在控制台等待键入消息体,直接输入消息值(value)即可,每行(以换行符分隔)表示一条消息,如下所示。

  1. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic test1
  2. >"Hello kafka"

    正常情况,每次回车表示触发“发送”操作,回车后可直接使用“Ctrl + c”退出生产者控制台,再使用  kafka-console-consumer.sh 脚本验证本次的生产情况。

2. 有key型消息

当需要为消息指定 key 时,可使用如下命令:

默认消息键与消息值间使用“Tab键”进行分隔,切勿使用转义字符(\t),如下所示:

  1. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic test1 --property parse.key=true
  2. >name kafka
  3. >age 11

键入如上信息表示所生产的消息“name”为消息键,“kafka”为消息值。

3.指定分区

[root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic test1 1

附表 属性列表

 下表列举了当前版本支持的所有参数用法,敬请参阅。

参数值类型说明有效值
--bootstrap-serverString要连接的服务器
必需(除非指定--broker-list)
形如:host1:prot1,host2:prot2
--topicString(必需)接收消息的主题名称
--broker-listString已过时要连接的服务器形如:host1:prot1,host2:prot2
--batch-sizeInteger单个批处理中发送的消息数200(默认值)
--compression-codecString压缩编解码器none、gzip(默认值)
snappy、lz4、zstd
--max-block-msLong在发送请求期间,生产者将阻止的最长时间60000(默认值)
--max-memory-bytesLong生产者用来缓冲等待发送到服务器的总内存33554432(默认值)
--max-partition-memory-bytesLong为分区分配的缓冲区大小16384
--message-send-max-retriesInteger最大的重试发送次数3
--metadata-expiry-msLong强制更新元数据的时间阈值(ms)300000
--producer-propertyString将自定义属性传递给生成器的机制形如:key=value
--producer.configString生产者配置属性文件
[--producer-property]优先于此配置
配置文件完整路径
--propertyString自定义消息读取器parse.key=true|false
key.separator=<key.separator>
ignore.error=true|false
--request-required-acksString生产者请求的确认方式0、1(默认值)、all
--request-timeout-msInteger生产者请求的确认超时时间1500(默认值)
--retry-backoff-msInteger生产者重试前,刷新元数据的等待时间阈值100(默认值)
--socket-buffer-sizeIntegerTCP接收缓冲大小102400(默认值)
--timeoutInteger消息排队异步等待处理的时间阈值1000(默认值)
--sync同步发送消息
--version显示 Kafka 版本
不配合其他参数时,显示为本地Kafka版本
--help打印帮助信息

消费者相关

--reset-offsets 后面需要接重置的模式1. 基本操作

  1. #创建一个topic 一个副本 三个分区
  2. [root@localhost kafka]# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic jettech
  3. # 生产者
  4. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic jettech
  5. >"hello jettech"
  6. #消费者,其中"--from-beginning"为可选参数,表示要从头消费消息
  7. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jettech --from-beginning
  8. "hello jettech"
  9. #指定groupid
  10. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jettech --from-beginning --consumer-property group.id=jettoloader-consumer-group
  11. "hello jettech"
  12. #或指定文件
  13. [root@localhost consume]# cat consume.properties
  14. group.id=consumer-group-jettoloader
  15. [root@localhost consume]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jettech --consumer.config consume.properties
  16. "wubo"
  17. # 指定分区发送数据
  18. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic jettech 2
  19. >"hello beijing"
  20. >"nihao"
  21. # 指定分区接受数据
  22. [root@localhost consume]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jettech --from-beginning --partition 1
  23. >"hello beijing"
  24. >"nihao"
  25. # 新生产者(支持0.9版本+
  26. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic beijing --producer.config config/producer.properties
  27. >"hello bejing"
  28. # 新消费者(支持0.9版本+
  29. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic beijing --from-beginning --consumer.config config/consumer.properties
  30. "hello bejing"
  31. # kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等
  32. [root@localhost kafka]# kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic jettech --group-id consumer-group-jettoloader
  33. {"timestamp":1660702053129,"name":"startup_complete"}
  34. {"timestamp":1660702053323,"name":"partitions_revoked","partitions":[]}
  35. {"timestamp":1660702053344,"name":"partitions_assigned","partitions":[{"topic":"jettech","partition":0},{"topic":"jettech","partition":2},{"topic":"jettech","partition":1}]}
  36. {"timestamp":1660702053395,"name":"records_consumed","count":3,"partitions":[{"topic":"jettech","partition":0,"count":1,"minOffset":0,"maxOffset":0},{"topic":"jettech","partition":2,"count":1,"minOffset":0,"maxOffset":0},{"topic":"jettech","partition":1,"count":1,"minOffset":2,"maxOffset":2}]}
  37. {"timestamp":1660702053403,"name":"offsets_committed","offsets":[{"topic":"jettech","partition":0,"offset":1},{"topic":"jettech","partition":2,"offset":1},{"topic":"jettech","partition":1,"offset":3}],"success":true}
  38. #查看消费者组
  39. [root@localhost kafka]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  40. consumer-group-jettoloader
  41. test-consumer-group
  42. jettoloader-consumer-group
  43. #查看消费者组详情--describe
  44. [root@localhost kafka]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group jettoloader-consumer-group
  45. Consumer group 'jettoloader-consumer-group' has no active members.
  46. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  47. jettech 0 0 1 1 - - -
  48. jettech 2 0 1 1 - - -
  49. jettech 1 1 3 2 - - -
  50. #查询消费者成员信息--members
  51. [root@localhost kafka]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group jettech-hello1-comsume-group --members
  52. CONSUMER-ID HOST CLIENT-ID #PARTITIONS
  53. consumer-1-2d7d59c3-64c9-4526-905b-4e8920db3e84 /192.168.1.36 consumer-1 2
  54. #查询消费者状态信息--state
  55. [root@localhost kafka]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group jettech-hello1-comsume-group --state
  56. COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
  57. 192.168.1.36:9092 (1) range Stable 1
  58. #删除消费者组--delete
  59. [root@localhost kafka]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group jettech-hellp-comsume-group01
  60. Deletion of requested consumer groups ('jettech-hellp-comsume-group01') was successful.
  61. [root@localhost kafka]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  62. jettech-hellp-comsume-group
  63. consumer-group-jettoloader
  64. jettech-hello1-comsume-group
  65. test-consumer-group
  66. jettoloader-consumer-group
  67. #PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常
  68. [root@localhost kafka]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group jettech-hello1-comsume-group
  69. Error: Deletion of some consumer groups failed:
  70. * Group 'jettech-hello1-comsume-group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
  71. #重置消费组的偏移量 --reset-offsets
  72. #能够执行成功的一个前提是 消费组这会是不可用状态;
  73. 下面的示例使用的参数是: --dry-run ;这个参数表示预执行,会打印出来将要处理的结果;
  74. 等你想真正执行的时候请换成参数--excute ;
  75. 下面示例 重置模式都是 --to-earliest 重置到最早的;
  76. 请根据需要参考下面 相关重置Offset的模式 换成其他模式;
  77. 重置指定消费组的偏移量 --group
  78. #重置指定消费组的指定Topic的偏移量到最晚offset 【--to-latest 】
  79. [root@localhost kafka]# kafka-consumer-groups.sh --reset-offsets --to-earliest --group jettech-hellp1-comsume-group --bootstrap-server localhost:9092 --dry-run --topic hello1
  80. TOPIC PARTITION NEW-OFFSET
  81. hello1 0 0
  82. hello1 1 0
  83. #重置指定消费组的指定Topic的偏移量到指定的offset 【--to-latest 】
  84. 1)获取offset
  85. [root@localhost kafka]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic jettech --time -1
  86. jettech:0:33334
  87. jettech:1:33336
  88. jettech:2:33335
  89. 2)重置指定消费组的指定Topic的偏移量到指定的offset
  90. [root@localhost kafka]# kafka-consumer-groups.sh --reset-offsets --to-offset 33336 --group jettech-hellp1-comsume-group --bootstrap-server localhost:9092 --dry-run --topic hello1
  91. TOPIC PARTITION NEW-OFFSET
  92. hello1 0 0
  93. hello1 1 0

--reset-offsets 后面需要接重置的模式:相关重置Offset的模式

--to-earliest :    重置offset到最开始的那条offset(找到还未被删除最早的那个offset)    
--to-current:    直接重置offset到当前的offset,也就是LOE    
--to-latest:    重置到最后一个offset    
--to-datetime:    重置到指定时间的offset;格式为:YYYY-MM-DDTHH:mm:SS.sss;    --to-datetime "2021-6-26T00:00:00.000"
--to-offset    重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset<--to-offset, 这个时候重置为目标最大offset;2.目标最小offset>--to-offset ,则重置为最小; 3.否则的话才会重置为--to-offset的目标值; 一般不用这个    --to-offset 3465 
--shift-by    按照偏移量增加或者减少多少个offset;正的为往前增加;负的往后退;当然这里也是匹配所有的;    --shift-by 100 、--shift-by -100
--from-file    根据CVS文档来重置; 这里下面单独讲解    

上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset;不过**--from-file**可以让我们更灵活一点;

1先配置cvs文档
格式为: Topic:分区号: 重置目标偏移量

  1. test2,0,100
  2. test2,1,200
  3. test2,2,300

2执行命令

kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv


通用命令kafka-run-class.sh

运行一个class,调用kafka的tools的部分功能。

kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]

GetOffsetShell

  1. kafka-run-class.sh kafka.tools.GetOffsetShell /?
  2. #获取offset
  3. kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic jettech --time -1

options:
Option    Description
–broker-list    
–max-wait-ms <Integer: ms>    废弃。(default: 1000)
–offsets <Integer: count>    废弃。(default: 1)
–partitions <String: partition ids>    partition id列表
–time <Long: >    时间戳。返回指定时间戳之前的offset。 timestamp/-1(latest,默认值)/-2(earliest)。如果时间戳大于当前时刻,无offset返回。
–topic <String: topic>    
 

kafka常用命令及问题解决_demon7552003的博客-CSDN博客

 配置文件:

1producer.properties:生产端的配置文件

  1. #指定kafka节点列表,用于获取metadata,不必全部指定
  2. #需要kafka的服务器地址,来获取每一个topic的分片数等元数据信息。
  3. metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092
  4. #生产者生产的消息被发送到哪个block,需要一个分组策略。
  5. #指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
  6. #partitioner.class=kafka.producer.DefaultPartitioner
  7. #生产者生产的消息可以通过一定的压缩策略(或者说压缩算法)来压缩。消息被压缩后发送到broker集群,
  8. #而broker集群是不会进行解压缩的,broker集群只会把消息发送到消费者集群,然后由消费者来解压缩。
  9. #是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。
  10. #压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
  11. #文本数据会以110或者更高的压缩比进行压缩。
  12. compression.codec=none
  13. #指定序列化处理类,消息在网络上传输就需要序列化,它有String、数组等许多种实现。
  14. serializer.class=kafka.serializer.DefaultEncoder
  15. #如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
  16. #如果上面启用了压缩,那么这里就需要设置
  17. #compressed.topics=
  18. #这是消息的确认机制,默认值是0。在面试中常被问到。
  19. #producer有个ack参数,有三个值,分别代表:
  20. #(1)不在乎是否写入成功;
  21. #(2)写入leader成功;
  22. #(3)写入leader和所有副本都成功;
  23. #要求非常可靠的话可以牺牲性能设置成最后一种。
  24. #为了保证消息不丢失,至少要设置为1,也就
  25. #是说至少保证leader将消息保存成功。
  26. #设置发送数据是否需要服务端的反馈,有三个值0,1,-1,分别代表3种状态:
  27. #0: producer不会等待broker发送ack。生产者只要把消息发送给broker之后,就认为发送成功了,这是第1种情况;
  28. #1: 当leader接收到消息之后发送ack。生产者把消息发送到broker之后,并且消息被写入到本地文件,才认为发送成功,这是第二种情况;#-1: 当所有的follower都同步消息成功后发送ack。不仅是主的分区将消息保存成功了,
  29. #而且其所有的分区的副本数也都同步好了,才会被认为发动成功,这是第3种情况。
  30. request.required.acks=0
  31. #broker必须在该时间范围之内给出反馈,否则失败。
  32. #在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,
  33. #broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因
  34. #未能成功(比如follower未能同步成功)
  35. request.timeout.ms=10000
  36. #生产者将消息发送到broker,有两种方式,一种是同步,表示生产者发送一条,broker就接收一条;
  37. #还有一种是异步,表示生产者积累到一批的消息,装到一个池子里面缓存起来,再发送给broker,
  38. #这个池子不会无限缓存消息,在下面,它分别有一个时间限制(时间阈值)和一个数量限制(数量阈值)的参数供我们来设置。
  39. #一般我们会选择异步。
  40. #同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
  41. #也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
  42. producer.type=sync
  43. #在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,
  44. #默认为5000ms
  45. #此值和batch.num.messages协同工作.
  46. queue.buffering.max.ms = 5000
  47. #异步情况下,缓存中允许存放消息数量的大小。
  48. #在async模式下,producer端允许buffer的最大消息量
  49. #无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
  50. #此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000条消息。
  51. queue.buffering.max.messages=20000
  52. #如果是异步,指定每次批量发送数据量,默认为200
  53. batch.num.messages=500
  54. #在生产端的缓冲池中,消息发送出去之后,在没有收到确认之前,该缓冲池中的消息是不能被删除的,
  55. #但是生产者一直在生产消息,这个时候缓冲池可能会被撑爆,所以这就需要有一个处理的策略。
  56. #有两种处理方式,一种是让生产者先别生产那么快,阻塞一下,等会再生产;另一种是将缓冲池中的消息清空。
  57. #当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后阻塞一定时间后,
  58. #队列仍然没有enqueue(producer仍然没有发送出任何消息)
  59. #此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间
  60. #-1: 不限制阻塞超时时间,让produce一直阻塞,这个时候消息就不会被抛弃
  61. #0: 立即清空队列,消息被抛弃
  62. queue.enqueue.timeout.ms=-1
  63. #当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
  64. #因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
  65. #有可能导致broker接收到重复的消息,默认值为3.
  66. message.send.max.retries=3
  67. #producer刷新topic metada的时间间隔,producer需要知道partition leader
  68. #的位置,以及当前topic的情况
  69. #因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,
  70. #将会立即刷新
  71. #(比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置
  72. #额外的刷新机制,默认值600000
  73. topic.metadata.refresh.interval.ms=60000

2).consumer.properties:消费端的配置文件

  1. #消费者集群通过连接Zookeeper来找到broker。
  2. #zookeeper连接服务器地址
  3. zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
  4. #zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
  5. zookeeper.session.timeout.ms=5000
  6. #当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
  7. zookeeper.connection.timeout.ms=10000
  8. #这是一个时间阈值。
  9. #指定多久消费者更新offset到zookeeper中。
  10. #注意offset更新时基于time而不是每次获得的消息。
  11. #一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
  12. zookeeper.sync.time.ms=2000
  13. #指定消费
  14. group.id=xxxxx
  15. #这是一个数量阈值,经测试是500条。
  16. #当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息#注意offset信息并不是每消费一次消息就向zk提交
  17. #一次,而是现在本地保存(内存),并定期提交,默认为true
  18. auto.commit.enable=true
  19. # 自动更新时间。默认60 * 1000
  20. auto.commit.interval.ms=1000
  21. # 当前consumer的标识,可以设定,也可以有系统生成,
  22. #主要用来跟踪消息消费情况,便于观察
  23. conusmer.id=xxx
  24. # 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
  25. client.id=xxxx
  26. # 最大取多少块缓存到消费者(默认10)
  27. queued.max.message.chunks=50
  28. # 当有新的consumer加入到group时,将会reblance,此后将会
  29. #有partitions的消费端迁移到新 的consumer上,如果一个
  30. #consumer获得了某个partition的消费权限,那么它将会向zk
  31. #注册 "Partition Owner registry"节点信息,但是有可能
  32. #此时旧的consumer尚没有释放此节点, 此值用于控制,
  33. #注册节点的重试次数.
  34. rebalance.max.retries=5
  35. #每拉取一批消息的最大字节数
  36. #获取消息的最大尺寸,broker不会像consumer输出大于
  37. #此值的消息chunk 每次feth将得到多条消息,此值为总大小,
  38. #提升此值,将会消耗更多的consumer端内存
  39. fetch.min.bytes=6553600
  40. #当消息的尺寸不足时,server阻塞的时间,如果超时,
  41. #消息将立即发送给consumer
  42. #数据一批一批到达,如果每一批是10条消息,如果某一批还
  43. #不到10条,但是超时了,也会立即发送给consumer。
  44. fetch.wait.max.ms=5000
  45. socket.receive.buffer.bytes=655360
  46. # 如果zookeeper没有offset值或offset值超出范围。
  47. #那么就给个初始的offset。有smallest、largest、
  48. #anything可选,分别表示给当前最小的offset、
  49. #当前最大的offset、抛异常。默认largest
  50. auto.offset.reset=smallest
  51. # 指定序列化处理类
  52. derializer.class=kafka.serializer.DefaultDecoder

3).server.properties:服务端的配置文件

  1. #broker的全局唯一编号,不能重复
  2. broker.id=0
  3. #用来监听链接的端口,producer或consumer将在此端口建立连接
  4. port=9092
  5. #处理网络请求的线程数量,也就是接收消息的线程数。
  6. #接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
  7. num.network.threads=3
  8. #消息从内存中写入磁盘是时候使用的线程数量。
  9. #用来处理磁盘IO的线程数量
  10. num.io.threads=8
  11. #发送套接字的缓冲区大小
  12. socket.send.buffer.bytes=102400
  13. #接受套接字的缓冲区大小
  14. socket.receive.buffer.bytes=102400
  15. #请求套接字的缓冲区大小
  16. socket.request.max.bytes=104857600
  17. #kafka运行日志存放的路径
  18. log.dirs=/export/servers/logs/kafka
  19. #topic在当前broker上的分片个数
  20. num.partitions=2
  21. #我们知道segment文件默认会被保留7天的时间,超时的话就
  22. #会被清理,那么清理这件事情就需要有一些线程来做。这里就是
  23. #用来设置恢复和清理data下数据的线程数量
  24. num.recovery.threads.per.data.dir=1
  25. #segment文件保留的最长时间,默认保留7天(168小时),
  26. #超时将被删除,也就是说7天之前的数据将被清理掉。
  27. log.retention.hours=168
  28. #滚动生成新的segment文件的最大时间
  29. log.roll.hours=168
  30. #日志文件中每个segment的大小,默认为1G
  31. log.segment.bytes=1073741824
  32. #上面的参数设置了每一个segment文件的大小是1G,那么
  33. #就需要有一个东西去定期检查segment文件有没有达到1G,
  34. #多长时间去检查一次,就需要设置一个周期性检查文件大小
  35. #的时间(单位是毫秒)。
  36. log.retention.check.interval.ms=300000
  37. #日志清理是否打开
  38. log.cleaner.enable=true
  39. #broker需要使用zookeeper保存meta数据
  40. zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
  41. #zookeeper链接超时时间
  42. zookeeper.connection.timeout.ms=6000
  43. #上面我们说过接收线程会将接收到的消息放到内存中,然后再从内存
  44. #写到磁盘上,那么什么时候将消息从内存中写入磁盘,就有一个
  45. #时间限制(时间阈值)和一个数量限制(数量阈值),这里设置的是
  46. #数量阈值,下一个参数设置的则是时间阈值。
  47. #partion buffer中,消息的条数达到阈值,将触发flush到磁盘。
  48. log.flush.interval.messages=10000
  49. #消息buffer的时间,达到阈值,将触发将消息从内存flush到磁盘,
  50. #单位是毫秒。
  51. log.flush.interval.ms=3000
  52. #删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
  53. delete.topic.enable=true
  54. #此处的host.name为本机IP(重要),如果不改,则客户端会抛出:
  55. #Producer connection to localhost:9092 unsuccessful 错误!
  56. host.name=kafka01
  57. advertised.host.name=192.168.239.128
日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).清理参数在server.properties文件中

Kafka日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间删除),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

Kafka消费日志删除思想:Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用

1

2

3

4

5

6

log.cleanup.policy=delete启用删除策略

直接删除,删除后的消息不可恢复。可配置以下两个策略:

清理超过指定时间清理: 

log.retention.hours=16

超过指定大小后,删除旧的消息:

log.retention.bytes=1073741824

Kafka日志管理器允许压缩策略

将数据压缩,只保留每个key最后一个版本的数据。首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的。在Topic的配置中设置log.cleanup.policy=compact启用压缩策略。

 在整个数据流中,每个Key都有可能出现多次,压缩时将根据Key将消息聚合,只保留最后一次出现时的数据。这样,无论什么时候消费消息,都能拿到每个Key的最新版本的数据。
    压缩后的offset可能是不连续的,比如上图中没有5和7,因为这些offset的消息被merge了,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,比如,当试图获取offset为5的消息时,实际上会拿到offset为6的消息,并从这个位置开始消费。
    这种策略只适合特俗场景,比如消息的key是用户ID,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
    压缩策略支持删除,当某个Key的最新版本的消息没有内容时,这个Key将被删除,这也符合以上逻辑。

  • 消息消费
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName

    表示从 latest 位移位置开始消费该主题的所有分区消息,即仅消费正在写入的消息。

  • 从开始位置消费

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topicName

     表示从指定主题中有效的起始位移位置开始消费所有分区的消息

  • 显示key消费

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic topicName

    消费出的消息结果将打印出消息体的 key 和 value

  • 执行分区消费

    1. # 指定分区接受数据
    2. [root@localhost consume]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jettech --from-beginning --partition 1

    若还需要为你的消息添加其他属性,请参考下述列表。

参数值类型说明有效值
--topicstring被消费的topic
--whiteliststring正则表达式,指定要包含以供使用的主题的白名单
--partitioninteger指定分区
除非指定’–offset’,否则从分区结束(latest)开始消费
--offsetstring执行消费的起始offset位置
默认值:latest
latest
earliest
<offset>
--consumer-propertystring将用户定义的属性以key=value的形式传递给使用者
--consumer.configstring消费者配置属性文件
请注意,[consumer-property]优先于此配置
--formatterstring用于格式化kafka消息以供显示的类的名称
默认值:kafka.tools.DefaultMessageFormatter
kafka.tools.DefaultMessageFormatter
kafka.tools.LoggingMessageFormatter
kafka.tools.NoOpMessageFormatter
kafka.tools.ChecksumMessageFormatter
--propertystring初始化消息格式化程序的属性print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
 
--from-beginningstring从存在的最早消息开始,而不是从最新消息开始
--max-messagesinteger消费的最大数据量,若不指定,则持续消费下去
--timeout-msinteger在指定时间间隔内没有消息可用时退出
--skip-message-on-error如果处理消息时出错,请跳过它而不是暂停
--bootstrap-serverstring必需(除非使用旧版本的消费者),要连接的服务器
key-deserializerstring
--value-deserializerstring
--enable-systest-events除记录消费的消息外,还记录消费者的生命周期
(用于系统测试)
--isolation-levelstring设置为read_committed以过滤掉未提交的事务性消息
设置为read_uncommitted以读取所有消息
默认值:read_uncommitted
--groupstring指定消费者所属组的ID
--blackliststring要从消费中排除的主题黑名单
--csv-reporter-enabled如果设置,将启用csv metrics报告器
--delete-consumer-offsets如果指定,则启动时删除zookeeper中的消费者信息
--metrics-dirstring输出csv度量值
需与[csv-reporter-enable]配合使用
--zookeeperstring必需(仅当使用旧的使用者时)连接zookeeper的字符串。
可以给出多个URL以允许故障转移

kafka发送、消费指定分区消息_月亮船长的博客-CSDN博客_kafka消费指定分区

kafka-consumer-groups.sh消费者组管理【资料

什么是kafka消费者组
  kafka消费者组(Consumer Group)是kafka提供的可扩展且具有容错性的消费者机制。
  它是一个组,所以内部有可以有多个消费者,这些消费者共用一个ID(Group ID),一个组内的所有消费者共同协作,完成对订阅的主题的所有分区进行消费。其中一个主题中的一个分区只能由一个消费者消费。

消费者组的特性

  1. 一个消费者组可以有多个消费者。
  2. Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组。
  3. 每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费。消费者组之间不影响。

为什么出现消费者组

        我们知道的消息引擎模型有:点对点模型和发布/订阅模型。传统的消息引擎就是这两大类。这两大类消息引擎,都有各自适合的应用场景,也都有不适应的场景。
  点对点的模型,每消费一个消息之后,被消费的消息就会被删除。如果我们需要多个消费者消费同一个消息队列时,就不能使用点对点模型了。
  发布订阅模型,支持多个消费者消费同一个消息队列,但是发布订阅模型中,消费者订阅了一个主题后,就要订阅主题的所有分区。这总方式既不灵活,也会影响消息的真是投递效果。
  消费者组就避开了上述两种模型的缺陷,有兼容了他们的优点。
  首先消费者之间彼此独立,互不影响。可以订阅同一个主题并且互不干扰。再加上Broker端的消息留存机制,kafka的消费者组就完美的解决了上面的问题。kafka使用一种消费者组(Consumer Group)机制,就同时实现了传统消息引擎系统的两大模型:如果所有的消费者实例都属于一个消费者组那就是点对点模型,如果所有消费者实例各自是独立的消费者那就是发布订阅模型。
  因为上面消费组的第三个特性。所以消费者组的消费者实例数最好等于该消费者组订阅的主题中的分区数。如果实例数量多于分区数,那多余的实例将永远不会工作,除非有其他实例挂掉。

针对Consumer Group,Kafka如何管理位移(offset)?

        这个问题需要区分新老版本。首先他们的存储方式都是使用类似于map的KV对实现的存储。key是分区,v对应Consumer消费该分区的最新位移。我们可以这样理解,但是实际的存储要比这个复杂的多。
  新老版本的区别在于位移存储的位置:老版本是将位移存放到zookeeper中,而zookeeper是一个分布式的协调服务框架,kafka重度依赖它实现的各种各样的协调管理。将位移存到zookeeper中的做法,显而易见的好处是减少了kafka broker端的状态保存开销。可以自由的扩缩容,实现超强的伸缩性。
  但是由于zookeeper这类元框架其实并不适合进行频繁的写更新。而Consumer Group的位移更新却是一个非常频繁的更新操作。所以并不是很适合将位移存在zookeeper中。
  于是新版本将位移保存在kafka内部主题中。就是:_consumer_offsets。

kafka的Rebalance(重平衡)

定义
  Rebalance本质上是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区。例如:某个Group下有20个Consumer实例,它订阅了一个有100个分区的Topic。正常情况下,kafka会给每个实例分配5个分区。这个分配的过程叫做Rebalance。
  rebalance发生时,Group下的所有Consumer实例都会协调在一起共同参与。具体怎么分配,是有分配策略协助的。分配策略以后再总结。

触发条件

  1. 组成员数发生变化。比如有实例进入或者离开组。
  2. 订阅的主题数发生变更。
  3. 订阅主题的分区数发生变更。

问题

  rebalance有一个比较大的问题。那就是再Rebalance过程中,所有的实例都会停止消费,等Rebalance完成。这就导致Rebalance过程中无法提供服务。而且,Rebalance的过程还很慢。所以我们要尽量避免Rebalance的发生。

案例操作:

案例一:消费者多于分区数

1)创建生产者1个分区,创建生产者一个副本 

  1. [root@localhost kafka]# kafka-topics.sh --zookeeper localhost:2181 --create --topic hello --partitions 1 --replication-factor 1
  2. Created topic hello.
  3. [root@localhost kafka]# kafka-topics.sh --list --bootstrap-server localhost:9092
  4. __consumer_offsets
  5. beijing
  6. hello
  7. jettech
  8. shanghai
  9. test
  10. test1
  11. test1-1
  12. test2
  13. test3
  14. test4
  15. test5
  16. test6
  17. test8
  18. topicname
  19. [root@localhost kafka]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic hello
  20. Topic:hello PartitionCount:1 ReplicationFactor:1 Configs:
  21. Topic: hello Partition: 0 Leader: 1 Replicas: 1 Isr: 1

2)启动生产者

  1. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic hello
  2. >"hello hello:"

3)创建两个消费者

  1. [root@localhost kafka]# cat consume/consume.properties
  2. group.id=jettech-hellp-comsume-group

两个消费者都是用同一台机器,开两个shell终端操作的.
下面命令在两个shell终端分别输入一次

  1. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --consumer.config consume/consume.properties
  2. wubo
  3. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --consumer.config consume/consume.properties

生产者生产数据

  1. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic hello
  2. >"hello hello:"
  3. >wubo
  4. >aaa
  5. >bbb
  6. >cccc
  7. >dddd
  8. >eeee
  9. >fff

消费者1接收数据

  1. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --consumer.config consume/consume.properties
  2. wubo
  3. aaa
  4. bbb
  5. cccc
  6. dddd
  7. eeee
  8. fff

消费者2接收数据

[root@localhost kafka]#  kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello  --consumer.config  consume/consume.properties

结果:

消费者1接收到了7条数据, 消费者2一条数据没接收到.

结论 同一个消费者组内同一时刻只有一个消费者接收到消息。 另外一个消费者分不到partition消费就空着待机了

因为启动consumer的时候,consumer会通过某种算法分配消费哪个partition, 分配完了之后这个consumer就一直消费这个partition的数据了.别的分区这个consumer就管都不会管了.

案例二:不同消费者组消费相同topic的相同分区数据

不同消费者组的消费者是可以同时消费同一个topic的同一个分区信息的

1)生产:

  1. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic hello
  2. >aaa
  3. >bbbb
  4. >ccc
  5. >ddd
  6. >

2)消费1

  1. [root@localhost kafka]# cat consume/consume.properties
  2. group.id=jettech-hellp-comsume-group
  3. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --consumer.config consume/consume.properties
  4. aaaaa
  5. bbbb
  6. cccc

3)消费2

  1. [root@localhost kafka]# cat consume/consume1.properties
  2. group.id=jettech-hellp-comsume-group1
  3. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --consumer.config consume/consume1.properties
  4. aaaaa
  5. bbbb
  6. cccc

结果:

消费者1接收到了3条数据, 消费者2也接收到了3条数据

结论 不同消费者组的消费者可同时消费同一个topic的同一个分区。

案例三:生产者2个分区

1)重新创建一个生产者,是2个分区的.,

  1. [root@localhost kafka]# kafka-topics.sh --zookeeper localhost:2181 --create --topic hello1 --partitions 2 --replication-factor 1
  2. Created topic hello1.

2)创建两个消费者

两个消费者都是用相同机器,开两个shell终端操作的.
下面命令在两个shell终端分别输入一次

  1. [root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic hello1
  2. >hello1
  3. >hello2
  4. >hello3
  5. >hello4

3)消费1

  1. [root@localhost kafka]# cat consume/consume.properties
  2. group.id=jettech-hello1-comsume-group
  3. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello1 --consumer.config consume/consume.properties
  4. hello1
  5. hello3

4)消费2

  1. [root@localhost kafka]# cat consume/consume.properties
  2. group.id=jettech-hello1-comsume-group
  3. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello1 --consumer.config consume/consume.properties
  4. hello2
  5. hello4

结论: 消费者接收数据被轮询了

Kafka中的消费者组(Consumer Group) - DoubleLi - 博客园

切换leader

  1. # kafka版本 <= 2.4
  2. kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
  3. # kafka新版本
  4. kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

kafka自带压测命令

  1. [root@localhost kafka]# kafka-producer-perf-test.sh --topic shanghai --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
  2. [2022-08-17 10:13:57,158] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {shanghai=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
  3. 100 records sent, 99.403579 records/sec (0.00 MB/sec), 8.25 ms avg latency, 299.00 ms max latency, 2 ms 50th, 16 ms 95th, 299 ms 99th, 299 ms 99.9th.

kafka持续发送消息

持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:

  1. #生产
  2. [root@localhost kafka]# kafka-verifiable-producer.sh --broker-list $(hostname -f):9092 --topic jettech --max-messages 100000
  3. #消费
  4. [root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jettech --from-beginning

zookeeper-shell.sh

如果kafka集群的zk配置了chroot路径,那么需要加上/path

  1. [root@localhost kafka]# zookeeper-shell.sh localhost:2181[/path]
  2. ls /brokers/ids
  3. get /brokers/ids/0

迁移分区

  1. 创建规则json
    kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

  2. 验证
    kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

MirrorMaker 跨机房灾备工具

kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topicA|topicB

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/785249
推荐阅读
相关标签
  

闽ICP备14008679号