当前位置:   article > 正文

09 Confluent_Kafka权威指南 第九章:管理kafka集群_confluent.kafka producerconfig

confluent.kafka producerconfig

CHAPTER 9 Administering Kafka 管理kafka

Kafka提供了几个命令行接口实用程序,他们对于kafka集群的配置管理非常有用。这些工具是通过java来实现的,并提供了一组脚本来调用这些类。这些工具提供了基本的功能,但是对于更复杂的操作,你可能会发现他们还是有些力不从心。本章将描述做为Apache Kafka开源项目的一部分的工具。在Apache桑可以找到关于社区中开发的高级工具的更多信息。详见kakfa官网。

  • 授权管理操作:
    虽然Apache Kafka实现了身份验证和授权来控制Topic的操作,但是大多数集群操作还不支持。这意味着这些命令行工具可以在不需要任何身份验证的情况下使用。这将允许在不进行安全检查或者审计的情况下执行诸如topic更改之类的操作。该功能正在开发中,应该很快就会添加。

Topic Operations 主题操作

kafka-topics.sh工具提供了快速创建、修改、删除和列出集群中的topic信息。配置管理已经移到kafka-configs.sh中。使用这些命令行工具,需要使用–zookeeper参数,如:zoo1.example.com:2181/kafka-cluster。

  • 版本检查Check the Version 许多命令行工具直接将元数据存储在zookeeper中,而不是broker。因此,务必确保你的工具版本与集群broker的版本匹配。最安全的办法是使用kafka broker服务端程序自带的工具。
Creating a New Topic 创建新的topic

在集群中创建topic的时候需要三个参数。这三个参数是必须的。尽管一些参数在broker上有默认值。

  • Topic Name : 你需要创建Topic的名称
  • Replication Factor:副本因子,在集群中维护topic的每个分区的副本数。
  • Partitions: 分区数,topic由这些分区组成。
Specifying Topic Configurations 指定Topic的配置

还可以在创建的时候显示设置topic的副本,或者设置配置参数对topic的配置进行覆盖。这些操作不在此讨论。配置覆盖可以在本章后面找到,他们可以提供给kafka-topics.sh 通过 --config 命令行参数使用。分区的配置也将在后续内容中介绍。

Topic的名称可以包含字母、数字、字符及下划线、破折号和点号。

  • topic的命名:允许但是不建议以两个下划线开头的topic名称。这种形式的topic呗视为集群内部的topic。如消费者组offset存储的topic是__consumer_offsets。也不建议在单个集群中同时使用句号和下划线,因为在内部统计topic的时候,句号被改为了下划线,如:topic.1在统计中将变成topic_1。

kafka-topics.sh 示例如下:

kafka-topics.sh --zookeeper <zookeeper connect> --create --topic <string>
--replication-factor <integer> --partitions <integer>
  • 1
  • 2

该命令将导致集群创建具有指定名称和分区数量的topic。对于每个分区,集群将选择适当的副本数。这意味着,如果集群设置为支持机架的副本分配,那么每个分区的副本将位于单独的机架中,如果不需要机架支持,那么可以通过–disable-rack-aware 关闭。
如下创建名为 my-topic的topic,包含8个分区,每个分区有2个副本:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create
--topic my-topic --replication-factor 2 --partitions 8
Created topic "my-topic".
#
  • 1
  • 2
  • 3
  • 4
  • 跳过存在错误:当用上述脚本进行自动化操作的时候,你可以使用–if-not-exists参数,如果Topic已经存在,则不会返回一个错误。
Adding Partitions 添加分区

有时需要增加topic的分区数量,分区是在集群中扩展和复制TOPIC的方式。增加分区技术的最常见的原因是为了进一步扩展topic,或者降低单个分区的吞吐量。如果消费者需要扩展在单个组中运行更多的副本,则Topic也可以增加,因为分区只能由组中的单个成员使用。

  • 调整topic的key 在消费者角度来看,使用key来控制的topic很难添加分区。这是因为key到分区的映射将随着分区数量的改变而改变。由于这个原因,建议在创建topic的时候设置一次包含key控制消息的topic的分区数量,并避免调整topic的大小。
  • 跳过不存在topic的错误。虽然为–alter命令提供了一个–if-exists参数,但是不建议使用它。如果正在修改的topic不存在,使用此参数将导致命令不返回错误。这可能会掩盖本应该创建topic的topic不存在问题。
    例如,将my-topic的分区增加到16个:
# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --topic my-topic --partitions 16
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 减少分区的数量。不可能减少topic分区的数量,不支持此操作的原因是,从topic中删除的分区将导致该topic的部分数据也被删除。从消费者的角度来看,这是不一致的。此外,尝试将数据重新分发到剩余的分区也会很困难。并导致无序的消息。如果需要减少分区的数量,则需要删除topic并重新创建它。
删除topic

即使没有消息的topic也会使用集群的资源,包括磁盘空间,打开的文件句柄和内存。如果topic不再需要,可以删除它并释放这些资源。为了执行此操作,集群中的broker必须配置了delete.topic.enable选项为true。如果这个选项设置为false,则这个操作将被忽略。

  • 删除数据之前需要注意:删除一个topic也会删除它的全部消息。这是不可逆的操作,所以一定要小心执行。

例如,删除my-topic的topic:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--delete --topic my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set
to true.
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
Listing All Topics in a Cluster 列出集群所有的topic

topic工具可以列出集群所有的topic,列表格式为每行一个topic,没有特定的顺序。
样例如下:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--list
my-topic - marked for deletion
other-topic
#
  • 1
  • 2
  • 3
  • 4
  • 5
Describing Topic Details 描述topic的细节

还可以获得关于集群中的一个或者多个topic的详细信息,输出包括分区计数,topic配置覆盖以及每个分区及其副本分配的清单。通过向命令行提供一个topic参数,可以将此限制为单个topic主题。
样例如下:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe
Topic:other-topic PartitionCount:8 ReplicationFactor:2 Configs:
Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1
Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0
Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr: 0,1
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

describe命令还有几个用于过滤输出的选项。这有助于诊断集群问题。对于其中每一个,不要指定–topic参数。因为目的是查找集群中匹配条件的所有topic和分区。因此这些选项不适用于list命令。
为了找到所有具有覆盖的topic,请使用 --topics-withoverrides 参数,这将只描述配置与缺省值不同的topic。这将只描述配置与集群缺省的不同的topic。
有两个过滤器用于查找所有问题的分区,–underreplicated-partitions 将显示一个或者多个副本与leader不同步的所有分区。–unavailable-partitions显示没有leader的所有分区。这是一种更严重的情况,意味着该分区目前处于脱机状态,不能用于客户端生产或者使用。
样例如下:

# kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --under-replicated-partitions
Topic: other-topic Partition: 2 Leader: 0 Replicas: 1,0
Isr: 0
Topic: other-topic Partition: 4 Leader: 0 Replicas: 1,0
Isr: 0
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Consumer Groups 消费者组

kafka中的消费者组分为两个不同的地方维护,对于较老的消费者,信息存储在zookeeper中。而对于版本比较新的消费者,信息存储在kafka中的特定topic中。kafka-consumer-groups.sh可以列出这两种类型的消费者组,它还可以用于删除消费者组的offset。但仅仅用在旧的消费者组下运行的组中,在zookeeper中维护的消费者组。在与较老的消费者组一起工作时,你将访问–zookeeper参数指定的kafka集群。对于新版本的消费者组,你需要通过–bootstrap-server指定kafka broker的IP和端口。

列出组详情

旧的消费者详情:

# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --list
console-consumer-79697
myconsumer
#
  • 1
  • 2
  • 3
  • 4
  • 5

新的消费者:

# kafka-consumer-groups.sh --new-consumer --bootstrap-server
kafka1.example.com:9092/kafka-cluster --list
kafka-python-test
my-new-consumer
#
  • 1
  • 2
  • 3
  • 4
  • 5

如果要查看一个消费者组的情况,你可以用–describe替换–list之后加上–group参数。这将列出当前指定的消费者组正在使用的topic以及每个topic的offset。
如下对testgroup进行查看:

# kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --group testgroup
GROUP TOPIC PARTITION
CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
myconsumer my-topic 0
1688 1688 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 1
1418 1418 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 2
1314 1315 1
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 3
2012 2012 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 4
1089 1089 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 5
1429 1432 3
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 6
1634 1634 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
myconsumer my-topic 7
2261 2261 0
myconsumer_host1.example.com-1478188622741-7dab5ca7-0
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

输出字段的解释如下表:

字段描述
GROUP消费者组的名称
TOPIC被消费的topic的名称
PARTITION被消费的分区ID
CURRENT-OFFSET消费者组为这个topic分区提交的最后一个offset,这是消费者在分区中的位置。
LOG-END-OFFSETTopic的当前高水位线的offset,这是生产者提交到消费者集群被确认的最后一条消息的offset
LAG此Topic分区的消费者当前的offset和broker中水位线的差异
OWNER当前使用此topic的分区的消费者的组成员,这是消费者组成员提供的任意ID,不一定包括消费者的主机名
Delete Group 删除消费者组

只支持对旧的消费者客户端删除消费者组。这将从zookeeper中删除整个组。包括该组正在使用的所有topic的所有被存储的offset。应该关闭组中的消费者,如果没有首先关闭所有消费者,则可能带来消费者的未定义行为,因为组的zookeeper元数据将在消费者使用的时候被删除。
删除示例如下:

# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
Deleted all consumer group information for group testgroup in
zookeeper.
#
  • 1
  • 2
  • 3
  • 4
  • 5

还可以使用相同的命令删除正在使用的当个topic的offset,而不是删除整个组。同样,江一在执行操作之前停止使用消费者组。或配置为不适用要删除的topic。
从名为testgroup的消费者组中删除my-topic的offset:

# kafka-consumer-groups.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --delete --group testgroup
--topic my-topic
Deleted consumer group information for group testgroup topic
my-topic in zookeeper.
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
Offset Management offset管理

除了使用旧的消费者客户端显示和删除消费者组的offset之外,还可以检索offset并以批处理的方式存储新的offset。这对于在出现需要重新读取消息的问题时为使用重置offset非常有用。或者对于在消费者有问题之后的消息推进offset(入果存在消费者无法处理的格式化错误的消息)。

  • 管理offset的commit 对于将offset提交给kafka的消费者客户端,目前还米有可用的工具来管理offset,此功能仅对向zookeeper提交offset的用户可用。以管理承诺的组的offset,你必须使用客户端中可用的api来提交组的offset。
Export Offsets 导出offsets

没有命名脚本来导出offset,但是我们可以使用kafka-run-class.sh 在适当的时候通过其底层的java类来执行该工具。导出offset并将其生成一个文件。该文件以导入工具可以读取的以定义的格式包含的组的每个topic的分区及offset。创建的文件每行有一个topic分区,格式如下:

/consumers/GROUPNAME/offsets/topic/TOPICNAME/PARTITIONID-0:OFFSET.
  • 1

将名为testgroup的消费者组的offset导出到名为offsets的文件中:

# kafka-run-class.sh kafka.tools.ExportZkOffsets
--zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup
--output-file offsets
# cat offsets
/consumers/testgroup/offsets/my-topic/0:8905
/consumers/testgroup/offsets/my-topic/1:8915
/consumers/testgroup/offsets/my-topic/2:9845
/consumers/testgroup/offsets/my-topic/3:8072
/consumers/testgroup/offsets/my-topic/4:8008
/consumers/testgroup/offsets/my-topic/5:8319
/consumers/testgroup/offsets/my-topic/6:8102
/consumers/testgroup/offsets/my-topic/7:12739
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
Import Offsets 导入offsets

导入offset的工具与导出相反,它获取通过导出上一节中的offset生成的文件,并使用该文件设置消费者组的当前offset。一种常见的做法时导出消费者组的当前offset,对文件进行复制,并编辑该副本。以offset替换为所需要的值。注意,对于import命令,没有使用–group选项。这是因为消费者组名称要嵌入到要导入的文件中。

  • 注意,首先要关闭消费者。在执行此步骤之前,必须停止消费者组中的所有消费者。如果在消费者组处于活动状态时写入新的offset,则不会读取这些offset。消费者会将这些写入的offset覆盖。
    从一个名为offsets的文件中导入名为testgroup的消费者组的offset。
# kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect
zoo1.example.com:2181/kafka-cluster --input-file offsets
#
  • 1
  • 2
  • 3

Dynamic Configuration Changes 配置动态修改

在集群运行的过程中,可以对topic和客户端的配置进行覆盖。kafka的开发者打算在未来添加更多的动态配置,这就是为什么这些更改被放在一个单独的命令行工具kafka-config.sh中。这运行你为特定的topic和客户端id设置配置。一旦设置好,这些配置对于集群就是永远生效的。他们存储在zookeeper中,并在启动的死后由每个broker读取。在工具和文档中,像这样为每个topic或者客户端动态配置被称为重写。
与前面的工具一样,需要使用–zookeeper参数为集群提供zookeeper的连接字符串。在下面的示例中,假设zookeeper的连接字符串为zoo1.example.com:2181/kafka-cluster。

Overriding Topic Configuration Defaults 覆盖topic配置的默认值

有许多应用于topic的配置,可以针对单个topic更改这些配置。以适应当个集群中的不同的用例。大多数配置都是在broker配置中指定的缺省的值。除非设置了覆盖,否则将使用缺省值。
更改topic的样例如下:

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name <topic name>
--add-config <key>=<value>[,<key>=<value>...]
  • 1
  • 2
  • 3

有效配置见下表:

key说明
cleanup.policy如果设置为compact,则topic中 的消息将被丢弃,仅保留具有给定key的最新消息(日志压缩)。
compression.typebroker将消息写入磁盘时使用的压缩类型,可以用gzip、snappy和lz4.
delete.retention.ms删除墓碑,将为这个topic保留多长时间。仅仅对日志压缩的topic有效。
file.delete.delay.ms从磁盘中删除此topic的日志端和索引之前需要等待的多长时间
flush.messages在强制将此topic的消息刷到磁盘之前接收的消息数
flush.ms在强制将此topic的消息刷到磁盘之前需要的时间,单位是ms
index.interval.bytes日志段索引中的条目之间可以产生多少字节的消息
max.message.bytes此topic中当个消息的大小
message.format.versionbroker将消息写入磁盘时使用的消息格式版本,必须是一个有效的版本号,如0.10.0
message.timestamp.difference.max.ms接收消息时,消息时间戳和broker时间戳之间允许的最大差异。只有当message.timestamp.type 是CreateTime时生效
message.timestamp.type将消息写入磁盘时使用的时间戳,当前值为createTime用于客户端指定的时间戳。LogAppendtime用于broker将消息写入分区的时间。
min.cleanable.dirty.ratio日志压缩器尝试为这个topic压缩分区的频率,表示未压缩的日志段数与日志段总数的比率,仅对日志压缩topic有效
min.insync.replicastopic的一个分区必须同步的最小副本才能被认为是可用的
preallocate如果设置为true,则应该在滚动新段的时候预先分配此topic的日志段
retention.bytes为topic保留的消息量的总字节数
retention.mstopic中消息保留的最长时间
segment.bytes写入分区中的单个日志段的消息大小
segment.index.bytes按日志段索引的最大大小,以字节为单位
segment.jitter.ms随机化添加到段的最大毫秒数,之后将生成新的段
segment.ms旋转每个分区的日志段的频率,以毫秒为单位
unclean.leader.election.enable如果设置为false,则不洁的领导人选举将不被允许

如将my-topic的topic的用户留存时间设置为1小时,3600万ms:

# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name my-topic --add-config
retention.ms=3600000
Updated config for topic: "my-topic".
#
  • 1
  • 2
  • 3
  • 4
  • 5
Overriding Client Configuration Defaults 重写客户端的缺省配置

对于kafka客户端唯一可以配置的是生产者和消费者的配额,他们都是一个字节/秒的速率。允许具有指定客户端ID的所有客户端在每个broker的基础上生成或者使用。这意味着,如果集群中有5个broker,并且为一个客户端指定10M/s的生产者配额,那么该客户端将被允许在broker上同时生产10MB/s的总量为50MB/s。

  • 客户端ID与消费者组 客户端ID不一定与消费者组的名称相同,消费者可以设置他们自己的客户端ID,而且你可能有许多位于不同组的消费者,他们指定的相同的客户端ID,最佳的方法是将每个消费者组的客户端ID设置为标识该组的唯一值/这运行单个消费者组共享配额,并且更容易的在日志中确定哪个组负责请求。

更改客户端配置的格式如下:

kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type clients --entity-name <client ID>
--add-config <key>=<value>[,<key>=<value>...]
  • 1
  • 2
  • 3

客户端支持的配置参数如下表:

key说明
producer_bytes_rate允许单个客户端ID在一秒内生成给单个broker的消息量。以字节为单位
consumer_bytes_rate允许单个消费者ID在一秒内单个broker中消费的消息量,以字节为单位
Describing Configuration Overrides 配置覆盖说明

可以使用命令行工具列出所有的配置,这将允许你检查topic的特定配置,与其他工具类似,通过–describe命令即可:

# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--describe --entity-type topics --entity-name my-topic
Configs for topics:my-topic are
retention.ms=3600000,segment.ms=3600000
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • topic 覆盖: 配置describe 值显示覆盖,它不包括集群默认的配置。目前,无论是通过zookeeper还是kafka存储的新旧版本,都无法动态地发现broker本身的配置,这意味着,当使用此工具在自动会发现topic或者客户端的设置时,该工具必须具有集群默认的配置的独立知识。
Removing Configuration Overrides 删除覆盖的配置

可以完全删除动态配置,这将导致集群恢复到默认值,要删除配置覆盖,情使用alter命令和delete-config命令。
如下是一个重写retention.ms :

# kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster
--alter --entity-type topics --entity-name my-topic
--delete-config retention.ms
Updated config for topic: "my-topic".
#
  • 1
  • 2
  • 3
  • 4
  • 5

Partition Management 分区管理

kafka工具包含两个用于分区管理的脚本,一个允许副本选举,另外一个用于为broker分配分区的低级实用程序。这些工具一起可以帮助在kafka broker集群中实现消息流量的适当平衡。

Preferred Replica Election

如第六章所述,为了可靠性,分区可以有多个副本,但是,这些副本中只有一个可以做为分区的leader,并且所有生成和消费操作都发生在这个broker上,kafka internals将其定位副本列表中的第一个同步副本,但是当broerk停止并重写启动的时候,他不会自动恢复任何分区的领导权。

  • Automatic Leader Rebalancing 自动reblance
    一个用于自动leader重平衡的broker配置,不建议用于生产环节。自动reblance会对性能造成重大影响,对于较大的集群,他会导致客户端流量长时间暂停。
    让broker恢复领导抵为的一种方式是触发一种首选的副本选举机制。这告诉集群控制器为分区选择理想的leader,该操作通常不会产生影响,因为客户可以自动跟踪leader的变更,这可以使kafka-preferred-replicaelection.sh手动进行。
    为集群中所有的topic启动一个首选副本选择:
# kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster
Successfully started preferred replica election for partitions
Set([my-topic,5], [my-topic,0], [my-topic,7], [my-topic,4],
[my-topic,6], [my-topic,2], [my-topic,3], [my-topic,1])
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

对于有大量分区的集群,可能无法运行单一首选副本选择,请求必须写入集群元数据中的zookeeper的znode,如果请求大于znode的大小默认为1MB,则请求将失败。在本例中,需要创建一个包含j’son对象的文件,该对象列出要选择的分区,并将请求分解为多个步骤,json的格式为:

{
"partitions": [
{
"partition": 1,
"topic": "foo"
},
{
"partition": 2,
"topic": "foobar"
}
]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

例如,在一个名为partitions.json的文件中指定分区列表,开始一个首选副本选择:

# kafka-preferred-replica-election.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --path-to-json-file
partitions.json
Successfully started preferred replica election for partitions
Set([my-topic,1], [my-topic,2], [my-topic,3])
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
Changing a Partition’s Replicas 更改分区的副本数

有时,可能需要更改分区的副本配置,需要有这样的例子:

  • 如果主题的分区在集群中不平衡,导致broker上的负载不均匀。
  • 如果broker脱机,且分区复制不足
  • 如果添加了一个新的borker,并且需要接收集群的负载的共享

kafka-reassign-partitions.sh能够用来执行这个操作。它必须包含如下两个步骤:第一步使用broker列表和topic列表来生成一组移动。第二本执行生成的移动。还有一个可选的第三步,它使用生成的列表来验证分区重写分配的进度或完成。
要生成一组分区移动,必须创建一个包含列出topic的JSON对象的文件。JSON对象的格式如下:

{
"topics": [
{
"topic": "foo"
},
{
"topic": "foo1"
}
],
"version": 1
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

例如,生成一组分区移动以移动文件中列出的topic.json发送给id为0和1的broker:

# kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --generate
--topics-to-move-json-file topics.json --broker-list 0,1
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},
{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic","
partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli
cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic","
partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]},
{"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"mytopic","
partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli
cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic","
partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":
13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]},
{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},
{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic","
partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli
cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic","
partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":
15,"replicas":[0,1]},{"topic":"my-topic","partition":0,"replicas":[1,0]},
{"topic":"my-topic","partition":3,"replicas":[0,1]},{"topic":"mytopic","
partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli
cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic","
partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":
2,"replicas":[1,0]},{"topic":"my-topic","partition":14,"replicas":[1,0]},
{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

broker列出在命令行工具中以都好分隔的broker ID列表的形式提供在标准的输出中。该工具将输出两个JSON对象。描述topic当前分区分配和建议的分区分配。JSON格式为:

{"partitions": [{"topic": "mytopic",
"partition": 0, "replicas": [1,2] }], "version":_1_}.
  • 1
  • 2

可以保持的第一个JSON对象,以备重写分配时需要恢复时使用。第二个JSON对象,应该保持到一个新的文件中,然后,这个文件被提供给kafka-reassign-partitions.sh工具在第二步使用。
如;从reassign.json执行分区建议分配:

# kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --execute
--reassignment-json-file reassign.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":5,"replicas":[0,1]},
{"topic":"my-topic","partition":10,"replicas":[1,0]},{"topic":"mytopic","
partition":1,"replicas":[0,1]},{"topic":"my-topic","partition":4,"repli
cas":[1,0]},{"topic":"my-topic","partition":7,"replicas":[0,1]},{"topic":"mytopic","
partition":6,"replicas":[1,0]},{"topic":"my-topic","partition":
3,"replicas":[0,1]},{"topic":"my-topic","partition":15,"replicas":[0,1]},
{"topic":"my-topic","partition":0,"replicas":[1,0]},{"topic":"mytopic","
partition":11,"replicas":[0,1]},{"topic":"my-topic","partition":8,"repli
cas":[1,0]},{"topic":"my-topic","partition":12,"replicas":[1,0]},{"topic":"mytopic","
partition":2,"replicas":[1,0]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"my-topic","partition":14,"replicas":[1,0]},
{"topic":"my-topic","partition":9,"replicas":[0,1]}]}
Save this to use as the --reassignment-json-file option during
rollback
Successfully started reassignment of partitions {"version":1,"partitions":
[{"topic":"my-topic","partition":5,"replicas":[0,1]},{"topic":"mytopic","
partition":0,"replicas":[1,0]},{"topic":"my-topic","partition":7,"repli
cas":[0,1]},{"topic":"my-topic","partition":13,"replicas":[0,1]},{"topic":"mytopic","
partition":4,"replicas":[1,0]},{"topic":"my-topic","partition":
12,"replicas":[1,0]},{"topic":"my-topic","partition":6,"replicas":[1,0]},
{"topic":"my-topic","partition":11,"replicas":[0,1]},{"topic":"mytopic","
partition":10,"replicas":[1,0]},{"topic":"my-topic","partition":9,"repli
cas":[0,1]},{"topic":"my-topic","partition":2,"replicas":[1,0]},{"topic":"mytopic","
partition":14,"replicas":[1,0]},{"topic":"my-topic","partition":
3,"replicas":[0,1]},{"topic":"my-topic","partition":1,"replicas":[0,1]},
{"topic":"my-topic","partition":15,"replicas":[0,1]},{"topic":"mytopic","
partition":8,"replicas":[1,0]}]}
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

这将启动将指定的分区副本重写分配到新的broker,集群控制器通过将新的副本添加到每个分区副本列表,增加副本因子,来执行此操作。然后,新的副本将从当前leader复制每个分区的所有现有消息。根据磁盘上分区的大小,在通过网络将数据复制到新的副本时,这可能会花费大量的时间。复制完成之后,控制器将从复制列表中删除旧的副本,将复制因子减少到原始的大小。

  • 改善在重新分配副本的时候的网络利用率:当从当个broker删除许多分区的时候,比如从集群中删除该broker,最佳的实践时在启动重写分配之前关闭并重写启动broker。这把特定的broker上分区的领导权转移到集群中其他broker上,这可以显著提高重写分配的时候的性能,并减少对集群的影响,因为复制流量将分配到许多的broker。
    当重写分配运行时,再它完成后,kafka-reassignpartitions.sh可以用来验证重写分配的状态。这将显示哪些重写分配正在进行中,什么重写分配已经完成,如果出现错误,什么重写分配已经失败。为此,你必须拥有在此执行步骤中使用的带有JSON对象的文件。
    例如,从reassign.json校验正在运行的重写分配:
# kafka-reassign-partitions.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --verify
--reassignment-json-file reassign.json
Status of partition reassignment:
Reassignment of partition [my-topic,5] completed successfully
Reassignment of partition [my-topic,0] completed successfully
Reassignment of partition [my-topic,7] completed successfully
Reassignment of partition [my-topic,13] completed successfully
Reassignment of partition [my-topic,4] completed successfully
Reassignment of partition [my-topic,12] completed successfully
Reassignment of partition [my-topic,6] completed successfully
Reassignment of partition [my-topic,11] completed successfully
Reassignment of partition [my-topic,10] completed successfully
Reassignment of partition [my-topic,9] completed successfully
Reassignment of partition [my-topic,2] completed successfully
Reassignment of partition [my-topic,14] completed successfully
Reassignment of partition [my-topic,3] completed successfully
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [my-topic,15] completed successfully
Reassignment of partition [my-topic,8] completed successfully
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • Batching Reassignments批次分配
    分区重新分配对集群的性能有很大的影响,因为他们会导致内存页缓存的一致性发生变化。并使用网络和磁盘IO,将重新分配分解为许多小步骤时一个保持这种最小化的好主意。
Changing Replication Factor 改变副本因子

分区重新分配工具中有一个未在文档中说明的特性,它允许你增加或者简述分区的副本因子。在使用错误的副本因子创建分区的情况下,这可能是必须的,假如在创建topic的时候没有足够的broker可用。这可以通过创建一个json对象来完成,该json对象的格式在分区重新分配的执行步骤中使用,该步骤条件或者删除副本以正确设置副本因子。集群将完成重新分配,并将复制因子保持在新的大小。
例如,考试一个名为my-topic的tipic的当前分配,他有一个分区,副本因子为1:

{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [
1
]
}
],
"version": 1
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在重新分配分区的执行步骤中提供以下json对象将导致副本因子增加到2:

{
"partitions": [
{
"partition": 0,
"replicas": [
1,
2
],
"topic": "my-topic"
}
],
"version": 1
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

类似的,通过提供具有更小副本列表的json对象,可以减少分区的副本呢因子。

Dumping Log Segments

如果你必须寻找消息的特定内容,可能是因为你的topic中出现了消费者无法处理的毒丸消息,那么你有一个辅助工具可以为分区解码日志段,这将允许你查看单独的消息,而不需要使用和解码他们。该工具以逗号分隔的日志段文件列表做为参数,并可以打印消息摘要信息或者详细的消息数据。
例如,解吗名为00000000000052368601.log的日志段文件:

# kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.log
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

解码名为00000000000052368601.log的日志段文件:

# kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.log --print-data-log
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321 payload: test message 1
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641 payload: test message 2
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431 payload: test message 3
offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true
payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc:
2444301359 payload: test message 4
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

还可以使用此工具验证日志段附带的索引文件,索引用于查找日志段中的消息,如果消息被破坏,将导致错误的使用。只要broker在buclean状态启动,就会执行验证,但也可以手动执行。有两种检查索引的选项,这取决于你想要进行多少检查,选项–index-sanity-check将检查索引是否处于可用状态。–verify-index-only将检查索引中是否存在不匹配,而不会打印出所有的索引项。
如:验证00000000000052368601.log是否损坏:

# kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000052368601.index,00000000000052368601.log
--index-sanity-check
Dumping 00000000000052368601.index
00000000000052368601.index passed sanity check.
Dumping 00000000000052368601.log
Starting offset: 52368601
offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true
payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc:
1194341321
offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true
payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc:
278946641
offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true
payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc:
3767466431
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
Replica Verification 副本验证

分区复制的工作原理类似于普通的kafka客户端,follower的broker在最老的offset开始复制,并定期检查磁盘当前的offset。当复制停止并重新启动时,它从最后要给检查点获取数据,以前的复制的日志段可以从broker中删除,在这种情况下,follower 不会填补空白。
为了验证topic分区的副本在集群中是否相同,可以使用kafka-replica-verification.sh进行验证,次攻击从给懂的topic分区集的所有副本中获取消息。并检查所有副本上是否存在所有的消息,必须为该工具提供一个正则表达式。以匹配希望验证的topic,如果没有提供,则验证所有的topic。还必须提供要连接的broker的显式列表。

  • Caution: Cluster Impact Ahead 集群碰撞
    副本验证工具对集群的影响类似于重新分配分区,因为它必须从旧的offset读取所有的消息,以验证副本。此外,它并行的读取一个分区的所有副本,因此应该谨慎使用。
    例如,验证broker1和broker2上以my-开头的topic副本:
# kafka-replica-verification.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*'
2016-11-23 18:42:08,838: verification process is started.
2016-11-23 18:42:38,789: max lag is 0 for partition [my-topic,7]
at offset 53827844 among 10 partitions
2016-11-23 18:43:08,790: max lag is 0 for partition [my-topic,7]
at offset 53827878 among 10 partitions
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Consuming and Producing 生产和消费

在使用ApacheKafka的时候,你经常会发现需要手动使用消费或者生产一些示例消息,以验证应用程序的运行情况。提供了两个实用程序。kafka-console-consumer.sh 和 kafka-console-producer.sh。这是围绕java客户端端的包装器,允许你与kafka的topic交互,而无须编写整个应用程序。

  • Piping Output to Another Application 将输出管道传输到另外一个程序
    虽然可以编写围绕控制台消费者或者生产者的应用程序,例如,使用消息并将其传输到另外一个应用程序进行处理,但是这种类型的应用程序相当脆弱,应该避免这么做。很难以不丢消息的方式与控制台的使用者进行交互。同样,控制台的生成器也不允许使用所有的特性,正确的发送字节也需要技巧,最好是直接使用java客户端库,或者直接使用kafka协议的其他语言的第三方客户端库。
Console Consumer 控制台消费者

kafka-console-consumer.sh 提供了一种使用来自kafka集群中的一个或者多个topic消息的方法,消息以标准输出的方式打印,然后分隔。默认情况下,它不使用格式输出消息中的原始字节,以下各段描述了所需的选项。

  • Checking Tool Versions 检查工具版本
    使用与kafka集群相同的版本的消费者非常重要,较老的控制台用户可能会通过不正确的方式与zookeeper交互而损坏集群。

第一个选项是指定是否使用新的消费者,并让配置指向kafka集群本身,在使用较老的消费者的时候,唯一要的参数是–zookeeper选项。后面是集群的连接字符串。从上面的例子来看,这可能是–zookeeper zoo1.example.com:2181/kafka-cluster。 如果使用新的消费者,旧必须指定–new-consumer的标识和–broker-list选项。后面是逗号分隔的broker列表。如:–broker-list
kafka1.example.com:9092,kafka2.example.com:9092。
接下来必须指定要使用的topic,为此提供了三种选择。–topic, --whitelist, 和 --blacklist。只能使用其中一种。–topic选项指定要使用的单个topic。–whitelist与–blacklist每个选项后面都跟一个正则表达式,记住要对正则表达式转义
这样它就不会被shell命令修改。白名单将使用与正则表达式匹配的所有topic,而黑名单将使用除正则表达式匹配的topic之外的所有topic。
样例如下:

# kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --topic my-topic
sample message 1
sample message 2
^CProcessed a total of 2 messages
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

除了基本的命令行选项之外,还恶意将任何普通的用户配置选项传递给控制台用户,这可以通过两种方式完成,具体取决于你需要传递的选项的数量以及你喜欢的方式,第一个是通过指定提供消费者的配置文件,–consumer.config CONFIGFILE需要包含配置文件的完整路径。另外一种方法是使用表单的一个或者多个参数在命令行上指定选项,消费者属性KEY=VALUE,其中key是配置选项名。VALUE是设置它的值,者对于消费者选项如设置消费者组ID非常有用。

  • Confusing Command-Line Options 命令行参数混淆
    控制台的生产者和消费者都有一个–property选项。不要将其与–consumer-property和–producer-property混淆。–property仅仅用于将配置传递给消息格式化程序,而不是客户端本身。

控制台消费者还有一些其他的参数,你应该知道:

  • –formatter CLASSNAME 指定用于解码消息的消息格式化程序类。默认设置为:kafka.tools.DefaultFormatter.
  • –from-beginning 使用从旧的offset中读取topic特定消息。否则,消费者从最近开始读取。
  • –max-messages NUM
    消费者在退出之前消费最多的num个消息。
  • –partition NUM
    只使用ID NUM标识分区。
Message formatter options消息格式化器选项

除了默认的格式化器外,还有三种供选择:

  • kafka.tools.LoggingMessageFormatter 使用日志记录器输出消息,而不是标准的输出,消息在信息级别上打印,包括时间戳,key和value
  • kafka.tools.ChecksumMessageFormatter
    仅仅打印校验值
  • kafka.tools.NoOpMessageFormatter
    消耗但是不输出消息。
    kafka.tools.DefaultMessageFormatter 也能通过–property传递选项:
  • print.timestamp 设置为true则显示每个消息的时间戳。
  • print.key 设置为true,除显示value之外还显示key。
  • key.separator 指定打印消息key和value之间的分隔符。
  • line.separator 指定消息之间的分隔符。
  • key.deserializer 提供一个类名,打印前对key进行反序列化。
  • value.deserializer 提供一个类名,用于打印前反序列化消息value。
    反序列化器必须实现org.apache.kafka.common.serialization.Deserializer接口。控制台消费者将对他们调用tostring方法以获得要显示的输出。通常,你可以将这些反序列化器实现为java类,通过在执行kafka_console_consumer.sh之前设置的classpath环境变量,将其插入到控制台消费者的类路径中。
Consuming the offsets topics 指定offset消费topic

再某些时候,查看集群的消费者组提交了哪些offset是非常关键的。你可能想知道某个特定的组是否正在提交offset,或者offset提交的频率是多少。这可以通过使用控制台消费者对__consumer_offsets这个特殊的内部topic进行消费来实现。为了解码这个topic总的消息,必须使用格式化程序kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter。
样例如下:

# kafka-console-consumer.sh --zookeeper
zoo1.example.com:2181/kafka-cluster --topic __consumer_offsets
--formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessage
Formatter' --max-messages 1
[my-group-name,my-topic,0]::[OffsetMetadata[481690879,NO_METADATA]
,CommitTime 1479708539051,ExpirationTime 1480313339051]
Processed a total of 1 messages
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
Console Producer 控制台生产者

与控制台消费者程序类似,kakfa-console-producer.sh工具可以用于将消息写入该集群的kafka的topic中,默认情况下,每行读取一条消息,用tab分隔key和value。如果没有tab,则key为null。

  • Changing Line-Reading Behavior 你可以提供自己的类来按行读取,以便进行自定义操作。你创建的类必须继承kafka.common.MessageReader并将负责创建ProducerRecord.属性再命令行上指定类,行阅读器选项,并确保包含的类的JAR再类的路径中。

控制台生产者中要求必须提供两个参数,–broker-list 用于指定一个或者多个broker。通过hostname:port格式,进行提供。–topic 指定你需要写入的topic。当你写入完成之后,发送一个文件结束字符EOF来关闭客户端。
样例如下,给my-topic写入信息:

# kafka-console-producer.sh --broker-list
kafka1.example.com:9092,kafka2.example.com:9092 --topic my-topic
sample message 1
sample message 2
^D
#
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

与控制台生产者一样,你也可以将任何普通的生产者配置选项传递给控制台生产者。这可以通过两种方式来完成,具体取决于你需要传递选项的参数以及你喜欢的方式。第一种方法是通过–producer.config CONFIGFILE指定生产者配置文件。其中CONFIGFILE是包含配置选项文件的完整路径。另外一种方法是再命令行上用表单的一个或者多个参数进行指定。–producer-property KEY=VALUE。其中KEY是配置选项名,VALUE是要设置的值。这对于消费者批处理配置等非常有用。如linger.ms 和 batch.size。
控制台生产者提供了很大命令行参数来调整其行为,一些更有用的选项是:

  • –key-serializer CLASSNAME 指定消息key序列化的编码类,默认为kafka.serializer.DefaultEncoder。

  • –value-serializer CLASSNAME 指定消息value序列化编码类,默认为kafka.serializer.DefaultEncoder。

  • –compression-codec STRING
    指定再生成消息时使用的压缩类型,这可以是一个gzip,snappy或者lz4。默认是gzip。

  • –sync
    同步生成消息,再发送下一条消息之前等待每个消息的ack。

  • Creating a Custom Serializer 创建消息的自定义序列化器,必须继承kafka.serializer.Encoder 这可以额从标准输入获取字符串,并将他们转换为适合的topic。如Avro和Protobuf。

Line-Reader Options 行读取选项

kafka.tools.LineMessageReader 负责读取标准输入和创建生产者记录,也有几个有用的选项,可以通过–property传递到控制台生产者:

  • ignore.error 设置为false在解析时抛出异常,key设置为真,且不存在key分隔符,默认值为true。
  • parse.key
    设置为false总是将key设置为空,默认值为true。
    -key.separator
    指定读取时在消息key和消息value之间使用分隔字符,默认为tab。
    在生成消息的时候,LineMessageReader 将在key.separator的第一个实例上进行分隔输入,如果之后没有字符,消息的value将为空,如果行上没有key分隔符,或者parse.key为false的时候,key为空。

Client ACLs 客户端acls

命令行工具kafka-acls.sh提供了kafka客户端的访问控制与交互,Apache kafka网站提供了关于acl的安全性和其他的文档。

Unsafe Operations 不安全操作

有戏管理任务在技术上是可行的,但是除非在最极端的情况下,否则不应该尝试。通常是在诊断问题并没有其他选择的时候,或者是发现了需要临时解决的特定错误。这些任务通常没有文档记录,不受支持。并且会给应用程序带来一定的风险。
这里记录了其中一些较为常见的任务,以便在紧急情况下可以选择恢复。不建议在正常的集群中使用他们,应该在执行之前仔细考虑。

  • Danger: Here Be Dragons 本节的操作涉及直接使用存储在zookeeper中的元的集群数据,这可能是一个非常危险的操作,所以你必须非常消息,不要直接修改zookeeper中的信息,除非有其他的说明。
Moving the Cluster Controller 移动集群控制器

每个kafka集群都有一个控制器,它是一个在broker中运行的thread。控制器负责监督集群的操作,有时候需要强制将控制器移动到另外一个broker。一个这样的例子是当控制器遇到异常或者其他问题,使其无法运行而无法正常工作的时候,这些情况下移动控制器的风险并不高,但是这不是一项正常的任务,不应该定期执行。当前做为控制器的broker使用名为/controller的集群路径的顶层的zookeeper的节点注册。手动删除这个zookeeper节点将导致当前控制器退出,集群将选择一个新的控制器。

Killing a Partition Move 终止分区移动

分区重新分配的正常操作流程为:

  • 1.请求重新分配(创建zookeeper节点)。

  • 2.集群控制器向添加的新的broker添加分区。

  • 3.新的broker开始复制每个分区,知道它同步。

  • 4.集群控制器从分区复制列表中删除旧的broker。
    因为所有的重新分配都是在请求时并行重新启动,所有通常没有理由尝试取消正在进行的重新分配。一个例外是当broker在重新分配过程中失败而不能立即重新启动的时候。这将导致无法完成重新分配,从而排除启动任何额外的重新分配,例如从失败的broker中删除分区并将他们分配给其他的broker,在这种情况下,可能会使集群忘记所有的重新分配。
    要删除正在进行的分区重新分配:

  • 1.从zookeeper节点上kakfa集群的路径中删除/admin/reassign_partitions。

  • 2.强制控制器移动。请参阅前文移动集群控制器的详细过程。

  • Checking Replication Factors 检查副本因子
    在删除正在进行的分区移动的时候,任何尚未完成的分区都不会执行从复制列表中删除旧的broker的步骤。这意味着某些分区的复制因子可能大于预期。broker不允许对具有不一致副本因子的分区,(例如增加分区)的topi进行一些管理操作。建议检查仍在进行中的分区。并在重新分配的另外一个分区时确保它的复制因子是正确的。

Removing Topics to Be Deleted 删除topic

当使用命令行工具删除topic的时候,zookeeper节点请求创建删除操作。在正常情况下,集群会立即执行此操作,但是,命令行工具无法知道集群中是否启用了topic删除操作,因此,无论如何,它都会请求删除topic,如果禁用了删除,则会导致意外的结果,可能集群会将删除操作挂起,以避免这种情况。
通过在zookeeper上的/admin/delete_topic节点创建一个子节点来删除topic。该节点以topic命名。删除这些zookeeper节点,将删除挂起的请求。

Deleting Topics Manually 手动删除

如果你运行的集群禁用了删除topic。或者你发现自己需要删除正常的操作流程之外的一些topic,那么可以从集群中手动删除他们。但是这需要完全关闭集群中的所有borker,并且在集群中的任何broker都在运行时不能这样做。

  • Shut Down Brokers First 关闭首选broker
    当集群在线的时候,在zookeeper中修改集群的元数据是一项非常危险的操作,会使集群处于不稳定状态,当集群在线的时候,不要试图删除或者修改zookeeper的topic的元数据。

从集群中删除topic的过程:

  • 1.关闭集群中的所有broker。
  • 2.删除zookeeper中的/brokers/topics/TOPICNAME节点,注意,删除此节点之前必须首先删除子节点。
  • 3.从每个broker的日志牡蛎中删除分区目录。他们将被命名为TOPICNAME-NUM.其NUM是分区ID。
  • 4.重启所有的broker。

Summary 总结

运行kafka集群是一项艰巨的任务,需要进行大量的配置和维护工作,以保持系统在最高性能下能运行。在本章中,我们讨论了许多日常的任务,比如管理topic和经常需要处理的客户端配置。我们还介绍了调试问题所需要的一些更深的任务,比如检查日志段。最后,我们介绍了一些虽然不安全或者例行的操作,但是可以用于摆脱棘手的情况,总之,这些工具将帮助你管理kafka集群。
当然,如果没有适当的监控,管理集群是不可能的,第十章将讨论监控集群和集群的运行情况的操作方法,这样你可以七二班kafka工作良好,我们还将提供监控客户端的最佳办法,报告生产者和消费者。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/643149
推荐阅读
相关标签
  

闽ICP备14008679号