
Integrating Kafka with Spring Boot: the kafka-producer-network-thread timeout error

Part 1: Environment Setup

    1.1 Install Zookeeper and Kafka on your Linux cloud server. You can follow my two earlier posts on Zookeeper and Kafka; work through them in order and the setup should go smoothly.

    (Note) If a firewall is enabled, open the necessary ports: Kafka listens on 9092 and Zookeeper on 2181. On a cloud server, also configure the security group so both ports are reachable from outside.

    1.2 Spring Boot dependencies.

         ① Spring Boot version: 1.5.10.RELEASE.

         ② The Kafka dependency:

  <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.1.1.RELEASE</version>
  </dependency>

   1.3 Start Zookeeper first, then Kafka. Again, the two posts linked above contain detailed, illustrated instructions.

  Part 2: Configuring Kafka

   2.1 The application.properties configuration file.

  # Kafka broker address (replace with your Linux server's IP and port)
  spring.kafka.bootstrap-servers=Linux-IP:PORT
  # Producer key/value serializers
  spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  spring.kafka.producer.retries=0
  # Maximum size of a record batch, in bytes (not a count of messages)
  spring.kafka.producer.batch-size=16384
  # Total memory the producer may use for buffering, in bytes (32 MB)
  spring.kafka.producer.buffer-memory=33554432
  # Consumer key/value deserializers
  spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  spring.kafka.consumer.group-id=test-producer-topic
  spring.kafka.consumer.auto-offset-reset=earliest
  spring.kafka.consumer.enable-auto-commit=true
  # Auto-commit interval in milliseconds
  spring.kafka.consumer.auto-commit-interval=1000
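Under the hood, Spring Boot maps these spring.kafka.* properties onto the native Kafka client configuration keys (batch-size becomes batch.size, buffer-memory becomes buffer.memory, and so on). A minimal sketch of the producer-side mapping — the class name KafkaClientProps is mine, for illustration only:

```java
import java.util.Properties;

public class KafkaClientProps {

    // Builds the native producer config equivalent to the
    // spring.kafka.producer.* properties above.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", "0");
        props.put("batch.size", "16384");       // bytes per batch, not a message count
        props.put("buffer.memory", "33554432"); // 32 MB producer buffer
        return props;
    }

    public static void main(String[] args) {
        Properties p = producerProps("localhost:9092");
        System.out.println(p.getProperty("batch.size"));
        System.out.println(p.getProperty("key.serializer"));
    }
}
```

Spring Boot does this translation automatically via KafkaProperties, so you never build this Properties object yourself in this project.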

    2.2 Configuring the message producer.

          KafkaProducer — both key and value are of type String.

  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.kafka.core.KafkaTemplate;
  import org.springframework.web.bind.annotation.RequestMapping;
  import org.springframework.web.bind.annotation.RestController;

  /**
   * date: 2018-12-27
   * time: 17:34
   * description: Kafka producer configuration
   */
  @RestController
  @RequestMapping("/kafka")
  public class KafkaProducer {

      @Autowired
      private KafkaTemplate<String, String> kafkaTemplate;

      @RequestMapping("/send")
      public String send(String message) {
          // send with an explicit message key
          kafkaTemplate.send("test-producer-topic", "test-key", message);
          return "Producer Message Success!";
      }
  }
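The three-argument kafkaTemplate.send(topic, key, value) attaches a key to every record, and Kafka's default partitioner hashes that key (using murmur2), so a fixed key such as "test-key" always routes to the same partition. A simplified, self-contained sketch of that idea, using String.hashCode as a stand-in for Kafka's actual hash function:

```java
public class KeyPartitioning {

    // Simplified stand-in for Kafka's default partitioner (which uses
    // murmur2, not String.hashCode): a given key always maps to the
    // same partition.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("test-key", 3);
        int p2 = partitionFor("test-key", 3);
        System.out.println(p1 == p2); // same key, same partition
    }
}
```

Because every message in this demo shares the key "test-key", they all land in one partition, which is why the offset printed by the consumer grows by exactly 1 per message.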

(Screenshots: the location of the KafkaTemplate class, Spring Boot's Kafka auto-configuration, and the KafkaProperties class.)

2.3 Configuring the message consumer (a message listener).

  KafkaConsumer

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.springframework.kafka.annotation.KafkaListener;
  import org.springframework.stereotype.Component;

  /**
   * date: 2018-12-27
   * time: 17:46
   * description: Kafka consumer
   */
  @Component
  public class KafkaConsumer {

      @KafkaListener(topics = "test-producer-topic")
      public void listen(ConsumerRecord<?, ?> record) throws Exception {
          System.out.printf("topic=%s,offset=%d,key=%s,value=%s,serializedKeySize=%d,serializedValueSize=%d%n",
                  record.topic(), record.offset(), record.key(), record.value(),
                  record.serializedKeySize(), record.serializedValueSize());
      }
  }

The listener prints each consumed message's topic, offset, key, value, serializedKeySize and serializedValueSize.
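With StringSerializer, serializedKeySize and serializedValueSize are simply the UTF-8 byte lengths of the key and value. A small illustration (the class name SerializedSize is mine, for demonstration):

```java
import java.nio.charset.StandardCharsets;

public class SerializedSize {

    // With StringSerializer, the serialized size of a record's key or
    // value is its UTF-8 byte length.
    static int serializedSize(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        System.out.println(serializedSize("test-key"));     // 8
        System.out.println(serializedSize("KafkaMessage")); // 12
    }
}
```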

Start the project:

Visit http://localhost:8090/dockerboot/kafka/send?message=KafkaMessage

Watch the console:

The following error appears:

2018-12-27 20:10:42.877 [kafka-producer-network-thread | producer-1] ERROR org.springframework.kafka.support.LoggingProducerListener 76  -| Exception thrown when sending a message with key='test-key' and payload='KafkaMessage' to topic test-producer-topic:
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for test-producer-topic-0

Fix: stop Kafka, then edit the config/server.properties file under the Kafka installation directory.
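The post does not show exactly which settings were changed. On a cloud server, the usual cause of this metadata TimeoutException is that the broker advertises an internal address that external clients cannot reach; the typical fix in config/server.properties looks like the following (this is an assumption about the original change, and the public IP is a placeholder you must fill in yourself):

```
# Listen on all interfaces inside the server:
listeners=PLAINTEXT://0.0.0.0:9092
# Advertise an address that external clients can actually reach:
advertised.listeners=PLAINTEXT://<public-IP>:9092
```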

Restart Kafka.

Visit the endpoint again.

Console output:

Run the request again and the message offset increases by 1 each time. The listener does not fire just once: the consumer continually polls the topic, so every new record appended to the partition gets consumed.

 
