赞
踩
使用kafka-topics.sh脚本:
主题中可以使用的参数定义:
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --create --topic topic_x --partitions 1 --replication-factor 1
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --list
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic topic_x
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --topics-with-overrides --describe
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --create --topic topic_test_01 --partitions 2 --replication-factor 1
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --alter --topic topic_test_01 --config max.message.bytes=1048576
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic topic_test_01
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --alter --topic topic_test_01 --config segment.bytes=10485760
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --alter --delete-config max.message.bytes --topic topic_test_01
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --delete --topic topic_test_02
[root@linux01 ~]# cd /var/lagou/kafka/kafka-logs/
如上,给主题添加删除的标记:
topic_test_02-0.5f680b9bc37a4b669b906009155d886a-delete
要过一段时间删除。
通过命令行工具操作,主题的分区只能增加,不能减少。否则报错:
通过–alter修改主题的分区数,增加分区。
kafka-topics.sh --zookeeper linux01/mykafka --alter --topic topic_test_01 --partitions 3
副本分配的三个目标:
在不考虑机架信息的情况下:
kafka-topics.sh --config xx=xx --config yy=yy
配置给主题的参数。
说明
除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala
编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient。
功能与原理介绍
Kafka官网:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects。
KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):
其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。
用到的参数:
主要操作步骤:
客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至Kafka Broker。
Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。 客户端接收相应的回执并进行解析处理。
和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。
综上,如果要自定义实现一个功能,只需要三个步骤:
具体实现:
pom.xml
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>compile</scope> </dependency> </dependencies>
实现类:MyAdminClient
package com.lagou.kafka; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Consumer; public class MyAdminClient { private KafkaAdminClient client; @Before public void before() { Map<String, Object> configs = new HashMap<>(); configs.put("bootstrap.servers", "linux01:9092"); configs.put("client.id", "admin_001"); client = (KafkaAdminClient) KafkaAdminClient.create(configs); } @After public void after() { // 关闭admin客户端 client.close(); } @Test public void testListTopics1() throws ExecutionException, InterruptedException { ListTopicsResult listTopicsResult = client.listTopics(); // 1.1 将请求变成同步的请求,直接获取结果====================================== KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings(); Collection<TopicListing> topicListings = listings.get(); topicListings.forEach(new Consumer<TopicListing>() { @Override public void accept(TopicListing topicListing) { // 该主题是否是内部主题 boolean internal = topicListing.isInternal(); // 打印主题名字 String name = topicListing.name(); // 直接打印全部信息 String s = topicListing.toString(); System.out.println(s + "\t" + name + "\t" + internal); } }); } }
异步方法列出主题
@Test public void testListTopics1() throws ExecutionException, InterruptedException { ListTopicsResult listTopicsResult = client.listTopics(); // 1.2, 异步方法列出主题====================================== KafkaFuture<Set<String>> names = listTopicsResult.names(); Set<String> strings = names.get(); strings.forEach(name -> { System.out.println(name); }); KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings(); Map<String, TopicListing> stringTopicListingMap = mapKafkaFuture.get(); stringTopicListingMap.forEach((k, v) -> { System.out.println(k + "\t" + v); }); }
设置请求属性
@Test public void testListTopics1() throws ExecutionException, InterruptedException { //另一种方法, 可以设置请求的属性 ListTopicsOptions options = new ListTopicsOptions(); // 列出内部主题 options.listInternal(true); // 设置超时时间 options.timeoutMs(900); ListTopicsResult listTopicsResult1 = client.listTopics(options); Map<String, TopicListing> stringTopicListingMap1 = listTopicsResult1.namesToListings().get(); stringTopicListingMap1.forEach((k, v) -> { System.out.println(k + "\t" + v); }); // 关闭管理客户端 client.close(); }
// 创建主题 @Test public void testCreateTopic() throws ExecutionException, InterruptedException { Map<String, String> configs = new HashMap<>(); configs.put("max.message.bytes", "1048576"); configs.put("segment.bytes", "1048576000"); NewTopic newTopic = new NewTopic("adm_tp_01", 2, (short) 1); newTopic.configs(configs); CreateTopicsResult topics = client.createTopics(Collections.singleton(newTopic)); KafkaFuture<Void> all = topics.all(); Void aVoid = all.get(); System.out.println(aVoid); }
@Test
public void testDeleteTopic() throws ExecutionException,
InterruptedException {
DeleteTopicsOptions options = new DeleteTopicsOptions();
options.timeoutMs(900);
DeleteTopicsResult deleteResult =
client.deleteTopics(Collections.singleton("adm_tp_01"), options);
deleteResult.all().get();
}
@Test
public void testAlterTopic() throws ExecutionException, InterruptedException {
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put("adm_tp_01", newPartitions);
CreatePartitionsOptions option = new CreatePartitionsOptions();
// Set to true if the request should be validated without creating new partitions.
// 如果只是验证,而不创建分区,则设置为true
// option.validateOnly(true);
CreatePartitionsResult partitionsResult = client.createPartitions(newPartitionsMap, option);
Void aVoid = partitionsResult.all().get();
}
NewPartitions.increaseTo(5) 代表创建5个分区
@Test public void testDescribeTopics() throws ExecutionException, InterruptedException { DescribeTopicsOptions options = new DescribeTopicsOptions(); options.timeoutMs(3000); DescribeTopicsResult topicsResult = client.describeTopics(Collections.singleton("adm_tp_01"), options); Map<String, TopicDescription> stringTopicDescriptionMap = topicsResult.all().get(); stringTopicDescriptionMap.forEach((k, v) -> { System.out.println(k + "\t" + v); System.out.println("======================================="); System.out.println(k); boolean internal = v.isInternal(); String name = v.name(); List<TopicPartitionInfo> partitions = v.partitions(); String partitionStr = Arrays.toString(partitions.toArray()); System.out.println("内部的?" + internal); System.out.println("topic name = " + name); System.out.println("分区:" + partitionStr); partitions.forEach(partition -> { System.out.println(partition); }); }); }
@Test public void testDescribeCluster() throws ExecutionException, InterruptedException { DescribeClusterResult describeClusterResult = client.describeCluster(); KafkaFuture<String> stringKafkaFuture = describeClusterResult.clusterId(); String s = stringKafkaFuture.get(); System.out.println("cluster name = " + s); KafkaFuture<Node> controller = describeClusterResult.controller(); Node node = controller.get(); System.out.println("集群控制器:" + node); Collection<Node> nodes = describeClusterResult.nodes().get(); nodes.forEach(node1 -> { System.out.println(node1); }); }
@Test public void testDescribeConfigs() throws ExecutionException, InterruptedException, TimeoutException { ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singleton(configResource)); Map<ConfigResource, Config> configMap = describeConfigsResult.all().get(15, TimeUnit.SECONDS); configMap.forEach(new BiConsumer<ConfigResource, Config>() { @Override public void accept(ConfigResource configResource, Config config) { ConfigResource.Type type = configResource.type(); String name = configResource.name(); System.out.println("资源名称:" + name); Collection<ConfigEntry> entries = config.entries(); entries.forEach(new Consumer<ConfigEntry>() { @Override public void accept(ConfigEntry configEntry) { boolean aDefault = configEntry.isDefault(); boolean readOnly = configEntry.isReadOnly(); boolean sensitive = configEntry.isSensitive(); String name1 = configEntry.name(); String value = configEntry.value(); System.out.println("是否默认:" + aDefault + "\t是否只读?" + readOnly + "\t是否敏感?" + sensitive + "\t" + name1 + " --> " + value); } }); ConfigEntry retries = config.get("retries"); if (retries != null) { System.out.println(retries.name() + " -->" + retries.value()); } else { System.out.println("没有这个属性"); } } }); }
@Test public void testAlterConfig() throws ExecutionException, InterruptedException { // 这里设置后,原来资源中不冲突的属性也会丢失,直接按照这里的配置设置 Map<ConfigResource, Config> configMap = new HashMap<>(); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "adm_tp_01"); Config config = new Config(Collections.singleton( new ConfigEntry("segment.bytes", "1048576000"))); configMap.put(resource, config); AlterConfigsResult alterConfigsResult = client.alterConfigs(configMap); Void aVoid = alterConfigsResult.all().get(); }
@Test public void testDescribeLogDirs() throws ExecutionException, InterruptedException { final DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0)); final Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap = describeLogDirsResult.all().get(); integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() { @Override // Integer表示broker编号, String表示logdirs路径, LogDirInfo表示信息 public void accept(Integer integer, Map<String , DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) { System.out.println("broker.id = " + integer); // log.dirs可以设置多个目录 stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() { @Override public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) { System.out.println("logdir = " + s); final Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfos = logDirInfo.replicaInfos; // 涉及主题分区, 以及副本信息 replicaInfos.forEach(new BiConsumer<TopicPartition , DescribeLogDirsResponse.ReplicaInfo>() { @Override public void accept(TopicPartition topicPartition , DescribeLogDirsResponse.ReplicaInfo replicaInfo) { System.out.println("主题分区:" + topicPartition.partition()); System.out.println("主题:" + topicPartition.topic()); // final boolean isFuture = replicaInfo.isFuture; // final long offsetLag = replicaInfo.offsetLag; // final long size = replicaInfo.size; } }); } }); } }); }
@Test public void testDescribeLogDirs2() throws ExecutionException, InterruptedException { DescribeLogDirsOptions option = new DescribeLogDirsOptions(); option.timeoutMs(1000); DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0), option); Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap = describeLogDirsResult.all().get(); integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() { @Override public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) { System.out.println("broker.id = " + integer); stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() { @Override public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) { System.out.println("log.dirs:" + s); /* //查看该broker上的主题 / 分区 / 偏移量等信息 logDirInfo.replicaInfos.forEach(new BiConsumer < TopicPartition, DescribeLogDirsResponse.ReplicaInfo > () { @Override public void accept (TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo){ int partition = topicPartition.partition(); String topic = topicPartition.topic(); boolean isFuture = replicaInfo.isFuture; long offsetLag = replicaInfo.offsetLag; long size = replicaInfo.size; System.out.println("partition:" + partition + "\ttopic:" + topic + "\tisFuture:" + isFuture + "\toffsetLag:" + offsetLag + "\tsize:" + size); } }); */ } }); } }); }
Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。
早期由zookeeper管理消费组的偏移量。
查询方法:
通过原生 kafka 提供的工具脚本进行查询。
工具脚本的位置与名称为 bin/kafka-consumer-groups.sh
首先运行脚本,查看帮助:
这里我们先编写一个生产者,消费者的例子:
我们先启动消费者,再启动生产者, 再通过 bin/kafka-consumer-groups.sh 进行消费偏移量查询,
由于kafka 消费者记录group的消费偏移量有两种方式 :
1)kafka 自维护 (新)
2)zookpeer 维护 (旧) ,已经逐渐被废弃
所以 ,脚本只查看由broker维护的,由zookeeper维护的可以将 --bootstrap-server 换成 – zookeeper 即可。
[root@linux01 ~]# kafka-consumer-groups.sh --bootstrap-server linux01:9092 --list
注意:
1). 这里面是没有指定 topic,查看的是所有topic消费者的 group.id 的列表。
2). 注意: 重名的 group.id 只会显示一次
2.查看指定group.id 的消费者消费情况
[root@linux01 ~]# kafka-consumer-groups.sh --bootstrap-server linux01:9092 --describe --group group
如果消费者停止,查看偏移量信息:
将偏移量设置为最早的:
将偏移量设置为最新的:
分别将指定主题的指定分区的偏移量向前移动10个消息:
代码:
KafkaProducerSingleton.java
package com.lagou.kafka.demo.producer; import org.apache.kafka.clients.producer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.Random; public class KafkaProducerSingleton { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class); private static KafkaProducer<String, String> kafkaProducer; private Random random = new Random(); private String topic; private int retry; private KafkaProducerSingleton() { } /** * 静态内部类 * * @author tanjie */ private static class LazyHandler { private static final KafkaProducerSingleton instance = new KafkaProducerSingleton(); } /** * 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例 * @return */ public static final KafkaProducerSingleton getInstance() { return LazyHandler.instance; } /** * kafka生产者进行初始化 * * @return KafkaProducer */ public void init(String topic, int retry) { this.topic = topic; this.retry = retry; if (null == kafkaProducer) { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty(ProducerConfig.ACKS_CONFIG, "1"); kafkaProducer = new KafkaProducer<String, String>(props); } } /** * 通过kafkaProducer发送消息 * @param message */ public void sendKafkaMessage(final String message) { ProducerRecord<String, String> record = new ProducerRecord<String, String>( topic, random.nextInt(3), "", message); kafkaProducer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (null != exception) { LOGGER.error("kafka发送消息失败:" + exception.getMessage(), exception); retryKakfaMessage(message); } } }); } /** * 当kafka消息发送失败后,重试 * * @param retryMessage */ private void retryKakfaMessage(final String retryMessage) { ProducerRecord<String, String> record = new ProducerRecord<String, String>( topic, random.nextInt(3), "", retryMessage); for (int i = 1; i <= retry; i++) { try { kafkaProducer.send(record); return; } catch (Exception e) { LOGGER.error("kafka发送消息失败:" + e.getMessage(), e); retryKakfaMessage(retryMessage); } } } /** * kafka实例销毁 */ public void close() { if (null != kafkaProducer) { kafkaProducer.close(); } } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getRetry() { return retry; } public void setRetry(int retry) { this.retry = retry; } }
ProducerHandler.java
package com.lagou.kafka.demo.producer; public class ProducerHandler implements Runnable { private String message; public ProducerHandler(String message) { this.message = message; } @Override public void run() { KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton.getInstance(); kafkaProducerSingleton.init("tp_demo_01", 3); int i = 0; while (true) { try { System.out.println("当前线程:" + Thread.currentThread().getName() + "\t获取的kafka实例:" + kafkaProducerSingleton); kafkaProducerSingleton.sendKafkaMessage("发送消息: " + message + " " + (++i)); Thread.sleep(100); } catch (Exception e) { } } } }
MyProducer.java
package com.lagou.kafka.demo.producer;
public class MyProducer {
public static void main(String[] args){
Thread thread = new Thread(new ProducerHandler("hello lagou "));
thread.start();
}
}
KafkaConsumerAuto.java
package com.lagou.kafka.demo.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class KafkaConsumerAuto { /** * kafka消费者不是线程安全的 */ private final KafkaConsumer<String, String> consumer; private ExecutorService executorService; public KafkaConsumerAuto() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); // 打开自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put("auto.commit.interval.ms", "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); // 订阅主题 consumer.subscribe(Collections.singleton("tp_demo_01")); } public void execute() throws InterruptedException { executorService = Executors.newFixedThreadPool(2); while (true) { ConsumerRecords<String, String> records = consumer.poll(2_000); if (null != records) { executorService.submit(new ConsumerThreadAuto(records, consumer)); } Thread.sleep(1000); } } public void shutdown() { try { if (consumer != null) { consumer.close(); } if (executorService != null) { executorService.shutdown(); } if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { System.out.println("关闭线程池超时。。。"); } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } }
ConsumerThreadAuto.java
package com.lagou.kafka.demo.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class ConsumerThreadAuto implements Runnable { private ConsumerRecords<String, String> records; private KafkaConsumer<String, String> consumer; public ConsumerThreadAuto(ConsumerRecords<String, String> records, KafkaConsumer<String, String> consumer) { this.records = records; this.consumer = consumer; } @Override public void run() { for(ConsumerRecord<String,String> record : records){ System.out.println("当前线程:" + Thread.currentThread() + "\t主题:" + record.topic() + "\t偏移量:" + record.offset() + "\t分区:" + record.partition() + "\t获取的消息:" + record.value()); } } }
ConsumerAutoMain.java
package com.lagou.kafka.demo.consumer;
public class ConsumerAutoMain {
public static void main(String[] args) {
KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
try {
kafka_consumerAuto.execute();
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
kafka_consumerAuto.shutdown();
}
}
}
宕机如何恢复
(1)少部分副本宕机
当leader宕机了,会从follower选择一个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader里pull数据。
(2)全部副本宕机
当全部副本宕机了有两种恢复方式
1、等待ISR中的一个恢复后,并选它作为leader。(等待时间较长,降低可用性)
2、选择第一个恢复的副本作为新的leader,无论是否在ISR中。(并未包含之前leader commit的数据,因此造成数据丢失)
下图中
分区P1的Leader是0,ISR是0和1
分区P2的Leader是2,ISR是1和2
分区P3的Leader是1,ISR是0,1,2。
生产者和消费者的请求都由Leader副本来处理。Follower副本只负责消费Leader副本的数据和Leader保持同步。
对于P1,如果0宕机会发生什么?
Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发生故障的时候,就需要进行分区的Leader副本和Follower副本之间的切换,需要选举Leader副本。
如何选举?
如果某个分区所在的服务器除了问题,不可用,kafka会从该分区的其他的副本中选择一个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。
只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。
只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。
如果这个集合有增减,kafka会更新zookeeper上的记录。
如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。
显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。
假设某个topic有N+1个副本,kafka可以容忍N个服务器不可用。
总结:
Kafka中Leader分区选举,通过维护一个动态变化的ISR集合来实现,一旦Leader分区丢掉,则从ISR中随机挑选一个副本做新的Leader分区。
如果ISR中的副本都丢失了,则:
向已经部署好的Kafka集群里面添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置文件,然后把里面的broker id修改成全局唯一的,最后启动这个节点即可将它加入到现有Kafka集群中。
问题:新添加的Kafka节点并不会自动地分配数据,无法分担集群的负载,除非我们新建一个topic。
需要手动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的工具来重新分布某个topic的分区。
在重新分布topic分区之前,我们先来看看现在topic的各个分区的分布位置:
kafka-topics.sh --zookeeper linux01:2181/mykafka --create --topic tp_re_01 --partitions 5 --replication-factor 1
kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_re_01
拷贝JDK并安装
#JDK
export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_231
export PATH=:$JAVA_HOME/bin:$PATH
#kafka
export KAFKA_HOME=/opt/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin
此处不需要zookeeper,切记!!!
让配置生效:
source /etc/profile
拷贝linux01上安装的Kafka 到linux02
cd /opt
scp -r kafka_2.12-1.0.2/ linux02:$PWD
修改配置
vim cd /opt/kafka_2.12-1.0.2/config/
#修改以下两处配置,注意broker.id不能与其他节点重复
启动linux02上Kafka:
kafka-server-start.sh -daemon /opt/kafka_2.12-1.0.2/config/server.properties
此时查看linux02 节点启动的时候的ClusterId,看和zookeeper节点上的ClusterId是否一致,如果是,证明linux02和linux01在同一个集群中。 linux02启动的Cluster ID:
linux01 zookeeper节点上的Cluster ID:
在linux01上查看zookeeper的节点信息:
get /mykafka/brokers/ids/1
linux02的节点已经加入集群了。
现在我们在现有集群的基础上再添加一个Kafka节点,然后使用Kafka自带的 kafka- reassign-partitions.sh 工具来重新分布分区。该工具有三种使用模式:
1、generate模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行)
2、execute模式,根据指定的reassign plan重新分配Partition
3、verify模式,验证重新分配Partition是否成功
plan,不过我们先得按照要求定义一个文件,里面说明哪些topic需要重新分区,文件内容如下:
[root@linux01 ~]# vim topics-to-move.json
#添加如下json内容:
{
"topics": [
{
"topic": "tp_re_01"
}
],
"version": 1
}
然后使用 kafka-reassign-partitions.sh 工具生成reassign plan
[root@linux01 ~]# kafka-reassign-partitions.sh --zookeeper linux01:2181/mykafka --topics-to-move-json-file topics-to-move.json --broker-list "0,1" --generate #结果如下 Current partition replica assignment {"version":1,"partitions":[ {"topic":"tp_re_01","partition":4,"replicas":[1],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":1,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":2,"replicas":[1],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":3,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":0,"replicas":[1],"log_dirs":["any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[ {"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":1,"replicas":[1],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":3,"replicas":[1],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}
Proposed partition reassignment configuration下面生成的就是将分区重新分布到broker 1上的结果。我们将这些内容保存到名为result.json文件里面(文件名不重要,文件格式也不一定要以json为
结尾,只要保证内容是json即可),然后执行这些reassign plan:
[root@linux01 ~]# vim topics-to-execute.json
#添加分布json
{"version":1,"partitions":[
{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]},
{"topic":"tp_re_01","partition":1,"replicas":[1],"log_dirs":["any"]},
{"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]},
{"topic":"tp_re_01","partition":3,"replicas":[1],"log_dirs":["any"]},
{"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}
执行计划:
[root@linux01 ~]# kafka-reassign-partitions.sh --zookeeper linux01:2181/mykafka --reassignment-json-file topics-to-execute.json --execute
#显示结果
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":[1],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
这样Kafka就在执行reassign plan,我们可以校验reassign plan是否执行完成:
[root@linux01 ~]# kafka-reassign-partitions.sh --zookeeper linux01:2181/mykafka --reassignment-json-file topics-to-execute.json --verify
#显示结果
[2021-03-18 14:37:30,087] WARN Client session timed out, have not heard from server in 15014ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
Status of partition reassignment:
Reassignment of partition tp_re_01-1 completed successfully
Reassignment of partition tp_re_01-4 completed successfully
Reassignment of partition tp_re_01-2 completed successfully
Reassignment of partition tp_re_01-3 completed successfully
Reassignment of partition tp_re_01-0 completed successfully
查看主题的细节:
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_re_01
#显示结果
Topic:tp_re_01 PartitionCount:5 ReplicationFactor:1 Configs:
Topic: tp_re_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: tp_re_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_01 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: tp_re_01 Partition: 4 Leader: 0 Replicas: 0 Isr: 0
分区的分布的确和操作之前不一样了,broker 1上已经有分区分布上去了。使用 kafka-reassign- partitions.sh 工具生成的reassign plan只是一个建议,方便大家而已。其实我们自己完全可以编辑一个reassign plan,然后执行它,如下:
[root@linux01 ~]# vim my-topics-to-execute.json
#添加如下json内容
{ "version": 1, "partitions": [ { "topic": "tp_re_01", "partition": 4, "replicas": [ 1 ], "log_dirs": [ "any" ] }, { "topic": "tp_re_01", "partition": 1, "replicas": [ 0 ], "log_dirs": [ "any" ] }, { "topic": "tp_re_01", "partition": 2, "replicas": [ 0 ], "log_dirs": [ "any" ] }, { "topic": "tp_re_01", "partition": 3, "replicas": [ 1 ], "log_dirs": [ "any" ] }, { "topic": "tp_re_01", "partition": 0, "replicas": [ 0 ], "log_dirs": [ "any" ] } ] }
将上面的json数据文件保存到my-topics-to-execute.json文件中,然后也是执行它:
[root@linux01 ~]# kafka-reassign-partitions.sh --zookeeper linux01:2181/mykafka --reassignment-json-file my-topics-to-execute.json --execute #显示结果 [2021-03-18 14:47:48,299] WARN Client session timed out, have not heard from server in 15006ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn) Current partition replica assignment {"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions. [root@linux01 ~]# kafka-reassign-partitions.sh --zookeeper linux01:2181/mykafka --reassignment-json-file my-topics-to-execute.json --verify #显示结果 Status of partition reassignment: Reassignment of partition tp_re_01-1 completed successfully Reassignment of partition tp_re_01-4 completed successfully Reassignment of partition tp_re_01-2 completed successfully Reassignment of partition tp_re_01-3 completed successfully Reassignment of partition tp_re_01-0 completed successfully
等这个reassign plan执行完,我们再来看看分区的分布:
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_re_01
#显示结果
分区重新分配完成
我们可以在新建主题的时候,手动指定主题各个Leader分区以及Follower分区的分配情况,即什么分区副本在哪个broker节点上。
随着系统的运行,broker的宕机重启,会引发Leader分区和Follower分区的角色转换,最后可能Leader大部分都集中在少数几台broker上,由于Leader负责客户端的读写操作,此时集中Leader分区的少数几台服务器的网络I/O,CPU,以及内存都会很紧张。
Leader和Follower的角色转换会引起Leader副本在集群中分布的不均衡,此时我们需要一种手段,让Leader的分布重新恢复到一个均衡的状态。
执行脚本:
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --create --topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"
上述脚本执行的结果是:创建了主题tp_demo_03,有三个分区,每个分区两个副本,Leader副本在列表中第一个指定的brokerId上,Follower副本在随后指定的brokerId上。
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_demo_03
然后模拟broker0宕机的情况:
# 通过jps找到Kafka进程PID [root@node1 ~]# jps 54912 Jps 1699 QuorumPeerMain 1965 Kafka # 直接杀死进程 [root@node1 ~]# kill -9 1965 [root@node1 ~]# jps 1699 QuorumPeerMain 54936 Jps [root@node1 ~]# # 查看主题分区信息: [root@node1 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_demo_03 Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs: Topic: tp_demo_03 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1 Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1 Topic: tp_demo_03 Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1 [root@node1 ~]# # 重新启动node1上的Kafka [root@node1 ~]# kafka-server-start.sh -daemon /opt/kafka_2.12-1.0.2/config/server.properties [root@node1 ~]# jps 1699 QuorumPeerMain 55525 Kafka 55557 Jps [root@node1 ~]# # 查看主题的分区信息: [root@node1 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_demo_03 Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs: Topic: tp_demo_03 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0 Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: tp_demo_03 Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1,0 [root@node1 ~]# # broker恢复了,但是Leader的分配并没有变化,还是处于Leader切换后的分配情况。
是否有一种方式,可以让Kafka自动帮我们进行修改?改为初始的副本分配?
此时,用到了Kafka提供的自动再均衡脚本: kafka-preferred-replica-election.sh
先看介绍:
该工具会让每个分区的Leader副本分配在合适的位置,让Leader分区和Follower分区在服务器之间均衡分配。
如果该脚本仅指定zookeeper地址,则会对集群中所有的主题进行操作,自动再平衡。
{ "partitions": [ { "topic": "tp_demo_03", "partition": 0 }, { "topic": "tp_demo_03", "partition": 1 }, { "topic": "tp_demo_03", "partition": 2 } ] }
[root@linux01 ~]# kafka-preferred-replica-election.sh --zookeeper linux01:2181/mykafka --path-to-json-file preferred-replica.json
#显示结果
[2021-03-18 15:21:54,690] WARN Client session timed out, have not heard from server in 15010ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
Created preferred replica election path with {"version":1,"partitions":[
{"topic":"tp_demo_03","partition":0},
{"topic":"tp_demo_03","partition":1},
{"topic":"tp_demo_03","partition":2}]}
Successfully started preferred replica election for partitions Set(tp_demo_03-0, tp_demo_03-1, tp_demo_03-2)
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_demo_03
#显示结果
恢复到最初的分配情况。
之所以是这样的分配,是因为我们在创建主题的时候:
--replica-assignment "0:1,1:0,0:1"
在逗号分割的每个数值对中排在前面的是Leader分区,后面的是副本分区。那么所谓的preferredreplica,就是排在前面的数字就是Leader副本应该在的brokerId。
实际项目中,我们可能由于主题的副本因子设置的问题,需要重新设置副本因子或者由于集群的扩展,需要重新设置副本因子。
topic一旦使用又不能轻易删除重建,因此动态增加副本因子就成为最终的选择。
说明:kafka 1.0版本配置文件默认没有default.replication.factor=x, 因此如果创建topic时,不指定–replication-factor 想, 默认副本因子为1. 我们可以在自己的server.properties中配置上常用的副本因子,省去手动调整。例如设置default.replication.factor=3, 详细内容可参考官方文档https://kafka.apache.org/documentation/#replication
原因分析:
假设我们有2个kafka broker分别broker0,broker1。
1). 创建主题:
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --create --topic tp_re_02 --partitions 3 --replication-factor 1
2)查看主题细节:
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_re_02
3). 修改副本因子:错误
4)使用 kafka-reassign-partitions.sh 修改副本因子:
-. 创建increment-replication-factor.json
{ "version": 1, "partitions": [ { "topic": "tp_re_02", "partition": 0, "replicas": [ 0, 1 ] }, { "topic": "tp_re_02", "partition": 1, "replicas": [ 0, 1 ] }, { "topic": "tp_re_02", "partition": 2, "replicas": [ 1, 0 ] } ] }
5)执行分配:
[root@linux01 ~]# kafka-reassign-partitions.sh --zookeeper linux01:2181/mykafka --reassignment-json-file increment-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[
{"topic":"tp_re_02","partition":2,"replicas":[1],"log_dirs":["any"]},
{"topic":"tp_re_02","partition":1,"replicas":[0],"log_dirs":["any"]},
{"topic":"tp_re_02","partition":0,"replicas":[1],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
6)查看主题细节:
[root@linux01 ~]# kafka-topics.sh --zookeeper linux01:2181/mykafka --describe --topic tp_re_02
在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只能被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。Kafka中提供了多重分区分配算法(PartitionAssignor)的实现:RangeAssignor、RoundRobinAssignor、StickyAssignor。
PartitionAssignor接口用于用户定义实现分区分配算法,以实现Consumer之间的分区分配。
消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者选择其中的一个消费者来执行这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。
Kafka默认采用RangeAssignor的分配算法。
RangeAssignor对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费
者。这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。
大致算法:
RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,比如上图中4个分区3个消费者的场景,C0会多分配一个分区。如果此时再订阅一个分区数为4的Topic,那么C0又会比C1、C2多分配一个分区,这样C0总共就比C1、C2多分配两个分区了,而且随着Topic的增加,这个情况会越来越严重。
字典序靠前的消费组中的消费者比较“贪婪”。
RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。
相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)。
对于消费组内消费者订阅Topic不一致的情况:假设有两个个消费者分别为C0和C1,有2个TopicT1、T2,分别拥有3和2个分区,并且C0订阅T1和T2,C1订阅T2,那么RoundRobinAssignor的分配结果如下:
看上去分配已经尽量的保证均衡了,不过可以发现C0承担了4个分区的消费而C1订阅了T2一个分区,是不是把T2P0交给C1消费能更加的均衡呢?
如果消费者1宕机,则按照RoundRobin的方式分配结果如下:
打乱从新来过,轮询分配:
按照Sticky的方式:
仅对消费者1分配的分区进行重分配,红线部分。最终达到均衡的目的
再举一个例子:
有3个Consumer:C0、C1、C2
3个Topic:T0、T1、T2,它们分别有1、2、3个分区
C0订阅T0;C1订阅T0、T1;C2订阅T0、T1、T2
分配结果如下图所示:
消费者0下线,则按照轮询的方式分配:
按照Sticky方式分配分区,仅仅需要动的就是红线部分,其他部分不动。
自定义的分配策略必须要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。PartitionAssignor接口的定义如下:
Subscription subscription(Set<String> topics); String name(); // 主要实现的是下面这个方法 Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); void onAssignment(Assignment assignment); class Subscription { private final List<String> topics; private final ByteBuffer userData; } class Assignment { private final List<TopicPartition> partitions; private final ByteBuffer userData; }
PartitionAssignor接口中定义了两个内部类:Subscription和Assignment。
Subscription类用来表示消费者的订阅信息,类中有两个属性:topics和userData,分别表示消费者所订阅topic列表和用户自定义信息。PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意到此方法中只有一个参数topics,与Subscription类中的topics的相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、ip地址、host或者机架(rack)等等。
再来说一下Assignment类,它是用来表示分配结果信息的,类中也有两个属性:partitions和userData,分别表示所分配到的分区集合和用户自定义的数据。可以通过PartitionAssignor接口中的onAssignment()方法是在每个消费者收到消费组leader分配结果时的回调函数,例如在StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备在下次消费组再平衡(rebalance)时可以提供分配参考依据。
接口中的name()方法用来提供分配策略的名称,对于Kafka提供的3种分配策略而言,RangeAssignor对应的protocol_name为“range”,RoundRobinAssignor对应的protocol_name为“roundrobin”,StickyAssignor对应的protocol_name为“sticky”,所以自定义的分配策略中要注意命名的时候不要与已存在的分配策略发生冲突。这个命名用来标识分配策略的名称,在后面所描述的加入消费组以及选举消费组leader的时候会有涉及。
真正的分区分配方案的实现是在assign()方法中,方法中的参数metadata表示集群的元数据信息,而subscriptions表示消费组内各个消费者成员的订阅信息,最终方法返回各个消费者的分配信息。
Kafka中还提供了一个抽象类
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化PartitionAssignor接口的实现,对assign()方法进行了实现,其中会将Subscription中的userData信息去掉后,在进行分配。Kafka提供的3种分配策略都是继承自这个抽象类。如果开发人员在自定义分区分配策略时需要使用userData信息来控制分区分配的结果,那么就不能直接继承AbstractPartitionAssignor这个抽象类,而需要直接实现PartitionAssignor接口。
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class MyAssignor extends AbstractPartitionAssignor {
}
在使用时,消费者客户端需要添加相应的Properties参数,示例如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。