赞
踩
1.1 Linux云服务器上安装Zookeeper,Kafka.可以参照我的这两篇博客.Zookeeper,Kafka.跟着做,一遍过.
(注意)如果是开启了防火墙的,做一下端口映射,Kafka是9092端口,Zookeeper是2181端口.Linux云服务器也在做一下安全组配置,确保可以访问到.
1.2 SpringBoot框架的依赖.
①.SpringBoot框架的依赖:1.5.10.RELEASE.
②.Kafka依赖.
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>1.1.1.RELEASE</version>
- </dependency>
1.3 分别正确启动Zookeeper,Kafka.还是参照我上面那两篇博客.有非常详细的图文介绍的.
二:Kafka的配置.
2.1 application.properties的配置文件.
- # 配置kafka服务器
- spring.kafka.bootstrap-servers=Linux的IP:PORT
- # 配置生产者消息的key和value的编解码方式-producer
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.retries=0
- # 每次批量发送消息的数量
- spring.kafka.producer.batch-size=16384
- spring.kafka.producer.buffer-memory=33554432
- # 配置消费者消息的key和value的编解码方式-consumer
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.group-id=test-producer-topic
- spring.kafka.consumer.auto-offset-reset=earliest
- spring.kafka.consumer.enable-auto-commit=true
- spring.kafka.consumer.auto-commit-interval=1000
2.2 配置消息生产者.
KafkaProducer.key和value是String类型.
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- /**
- * author:
- * date: 2018-12-27
- * time: 17:34
- * description:Kafka的生产者的配置
- */
- @RestController
- @RequestMapping("/kafka")
- public class KafkaProducer {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
-
- @RequestMapping("/send")
- public String send(String message){
- // 指定消息的key
- kafkaTemplate.send("test-producer-topic","test-key", message);
- return "Producer Message Success!";
- }
- }
KafkaTemplate的位置.
SpringBoot对Kafka的自动配置.
KafkaProperties.
2.3 配置消息消费者.消息监听.
KafkaConsumer
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- /**
- * author:
- * date: 2018-12-27
- * time: 17:46
- * description:Kafka消费者
- */
- @Component
- public class KafkaConsumer {
- @KafkaListener(topics = "test-producer-topic")
- public void listen(ConsumerRecord<?, ?> record) throws Exception{
- System.out.printf("topic=%s,offset=%d,key=%s,value=%s,serializedKeySize=%s,serializedValueSize\n",record.topic(),record.offset(),record.key(),record.value(),record.serializedKeySize(),record.serializedValueSize());
- }
- }
输入监听生产者的消息的topic,offset,key,value,serializedKeySize的大小和serializedValueSize的大小.
启动项目:
访问 http://localhost:8090/dockerboot/kafka/send?message=KafkaMessage
观察控制台:
报错信息如下:2018-12-27 20:10:42.877 [kafka-producer-network-thread | producer-1] ERROR org.springframework.kafka.support.LoggingProducerListener 76 -| Exception thrown when sending a message with key='test-key' and payload='KafkaMessage' to topic test-producer-topic:
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for test-producer-topic-0
解决方法:关闭Kafka,修改Kafka目录下的config的server.properties文件.
重新启动Kafka.
再次访问项目.
控制台结果:
再次执行.消息的偏移量加1.不只是执行一次.数据的分段.消费者不停的poll topic.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。