赞
踩
我们在开发过程中使用Kafka会遇到topic太多,自己创建太费劲,所以想一次配置终身使用,自己去创建topic,和flyway一样自己去创建表,今天总结一下配置方式。
kafka:
# 自动创建topic
topics:
# topic
- name: import_vulnera_topic
#分区数
num-partitions: 6
#副本数
replication-factor: 2
# topic
- name: import_vulnerability_topic
num-partitions: 6
replication-factor: 2
# topic
- name: import_vulnerability_result_topic
num-partitions: 6
replication-factor: 2
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.support.GenericWebApplicationContext;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* 自动创建topic
*/
@Configuration
@SuppressWarnings("all")
public class TopicAdministrator {
private final TopicConfigurations configurations;
private final GenericWebApplicationContext context;
public TopicAdministrator(TopicConfigurations configurations, GenericWebApplicationContext genericContext) {
this.configurations = configurations;
this.context = genericContext;
}
@PostConstruct
public void init() {
initializeBeans(configurations.getTopics());
}
private void initializeBeans(List<TopicConfigurations.Topic> topics) {
topics.forEach(t -> context.registerBean(t.name, NewTopic.class, t::toNewTopic));
}
}
import lombok.Data;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Data
public class TopicConfigurations {
private List<Topic> topics;
@Data
static class Topic {
String name;
Integer numPartitions = 3;
Short replicationFactor = 1;
NewTopic toNewTopic() {
return new NewTopic(this.name, this.numPartitions, this.replicationFactor);
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。