赞
踩
首先定义一个常量类Const
public interface Const {
/**
* kafka topic1.
*/
String KAFKA_SINGLE_MAIN_TOPIC = "service_main_topic";
/**
* kafka topic2.
*/
String KAFKA_SINGLE_DETAIL_TOPIC = "service_detail_topic";
}
这里定义了我们项目中需要的topic
创建一个Config类,把topic注入到spring中,spring去kafka中提前创建好topic
@Configuration public class KafkaInitialConfig { /*** * 创建TopicName为service_main_topic的Topic并设置分区数为8以及副本数为1 * 通过bean创建(bean的名字为initialTopic1) * @return */ @Bean public NewTopic initialTopic1() { return new NewTopic(Const.KAFKA_SINGLE_MAIN_TOPIC, 8, (short) 1); } /*** * 创建TopicName为service_detail_topic的Topic并设置分区数为10以及副本数为1 * 通过bean创建(bean的名字为initialTopic2) * @return */ @Bean public NewTopic initialTopic2() { return new NewTopic(Const.KAFKA_SINGLE_DETAIL_TOPIC, 10, (short) 1); } }
创建一个通用发送消息的Config
import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Slf4j @Component public class KafkaConfig { @Resource private KafkaTemplate<String, String> kafkaTemplate; /** * 发送消息到kafka */ public void sendMessage(String topic, String message) { log.info("kafka发送消息,topic =[{}],message=[{}]", topic, message); kafkaTemplate.send(topic, message); } /** * 发送消息到kafka * 分区发送 */ public void sendMessage(String topic, String partionKey, String message) { log.info("kafka发送消息,topic =[{}],partionKey=[{}],message=[{}]", topic, partionKey, message); kafkaTemplate.send(topic, partionKey, message); } }
然后建立了一个测试的controller,测试通不通
@RestController @RequestMapping("test") public class TestController { @Autowired KafkaConfig kafkaConfig; @RequestMapping(value = "test1", method = RequestMethod.GET) public void test1() { kafkaConfig.sendMessage(Const.KAFKA_SINGLE_MAIN_TOPIC, "测试Kafka1"); } @RequestMapping(value = "test2", method = RequestMethod.GET) public void test2() { kafkaConfig.sendMessage(Const.KAFKA_SINGLE_DETAIL_TOPIC, "123", "测试Kafka2"); } }
下面是发送和接收结果:
正常!结束。
还有配置文件配置放在这里
spring: kafka: bootstrap-servers: 127.0.0.1:9092 listener: concurrency: 4 missing-topics-fatal: false template: default-topic: service-main-topic producer: # 消息重发的次数 retries: 3 # 一次处理消息消息的大小batch,一批消息最大多大时发送 batch-size: 16384 # 生产者最大可发送的消息大小,内有多个batch,一旦满了,只有发送到kafka后才能空出位置,否则阻塞接受新消息 buffer-memory: 524288 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。