// (scraped-page artifacts preserved: 赞 [upvote] / 踩 [downvote] widgets — not part of the source)
import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import java.util.*; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartitionInfo; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; /** * @author: zhoumo * @data: 2024/6/24 16:37 * @descriptions: */ public class KafkaTopicInfo { final static String ip="127.0.0.1:9090"; public static void main(String[] args) { getListDetail(); } public static void createTopic(String topicName) throws ExecutionException, InterruptedException { // Kafka 配置 Properties props = new Properties(); // Kafka 服务器地址和端口 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip); // 创建 AdminClient 实例 try (AdminClient adminClient = AdminClient.create(props)) { // 创建一个新的主题 // 指定分区数量 // 指定复制因子 int numPartitions = 2; short replicationFactor = 1; NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor); // 创建主题 adminClient.createTopics(Collections.singletonList(newTopic)).all().get(); System.out.println("Topic created successfully: " + topicName); } catch (Exception e) { e.printStackTrace(); } } public static void deleteTopic(String topicName) { // Kafka 配置 Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip); try (AdminClient adminClient = AdminClient.create(props)) { // 要删除的主题名称 // String topicName = "myTopic"; // 删除主题 DeleteTopicsResult deleteResult = adminClient.deleteTopics(Collections.singletonList(topicName)); deleteResult.all().get(); System.out.println("Topic deleted successfully: " + topicName); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } public static void 
getList() { // Kafka 配置 Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip); try (AdminClient adminClient = AdminClient.create(props)) { // 列出所有主题 ListTopicsOptions options = new ListTopicsOptions(); // 是否包括内部主题,默认为 false options.listInternal(true); ListTopicsResult topicsResult = adminClient.listTopics(options); Set<String> topics = topicsResult.names().get(); System.out.println("Existing topics:"); for (String topic : topics) { System.out.println(topic); } } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } public static void getListDetail() { // Kafka 配置 Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip); try (AdminClient adminClient = AdminClient.create(props)) { // 列出所有主题 ListTopicsOptions options = new ListTopicsOptions(); // 是否包括内部主题,默认为 false options.listInternal(true); KafkaFuture<Set<String>> topics = adminClient.listTopics(options).names(); System.out.println("Existing topics:"); for (String topic : topics.get()) { System.out.println(topic); // 获取主题的详细信息(包括分区情况) /* 回退jdk1.8 版本 KafkaFuture<TopicDescription> topicDescription = adminClient.describeTopics(Set.of(topic)).values().get(topic); printTopicDetails(topicDescription.get()); */ Set<String> topicSet = new HashSet<>(); topicSet.add(topic); KafkaFuture<TopicDescription> topicDescriptionFuture = adminClient.describeTopics(topicSet).values().get(topic); TopicDescription topicDescription = topicDescriptionFuture.get(); printTopicDetails(topicDescription); } } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } private static void printTopicDetails(TopicDescription topicDescription) { System.out.println("Topic: " + topicDescription.name()); System.out.println("Partitions:"); for (TopicPartitionInfo partition : topicDescription.partitions()) { System.out.printf(" Partition %d, Leader: %d, Replicas: %s, Isrs: %s%n", partition.partition(), partition.leader().id(), 
partition.replicas(), partition.isr()); } System.out.println(); } }
// Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。 (scraped-page footer — not part of the source)