赞
踩
Kafka提供了几个命令行接口实用程序,他们对于kafka集群的配置管理非常有用。这些工具是通过java来实现的,并提供了一组脚本来调用这些类。这些工具提供了基本的功能,但是对于更复杂的操作,你可能会发现他们还是有些力不从心。本章将描述做为Apache Kafka开源项目的一部分的工具。在Apache桑可以找到关于社区中开发的高级工具的更多信息。详见kakfa官网。
kafka-topics.sh工具提供了快速创建、修改、删除和列出集群中的topic信息。配置管理已经移到kafka-configs.sh中。使用这些命令行工具,需要使用–zookeeper参数,如:zoo1.example.com:2181/kafka-cluster。
在集群中创建topic的时候需要三个参数。这三个参数是必须的。尽管一些参数在broker上有默认值。
还可以在创建的时候显示设置topic的副本,或者设置配置参数对topic的配置进行覆盖。这些操作不在此讨论。配置覆盖可以在本章后面找到,他们可以提供给kafka-topics.sh 通过 --config 命令行参数使用。分区的配置也将在后续内容中介绍。
Topic的名称可以包含字母、数字、字符及下划线、破折号和点号。
kafka-topics.sh 示例如下:
kafka-topics.sh --zookeeper <zookeeper connect> --create --topic <string>
--replication-factor <integer> --partitions <integer>
该命令将导致集群创建具有指定名称和分区数量的topic。对于每个分区,集群将选择适当的副本数。这意味着,如果集群设置为支持机架的副本分配,那么每个分区的副本将位于单独的机架中,如果不需要机架支持,那么可以通过–disable-rack-aware 关闭。
如下创建名为 my-topic的topic,包含8个分区,每个分区有2个副本:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create
--topic my-topic --replication-factor 2 --partitions 8
Created topic "my-topic".
#
有时需要增加topic的分区数量,分区是在集群中扩展和复制TOPIC的方式。增加分区技术的最常见的原因是为了进一步扩展topic,或者降低单个分区的吞吐量。如果消费者需要扩展在单个组中运行更多的副本,则Topic也可以增加,因为分区只能由组中的单个成员使用。
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --topic my-topic --partitions 16
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
#
即使没有消息的topic也会使用集群的资源,包括磁盘空间,打开的文件句柄和内存。如果topic不再需要,可以删除它并释放这些资源。为了执行此操作,集群中的broker必须配置了delete.topic.enable选项为true。如果这个选项设置为false,则这个操作将被忽略。
例如,删除my-topic的topic:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--delete --topic my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set
to true.
#
topic工具可以列出集群所有的topic,列表格式为每行一个topic,没有特定的顺序。
样例如下:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--list
my-topic - marked for deletion
other-topic
#
还可以获得关于集群中的一个或者多个topic的详细信息,输出包括分区计数,topic配置覆盖以及每个分区及其副本分配的清单。通过向命令行提供一个topic参数,可以将此限制为单个topic主题。
样例如下:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe
Topic:other-topic PartitionCount:8 ReplicationFactor:2 Configs:
Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr: 0,1
#
describe命令还有几个用于过滤输出的选项。这有助于诊断集群问题。对于其中每一个,不要指定–topic参数。因为目的是查找集群中匹配条件的所有topic和分区。因此这些选项不适用于list命令。
为了找到所有具有覆盖的topic,请使用 --topics-withoverrides 参数,这将只描述配置与缺省值不同的topic。这将只描述配置与集群缺省的不同的topic。
有两个过滤器用于查找所有问题的分区,–underreplicated-partitions 将显示一个或者多个副本与leader不同步的所有分区。–unavailable-partitions显示没有leader的所有分区。这是一种更严重的情况,意味着该分区目前处于脱机状态,不能用于客户端生产或者使用。
样例如下:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --under-replicated-partitions
Topic: other-topic Partition: 2 Leader: 0 Replicas: 1,0
Isr: 0
Topic: other-topic Partition: 4 Leader: 0 Replicas: 1,0
Isr: 0
#
kafka中的消费者组分为两个不同的地方维护,对于较老的消费者,信息存储在zookeeper中。而对于版本比较新的消费者,信息存储在kafka中的特定topic中。kafka-consumer-groups.sh可以列出这两种类型的消费者组,它还可以用于删除消费者组的offset。但仅仅用在旧的消费者组下运行的组中,在zookeeper中维护的消费者组。在与较老的消费者组一起工作时,你将访问–zookeeper参数指定的kafka集群。对于新版本的消费者组,你需要通过–bootstrap-server指定kafka broker的IP和端口。
旧的消费者详情:
# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --list
console-consumer-79697
myconsumer
#
新的消费者:
# kafka-consumer-groups.sh --new-consumer --bootstrap-server
kafka1.example.com:9092/kafka-cluster --list
kafka-python-test
my-new-consumer
#
如果要查看一个消费者组的情况,你可以用–describe替换–list之后加上–group参数。这将列出当前指定的消费者组正在使用的topic以及每个topic的offset。
如下对testgroup进行查看:
# kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --group testgroup GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER myconsumer my-topic 0 1688 1688 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 1 1418 1418 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 2 1314 1315 1 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 3 2012 2012 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 4 1089 1089 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 5 1429 1432 3 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 6 1634 1634 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 7 2261 2261 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 #
输出字段的解释如下表:
字段 | 描述 |
---|---|
GROUP | 消费者组的名称 |
TOPIC | 被消费的topic的名称 |
PARTITION | 被消费的分区ID |
CURRENT-OFFSET | 消费者组为这个topic分区提交的最后一个offset,这是消费者在分区中的位置。 |
LOG-END-OFFSET | Topic的当前高水位线的offset,这是生产者提交到消费者集群被确认的最后一条消息的offset |
LAG | 此Topic分区的消费者当前的offset和broker中水位线的差异 |
OWNER | 当前使用此topic的分区的消费者的组成员,这是消费者组成员提供的任意ID,不一定包括消费者的主机名 |
只支持对旧的消费者客户端删除消费者组。这将从zookeeper中删除整个组。包括该组正在使用的所有topic的所有被存储的offset。应该关闭组中的消费者,如果没有首先关闭所有消费者,则可能带来消费者的未定义行为,因为组的zookeeper元数据将在消费者使用的时候被删除。
删除示例如下:
# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
Deleted all consumer group information for group testgroup in
zookeeper.
#
还可以使用相同的命令删除正在使用的当个topic的offset,而不是删除整个组。同样,江一在执行操作之前停止使用消费者组。或配置为不适用要删除的topic。
从名为testgroup的消费者组中删除my-topic的offset:
# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
--topic my-topic
Deleted consumer group information for group testgroup topic
my-topic in zookeeper.
#
除了使用旧的消费者客户端显示和删除消费者组的offset之外,还可以检索offset并以批处理的方式存储新的offset。这对于在出现需要重新读取消息的问题时为使用重置offset非常有用。或者对于在消费者有问题之后的消息推进offset(入果存在消费者无法处理的格式化错误的消息)。
没有命名脚本来导出offset,但是我们可以使用kafka-run-class.sh 在适当的时候通过其底层的java类来执行该工具。导出offset并将其生成一个文件。该文件以导入工具可以读取的以定义的格式包含的组的每个topic的分区及offset。创建的文件每行有一个topic分区,格式如下:
/consumers/GROUPNAME/offsets/topic/TOPICNAME/PARTITIONID-0:OFFSET.
将名为testgroup的消费者组的offset导出到名为offsets的文件中:
# kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
# cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
#
导入offset的工具与导出相反,它获取通过导出上一节中的offset生成的文件,并使用该文件设置消费者组的当前offset。一种常见的做法时导出消费者组的当前offset,对文件进行复制,并编辑该副本。以offset替换为所需要的值。注意,对于import命令,没有使用–group选项。这是因为消费者组名称要嵌入到要导入的文件中。
# kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets
#
在集群运行的过程中,可以对topic和客户端的配置进行覆盖。kafka的开发者打算在未来添加更多的动态配置,这就是为什么这些更改被放在一个单独的命令行工具kafka-config.sh中。这运行你为特定的topic和客户端id设置配置。一旦设置好,这些配置对于集群就是永远生效的。他们存储在zookeeper中,并在启动的死后由每个broker读取。在工具和文档中,像这样为每个topic或者客户端动态配置被称为重写。
与前面的工具一样,需要使用–zookeeper参数为集群提供zookeeper的连接字符串。在下面的示例中,假设zookeeper的连接字符串为zoo1.example.com:2181/kafka-cluster。
有许多应用于topic的配置,可以针对单个topic更改这些配置。以适应当个集群中的不同的用例。大多数配置都是在broker配置中指定的缺省的值。除非设置了覆盖,否则将使用缺省值。
更改topic的样例如下:
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name <topic name>
--add-config <key>=<value>[,<key>=<value>...]
有效配置见下表:
key | 说明 |
---|---|
cleanup.policy | 如果设置为compact,则topic中 的消息将被丢弃,仅保留具有给定key的最新消息(日志压缩)。 |
compression.type | broker将消息写入磁盘时使用的压缩类型,可以用gzip、snappy和lz4. |
delete.retention.ms | 删除墓碑,将为这个topic保留多长时间。仅仅对日志压缩的topic有效。 |
file.delete.delay.ms | 从磁盘中删除此topic的日志端和索引之前需要等待的多长时间 |
flush.messages | 在强制将此topic的消息刷到磁盘之前接收的消息数 |
flush.ms | 在强制将此topic的消息刷到磁盘之前需要的时间,单位是ms |
index.interval.bytes | 日志段索引中的条目之间可以产生多少字节的消息 |
max.message.bytes | 此topic中当个消息的大小 |
message.format.version | broker将消息写入磁盘时使用的消息格式版本,必须是一个有效的版本号,如0.10.0 |
message.timestamp.difference.max.ms | 接收消息时,消息时间戳和broker时间戳之间允许的最大差异。只有当message.timestamp.type 是CreateTime时生效 |
message.timestamp.type | 将消息写入磁盘时使用的时间戳,当前值为createTime用于客户端指定的时间戳。LogAppendtime用于broker将消息写入分区的时间。 |
min.cleanable.dirty.ratio | 日志压缩器尝试为这个topic压缩分区的频率,表示未压缩的日志段数与日志段总数的比率,仅对日志压缩topic有效 |
min.insync.replicas | topic的一个分区必须同步的最小副本才能被认为是可用的 |
preallocate | 如果设置为true,则应该在滚动新段的时候预先分配此topic的日志段 |
retention.bytes | 为topic保留的消息量的总字节数 |
retention.ms | topic中消息保留的最长时间 |
segment.bytes | 写入分区中的单个日志段的消息大小 |
segment.index.bytes | 按日志段索引的最大大小,以字节为单位 |
segment.jitter.ms | 随机化添加到段的最大毫秒数,之后将生成新的段 |
segment.ms | 旋转每个分区的日志段的频率,以毫秒为单位 |
unclean.leader.election.enable | 如果设置为false,则不洁的领导人选举将不被允许 |
如将my-topic的topic的用户留存时间设置为1小时,3600万ms:
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name my-topic --add-config
retention.ms=3600000
Updated config for topic: "my-topic".
#
对于kafka客户端唯一可以配置的是生产者和消费者的配额,他们都是一个字节/秒的速率。允许具有指定客户端ID的所有客户端在每个broker的基础上生成或者使用。这意味着,如果集群中有5个broker,并且为一个客户端指定10M/s的生产者配额,那么该客户端将被允许在broker上同时生产10MB/s的总量为50MB/s。
更改客户端配置的格式如下:
kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type clients --entity-name <client ID>
--add-config <key>=<value>[,<key>=<value>...]
客户端支持的配置参数如下表:
key | 说明 |
---|---|
producer_bytes_rate | 允许单个客户端ID在一秒内生成给单个broker的消息量。以字节为单位 |
consumer_bytes_rate | 允许单个消费者ID在一秒内单个broker中消费的消息量,以字节为单位 |
可以使用命令行工具列出所有的配置,这将允许你检查topic的特定配置,与其他工具类似,通过–describe命令即可:
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000,segment.ms=3600000
#
可以完全删除动态配置,这将导致集群恢复到默认值,要删除配置覆盖,情使用alter命令和delete-config命令。
如下是一个重写retention.ms :
# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name my-topic
--delete-config retention.ms
Updated config for topic: "my-topic".
#
kafka工具包含两个用于分区管理的脚本,一个允许副本选举,另外一个用于为broker分配分区的低级实用程序。这些工具一起可以帮助在kafka broker集群中实现消息流量的适当平衡。
如第六章所述,为了可靠性,分区可以有多个副本,但是,这些副本中只有一个可以做为分区的leader,并且所有生成和消费操作都发生在这个broker上,kafka internals将其定位副本列表中的第一个同步副本,但是当broerk停止并重写启动的时候,他不会自动恢复任何分区的领导权。
# kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster
Successfully started preferred replica election for partitions
Set([my-topic,5], [my-topic,0], [my-topic,7], [my-topic,4],
[my-topic,6], [my-topic,2], [my-topic,3], [my-topic,1])
#
对于有大量分区的集群,可能无法运行单一首选副本选择,请求必须写入集群元数据中的zookeeper的znode,如果请求大于znode的大小默认为1MB,则请求将失败。在本例中,需要创建一个包含j’son对象的文件,该对象列出要选择的分区,并将请求分解为多个步骤,json的格式为:
{
"partitions": [
{
"partition": 1,
"topic": "foo"
},
{
"partition": 2,
"topic": "foobar"
}
]
}
例如,在一个名为partitions.json的文件中指定分区列表,开始一个首选副本选择:
# kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --path-to-json-file
partitions.json
Successfully started preferred replica election for partitions
Set([my-topic,1], [my-topic,2], [my-topic,3])
#
有时,可能需要更改分区的副本配置,需要有这样的例子:
kafka-reassign-partitions.sh能够用来执行这个操作。它必须包含如下两个步骤:第一步使用broker列表和topic列表来生成一组移动。第二本执行生成的移动。还有一个可选的第三步,它使用生成的列表来验证分区重写分配的进度或完成。
要生成一组分区移动,必须创建一个包含列出topic的JSON对象的文件。JSON对象的格式如下:
{
"topics": [
{
"topic": "foo"
},
{
"topic": "foo1"
}
],
"version": 1
}
例如,生成一组分区移动以移动文件中列出的topic.json发送给id为0和1的broker:
# kafka-reassign-partitions.sh --zookeeper zoo1.example.com:2181/kafka-cluster --generate --topics-to-move-json-file topics.json --broker-list 0,1 Current partition replica assignment {"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]}, {"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic"," partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic"," partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]}, {"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"mytopic"," partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic"," partition":2,"replicas":[1,0]},{"topic":"my-topic","partition": 13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]}, {"topic":"my-topic","partition":9,"replicas":[0,1]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]}, {"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic"," partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic"," partition":6,"replicas":[1,0]},{"topic":"my-topic","partition": 15,"replicas":[0,1]},{"topic":"my-topic","partition":0,"replicas":[1,0]}, {"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"mytopic"," partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic"," partition":13,"replicas":[0,1]},{"topic":"my-topic","partition": 2,"replicas":[1,0]},{"topic":"my-topic","partition":14,"replicas":[1,0]}, {"topic":"my-topic","partition":9,"replicas":[0,1]}]} #
broker列出在命令行工具中以都好分隔的broker ID列表的形式提供在标准的输出中。该工具将输出两个JSON对象。描述topic当前分区分配和建议的分区分配。JSON格式为:
{"partitions": [{"topic": "mytopic",
"partition": 0, "replicas": [1,2] }], "version":_1_}.
可以保持的第一个JSON对象,以备重写分配时需要恢复时使用。第二个JSON对象,应该保持到一个新的文件中,然后,这个文件被提供给kafka-reassign-partitions.sh工具在第二步使用。
如;从reassign.json执行分区建议分配:
# kafka-reassign-partitions.sh --zookeeper zoo1.example.com:2181/kafka-cluster --execute --reassignment-json-file reassign.json Current partition replica assignment {"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]}, {"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic"," partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic"," partition":6,"replicas":[1,0]},{"topic":"my-topic","partition": 3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]}, {"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"mytopic"," partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic"," partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]}, {"topic":"my-topic","partition":9,"replicas":[0,1]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1,"partitions": [{"topic":"my-topic","partition":5,"replicas":[0,1]},{"topic":"mytopic"," partition":0,"replicas":[1,0]},{"topic":"my-topic","partition":7,"repli cas":[0,1]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"mytopic"," partition":4,"replicas":[1,0]},{"topic":"my-topic","partition": 12,"replicas":[1,0]},{"topic":"my-topic","partition":6,"replicas":[1,0]}, {"topic":"my-topic","partition":11,"replicas":[0,1]},{"topic":"mytopic"," partition":10,"replicas":[1,0]},{"topic":"my-topic","partition":9,"repli cas":[0,1]},{"topic":"my-topic","partition":2,"replicas":[1,0]},{"topic":"mytopic"," partition":14,"replicas":[1,0]},{"topic":"my-topic","partition": 3,"replicas":[0,1]},{"topic":"my-topic","partition":1,"replicas":[0,1]}, {"topic":"my-topic","partition":15,"replicas":[0,1]},{"topic":"mytopic"," partition":8,"replicas":[1,0]}]} #
这将启动将指定的分区副本重写分配到新的broker,集群控制器通过将新的副本添加到每个分区副本列表,增加副本因子,来执行此操作。然后,新的副本将从当前leader复制每个分区的所有现有消息。根据磁盘上分区的大小,在通过网络将数据复制到新的副本时,这可能会花费大量的时间。复制完成之后,控制器将从复制列表中删除旧的副本,将复制因子减少到原始的大小。
# kafka-reassign-partitions.sh --zookeeper zoo1.example.com:2181/kafka-cluster --verify --reassignment-json-file reassign.json Status of partition reassignment: Reassignment of partition [my-topic,5] completed successfully Reassignment of partition [my-topic,0] completed successfully Reassignment of partition [my-topic,7] completed successfully Reassignment of partition [my-topic,13] completed successfully Reassignment of partition [my-topic,4] completed successfully Reassignment of partition [my-topic,12] completed successfully Reassignment of partition [my-topic,6] completed successfully Reassignment of partition [my-topic,11] completed successfully Reassignment of partition [my-topic,10] completed successfully Reassignment of partition [my-topic,9] completed successfully Reassignment of partition [my-topic,2] completed successfully Reassignment of partition [my-topic,14] completed successfully Reassignment of partition [my-topic,3] completed successfully Reassignment of partition [my-topic,1] completed successfully Reassignment of partition [my-topic,15] completed successfully Reassignment of partition [my-topic,8] completed successfully #
分区重新分配工具中有一个未在文档中说明的特性,它允许你增加或者简述分区的副本因子。在使用错误的副本因子创建分区的情况下,这可能是必须的,假如在创建topic的时候没有足够的broker可用。这可以通过创建一个json对象来完成,该json对象的格式在分区重新分配的执行步骤中使用,该步骤条件或者删除副本以正确设置副本因子。集群将完成重新分配,并将复制因子保持在新的大小。
例如,考试一个名为my-topic的tipic的当前分配,他有一个分区,副本因子为1:
{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [
1
]
}
],
"version": 1
}
在重新分配分区的执行步骤中提供以下json对象将导致副本因子增加到2:
{
"partitions": [
{
"partition": 0,
"replicas": [
1,
2
],
"topic": "my-topic"
}
],
"version": 1
}
类似的,通过提供具有更小副本列表的json对象,可以减少分区的副本呢因子。
如果你必须寻找消息的特定内容,可能是因为你的topic中出现了消费者无法处理的毒丸消息,那么你有一个辅助工具可以为分区解码日志段,这将允许你查看单独的消息,而不需要使用和解码他们。该工具以逗号分隔的日志段文件列表做为参数,并可以打印消息摘要信息或者详细的消息数据。
例如,解吗名为00000000000052368601.log的日志段文件:
# kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.log Dumping 00000000000052368601.log Starting offset: 52368601 offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc: 1194341321 offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc: 278946641 offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc: 3767466431 offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc: 2444301359 ...
解码名为00000000000052368601.log的日志段文件:
# kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.log --print-data-log
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321 payload: test message 1
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641 payload: test message 2
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431 payload: test message 3
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359 payload: test message 4
...
还可以使用此工具验证日志段附带的索引文件,索引用于查找日志段中的消息,如果消息被破坏,将导致错误的使用。只要broker在buclean状态启动,就会执行验证,但也可以手动执行。有两种检查索引的选项,这取决于你想要进行多少检查,选项–index-sanity-check将检查索引是否处于可用状态。–verify-index-only将检查索引中是否存在不匹配,而不会打印出所有的索引项。
如:验证00000000000052368601.log是否损坏:
# kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.index,00000000000052368601.log --index-sanity-check Dumping 00000000000052368601.index 00000000000052368601.index passed sanity check. Dumping 00000000000052368601.log Starting offset: 52368601 offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc: 1194341321 offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc: 278946641 offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc: 3767466431 ...
分区复制的工作原理类似于普通的kafka客户端,follower的broker在最老的offset开始复制,并定期检查磁盘当前的offset。当复制停止并重新启动时,它从最后要给检查点获取数据,以前的复制的日志段可以从broker中删除,在这种情况下,follower 不会填补空白。
为了验证topic分区的副本在集群中是否相同,可以使用kafka-replica-verification.sh进行验证,次攻击从给懂的topic分区集的所有副本中获取消息。并检查所有副本上是否存在所有的消息,必须为该工具提供一个正则表达式。以匹配希望验证的topic,如果没有提供,则验证所有的topic。还必须提供要连接的broker的显式列表。
# kafka-replica-verification.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'
2016-11-23 18:42:08,838: verification process is started.
2016-11-23 18:42:38,789: max lag is 0 for partition [my-topic,7]
at offset 53827844 among 10 partitions
2016-11-23 18:43:08,790: max lag is 0 for partition [my-topic,7]
at offset 53827878 among 10 partitions
在使用ApacheKafka的时候,你经常会发现需要手动使用消费或者生产一些示例消息,以验证应用程序的运行情况。提供了两个实用程序。kafka-console-consumer.sh 和 kafka-console-producer.sh。这是围绕java客户端端的包装器,允许你与kafka的topic交互,而无须编写整个应用程序。
kafka-console-consumer.sh 提供了一种使用来自kafka集群中的一个或者多个topic消息的方法,消息以标准输出的方式打印,然后分隔。默认情况下,它不使用格式输出消息中的原始字节,以下各段描述了所需的选项。
第一个选项是指定是否使用新的消费者,并让配置指向kafka集群本身,在使用较老的消费者的时候,唯一要的参数是–zookeeper选项。后面是集群的连接字符串。从上面的例子来看,这可能是–zookeeper zoo1.example.com:2181/kafka-cluster。 如果使用新的消费者,旧必须指定–new-consumer的标识和–broker-list选项。后面是逗号分隔的broker列表。如:–broker-list
kafka1.example.com:9092,kafka2.example.com:9092。
接下来必须指定要使用的topic,为此提供了三种选择。–topic, --whitelist, 和 --blacklist。只能使用其中一种。–topic选项指定要使用的单个topic。–whitelist与–blacklist每个选项后面都跟一个正则表达式,记住要对正则表达式转义
这样它就不会被shell命令修改。白名单将使用与正则表达式匹配的所有topic,而黑名单将使用除正则表达式匹配的topic之外的所有topic。
样例如下:
# kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --topic my-topic
sample message 1
sample message 2
^CProcessed a total of 2 messages
#
除了基本的命令行选项之外,还恶意将任何普通的用户配置选项传递给控制台用户,这可以通过两种方式完成,具体取决于你需要传递的选项的数量以及你喜欢的方式,第一个是通过指定提供消费者的配置文件,–consumer.config CONFIGFILE需要包含配置文件的完整路径。另外一种方法是使用表单的一个或者多个参数在命令行上指定选项,消费者属性KEY=VALUE,其中key是配置选项名。VALUE是设置它的值,者对于消费者选项如设置消费者组ID非常有用。
控制台消费者还有一些其他的参数,你应该知道:
除了默认的格式化器外,还有三种供选择:
再某些时候,查看集群的消费者组提交了哪些offset是非常关键的。你可能想知道某个特定的组是否正在提交offset,或者offset提交的频率是多少。这可以通过使用控制台消费者对__consumer_offsets这个特殊的内部topic进行消费来实现。为了解码这个topic总的消息,必须使用格式化程序kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter。
样例如下:
# kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --topic __consumer_offsets
--formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessage
Formatter' --max-messages 1
[my-group-name,my-topic,0]::[OffsetMetadata[481690879,NO_METADATA]
,CommitTime 1479708539051,ExpirationTime 1480313339051]
Processed a total of 1 messages
#
与控制台消费者程序类似,kakfa-console-producer.sh工具可以用于将消息写入该集群的kafka的topic中,默认情况下,每行读取一条消息,用tab分隔key和value。如果没有tab,则key为null。
控制台生产者中要求必须提供两个参数,–broker-list 用于指定一个或者多个broker。通过hostname:port格式,进行提供。–topic 指定你需要写入的topic。当你写入完成之后,发送一个文件结束字符EOF来关闭客户端。
样例如下,给my-topic写入信息:
# kafka-console-producer.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic my-topic
sample message 1
sample message 2
^D
#
与控制台生产者一样,你也可以将任何普通的生产者配置选项传递给控制台生产者。这可以通过两种方式来完成,具体取决于你需要传递选项的参数以及你喜欢的方式。第一种方法是通过–producer.config CONFIGFILE指定生产者配置文件。其中CONFIGFILE是包含配置选项文件的完整路径。另外一种方法是再命令行上用表单的一个或者多个参数进行指定。–producer-property KEY=VALUE。其中KEY是配置选项名,VALUE是要设置的值。这对于消费者批处理配置等非常有用。如linger.ms 和 batch.size。
控制台生产者提供了很大命令行参数来调整其行为,一些更有用的选项是:
–key-serializer CLASSNAME 指定消息key序列化的编码类,默认为kafka.serializer.DefaultEncoder。
–value-serializer CLASSNAME 指定消息value序列化编码类,默认为kafka.serializer.DefaultEncoder。
–compression-codec STRING
指定再生成消息时使用的压缩类型,这可以是一个gzip,snappy或者lz4。默认是gzip。
–sync
同步生成消息,再发送下一条消息之前等待每个消息的ack。
Creating a Custom Serializer 创建消息的自定义序列化器,必须继承kafka.serializer.Encoder 这可以额从标准输入获取字符串,并将他们转换为适合的topic。如Avro和Protobuf。
kafka.tools.LineMessageReader 负责读取标准输入和创建生产者记录,也有几个有用的选项,可以通过–property传递到控制台生产者:
命令行工具kafka-acls.sh提供了kafka客户端的访问控制与交互,Apache kafka网站提供了关于acl的安全性和其他的文档。
有戏管理任务在技术上是可行的,但是除非在最极端的情况下,否则不应该尝试。通常是在诊断问题并没有其他选择的时候,或者是发现了需要临时解决的特定错误。这些任务通常没有文档记录,不受支持。并且会给应用程序带来一定的风险。
这里记录了其中一些较为常见的任务,以便在紧急情况下可以选择恢复。不建议在正常的集群中使用他们,应该在执行之前仔细考虑。
每个kafka集群都有一个控制器,它是一个在broker中运行的thread。控制器负责监督集群的操作,有时候需要强制将控制器移动到另外一个broker。一个这样的例子是当控制器遇到异常或者其他问题,使其无法运行而无法正常工作的时候,这些情况下移动控制器的风险并不高,但是这不是一项正常的任务,不应该定期执行。当前做为控制器的broker使用名为/controller的集群路径的顶层的zookeeper的节点注册。手动删除这个zookeeper节点将导致当前控制器退出,集群将选择一个新的控制器。
分区重新分配的正常操作流程为:
1.请求重新分配(创建zookeeper节点)。
2.集群控制器向添加的新的broker添加分区。
3.新的broker开始复制每个分区,知道它同步。
4.集群控制器从分区复制列表中删除旧的broker。
因为所有的重新分配都是在请求时并行重新启动,所有通常没有理由尝试取消正在进行的重新分配。一个例外是当broker在重新分配过程中失败而不能立即重新启动的时候。这将导致无法完成重新分配,从而排除启动任何额外的重新分配,例如从失败的broker中删除分区并将他们分配给其他的broker,在这种情况下,可能会使集群忘记所有的重新分配。
要删除正在进行的分区重新分配:
1.从zookeeper节点上kakfa集群的路径中删除/admin/reassign_partitions。
2.强制控制器移动。请参阅前文移动集群控制器的详细过程。
Checking Replication Factors 检查副本因子
在删除正在进行的分区移动的时候,任何尚未完成的分区都不会执行从复制列表中删除旧的broker的步骤。这意味着某些分区的复制因子可能大于预期。broker不允许对具有不一致副本因子的分区,(例如增加分区)的topi进行一些管理操作。建议检查仍在进行中的分区。并在重新分配的另外一个分区时确保它的复制因子是正确的。
当使用命令行工具删除topic的时候,zookeeper节点请求创建删除操作。在正常情况下,集群会立即执行此操作,但是,命令行工具无法知道集群中是否启用了topic删除操作,因此,无论如何,它都会请求删除topic,如果禁用了删除,则会导致意外的结果,可能集群会将删除操作挂起,以避免这种情况。
通过在zookeeper上的/admin/delete_topic节点创建一个子节点来删除topic。该节点以topic命名。删除这些zookeeper节点,将删除挂起的请求。
如果你运行的集群禁用了删除topic。或者你发现自己需要删除正常的操作流程之外的一些topic,那么可以从集群中手动删除他们。但是这需要完全关闭集群中的所有borker,并且在集群中的任何broker都在运行时不能这样做。
从集群中删除topic的过程:
运行kafka集群是一项艰巨的任务,需要进行大量的配置和维护工作,以保持系统在最高性能下能运行。在本章中,我们讨论了许多日常的任务,比如管理topic和经常需要处理的客户端配置。我们还介绍了调试问题所需要的一些更深的任务,比如检查日志段。最后,我们介绍了一些虽然不安全或者例行的操作,但是可以用于摆脱棘手的情况,总之,这些工具将帮助你管理kafka集群。
当然,如果没有适当的监控,管理集群是不可能的,第十章将讨论监控集群和集群的运行情况的操作方法,这样你可以七二班kafka工作良好,我们还将提供监控客户端的最佳办法,报告生产者和消费者。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。