赞
踩
在Kafka系统中有两种方式来创建主题,自动创建和手动创建。
可以通过auto.create.topics.enable
属性来自动创建主题。默认情况下,该属性值为true。声场这应用程序向Kafka急群众一个不存在的主题写数据时,会自动创建一个默认分区和默认副本系数的主题。
$KAFKA_HOME/config/server.properties
文件中的属性num.partitions
控制。$KAFKA_HOME/config/server.properties
文件中的属性default.replication.factor
控制。可以通过kafka-topics.sh
脚本手动创建主题。
创建主题
kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic kafka-action
,
分割。创建主题时,可以通过config参数来设置主题级别的配置覆盖默认配置,可以设置多组配置。#
# 设置该主题的max.message.bytes为404800字节
kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --config max.message.bytes=404800 --topic kafka-test
Kafka系统中的kafka-topics.sh
脚本提供连个查看主题信息的命令。
kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181
当执行describe命令时,指定topic参数则查看特定主题信息,若不指定topic参数则查看所有主题信息。
kafka-topics.sh --describe --zookeeper zoo1:2181,zoo2:2181,zoo3:2181
通过describe
与under-replicated-partitions
命令组合受用,可以查看处于under replicated
状态的分区。处于该状态的主题可能正在进行同步操作,也可能同步发生异常,所有查询到的主题分区的ISR列表长度小于AR列表长度。
kafka-topics.sh --describe --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --under-replicated-partitions
通过describe
与unavailable-partitions
命令组合使用,可以查看没有Leader副本的主题。
kafka-topics.sh --describe --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --unavailable-partitions
通过describe
与topics-with-overrides
命令组合使用,可以查看主题覆盖了哪些配置。组合使用与只有describe命令的区别在于:topic-with-overrides
命令只显示descibe命令执行的第一行信息。
kafka-topics.sh --describe --zookeeper zoo1:2181,zoo2:2181,zoo3:2181--topics-with-overrides
当创建一个主题后,可以通过alter命令对主题进行修改,包括主题级别的配置、增加主题分区、修改副本分配方案、修改主题Offset。
在创建主题时,可以通过config参数覆盖主题级别的默认配置,当主题创建后可以通过alter与config参数组合使用,修改或增加新的配置可以覆盖响应配置原来的值,或者通过alter与delete-config参数组合使用删除响应配置设置使其恢复默认值。
#修改配置
kafka-topics.sh --alter --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --topic config-test --config max.message.bytes=204800
#删除配置
kafka-topics.sh --alter --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --topic config-test --delete-config max.message.bytes
Kafka不支持减少分区的操作,只能为一个主题增加分区。
kafka-topics.sh --alter --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --topic config-test --partitions 5
Kafka系统中,删除主题有两种方式。
delete.topic.enable=true
,该配置默认为false。否则执行该脚本并未真正删除主题,而是在Zookeeper的/admin/delete_topics目录下创建一个与待删除主题同名的节点,将该主题标记为删除状态。kafka-topics.sh --delete --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --topic kafka-action
Kafka自带了一个终端延时生产者发布消息的脚本kafka-console-producer.sh
。
Kafka自带了一个kafka-console-producer.sh脚本,可以执行该脚本可以在终端调用Kafka生产者想Kafka发送消息。该脚本鱼腥时需要指定broker-list和topic两个参数,分别用来指定Kafka的代理地址列表以及消息被发送的目标主题。
kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic kafka-action2 --property parse.key=true --property key.separator=' '
Kafka的消费者以Pull的方式获取信息,同事Kafka采用了消费组的模式,每个消费者都属于某一个消费组。创建消费者时,不指定消费者的groupId,则消费者属于默认消费组,消费组是一个全局概念,在设置group.id时,要确保该值在kafka急群众唯一。
同一个消费组下的各消费者在消费消息时是互斥的,对于一条消息而已,只能被同组的某一个消费者消费,但不同消费组的消费者能消费同一条消息。
kafka提供了一个kafka-console-consumer.sh脚本方便用户在终端模拟消费者消费消息。
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic kafka-action2
kafka-console-consumer.sh的tipic参数不支持同事指定多个主题,提供了一个whitelist参数,该参数可以同事指定多个主题,且支持正则表达式。
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --consumer-property group.id=new-consumer-test --consumer-property client.id=new-consumer-cl --whitelist "kafka-action|kafka-action-new"
每个消息都属于一个特定的消费组,通过消费组就可以实现消息的单播与广播。
一条消息只能被一个消费者消费的模式。要实现消息单播,只要让消费者属于同一个消费组即可。
启动一个生产者
kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic kafka-action
启动两个相同组下的消费者
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic kafka-action --consumer-property group.id=single-consumer-group
一条消息能够被多个消费者消费的模式。
启动生产者
kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic kafka-action
启动两个不同组下的消费者
#消费者1
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic kafka-action --consumer-property group.id=single-consumer-group
#消费者2
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic kafka-action --consumer-property group.id=multi-consumer-group
当创建一个主题时,分区及副本会被均匀地分配到kafka集群响应节点上,这样Leader副本也在急群众均匀分布。当一个主题创建后Leader副本会作为分区的Leader副本,Leader负责所有读写操作。但随着运行时间推移,当Leader节点发生故障,就会从Follower节点中选出一个新的Leader,这样就有可能导致集群的负载不均衡,从而影响整个集群的健壮性和稳定性,当原Leader节点恢复后再次加入到集群时也不会主动成为Leader副本。Kafka提供了两种方法重新选择有限副本作为分区Leader的方法,使集群负载重新达到平衡。
自动平衡
在代理节点启动时,设置auto.leader.rebalance.enable=true,默认为true。
当该配置为true时,控制器在故障转移操作时会启动一个定时任务,每隔 l e a d e r . i m n b a l a n c e . c h e c k . i n t e r v a l . s e c o n d s leader.imnbalance.check.interval.seconds leader.imnbalance.check.interval.seconds 秒除法一次分区均衡操作默认5分钟,而只有在代理不均衡的百分比达到 l e a d e r . i m n b a l a n c e . c h e c k . b r o k e r . p r e c e n t a g e leader.imnbalance.check.broker.precentage leader.imnbalance.check.broker.precentage以上时才会整整执行分区重新分配操作,默认值为10%。
当该配置为false,当某个节点在失效前是某个分区的Leader副本,该节点恢复后他也只是一个Follower副本。
手动平衡
kafka提供一个队分区Leader进行重新平衡的工具脚本kafka-preferred-replica-election.sh,通过该工具将Follower副本选举为Leader,从而重新让集群分区达到平衡。
第一种方法Kafka自动触发,但存在一定时间延迟,第二种方法需要手动执行,同事提供耕细粒度分区均衡操作,支持以JJSON字符串形式指定需要触发平衡操作的分区列表,不指定分区,则会尝试对所有分区执行将Follower副本选举为Leader副本。
当下线一个节点钱,需要将该节点上的分区副本迁移到其他可用的节点上,Kafka并不会自动进行分区副本迁移,若不进行手动重新分配,就会导致某些主题数据丢失和不可用的情况。当新增节点时,只有新创建的主题才会分配到新的节点上,而之前主题的分区并不会自动分配到新加入的节点上,因此在主题创建是,该主题的AR列表中并没有新加入的节点。
生成分区分配方案
收钱创建一个文件,该文件以JSON字符串格式指定要进行分区重分配的主题。例如,在$KAFKA_HOME/config目录下创建一个名为topics-tomove.json的文件,该文件内容{"topics":[{"topic":"主题名称"}],"version":1}
,若要对多个主题分区重新分配,则意JSON搁置指定多组topic,version固定值。
执行kafka-reassign-partitions.sh脚本
kafka-reassign-partitions.sh --zookeeper zoo1:2181,zoo3:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,3" --generate
执行分区迁移
kafka-reassign-partitions.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --reassignment-json-file partitions-reassignment.json --execute
分区迁移的基本原来是在目标节点上创建分区目录,然后复制缘分去数据到目标节点,最后删除原节点的数据,在迁移过程中要确保目标节点有足够空间。
查看分区迁移进度
kafka-reassign-partitions.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --reassignment-json-file partitions-reassignment.json --veriry
若分区正在前一种则煮给你太为in grogress。分区迁移一旦开始无法停止,更不要强行停止集群,否则会造成数据不一致。
分区迁移时复制限流有两种方法
动态配置限流
使用kafka-verifiable-producer.sh生成10W测试数据
kafka-verifiable-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic reassign-partitions --max-messages 100000
设置brokerId为2复制速率为1KB/S
kafka-configs.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --entity-type brokers --entity-name 2 --alter --add-config follower.replication.throttled.rate=100,leader.replication.throttled.rate=1024
throttle设置限流
kafka-reassign-partitions.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --reassignment-json-file partitions-reassignment.json --execute --throttle 1024
kafka自带kafka-topics.sh脚本可以对主题的分区数进行修改。
kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 1 --partitions 3 --topic partition-replica-foo
replica-extens.json文件内容
{ "version":1, "partitions":[ { "topic":"partition-replica-foo", "partition":0, "replicas":[ 1, 2 ] }, { "topic":"partition-replica-foo", "partition":1, "replicas":[ 2, 3 ] }, { "topic":"partition-replica-foo", "partition":2, "replicas":[ 3, 1 ] }, { "topic":"partition-replica-foo", "partition":3, "replicas":[ 1, 2 ] }, { "topic":"partition-replica-foo", "partition":4, "replicas":[ 2, 3 ] } ] }
执行分区副本重新分配命令
kafka-reassign-partitions.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --reassignment-json-file replica-extends.json --execute
kafka自带了对连接器应用的脚本,用于将数据从外部系统导入到kafka或从kafka中导出到外部系统。kafka连接器有独立模式和分布式模式两种工作模式。
kafka自带connect-standalone.sh用于以独立模式启动kafka连接器。
Source连接器
Source连接器用于将外部数据导入到kafka相应主题中。kafka自带的connect-file-source.properties文件配置了一个读取文件的Source连接器。
connect-standalone.sh connect-standalone.properties connect-file-source.properties
Sink连接器
kafka自带connect-console.sink.properties配置一个将kafka中的数据导出到文件的sink连接器。
connect-standalone.sh connect-standalone.properties connect-file-sink.properties
kafka自带connect-distributed.sh脚本用于以分布式模式运行连接器。
connect-distributed.sh connect-distributed.properties
kafka提供了一个镜像操作的工具kafka-mirror-maker.sh,用于将一个集群的数据同步到另一个集群。
kafka-mirror-maker.sh --consumer.config mirror-consumer.properties --producer.config mirror-producer.properties --whitelist kafka-action
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。