
Using Kafka in Spring Boot

1. Add the required dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Write application.properties

spring.application.name=springboot-kafka-02
server.port=8080
# Broker address used to establish the initial connection
spring.kafka.bootstrap-servers=localhost:9092
# Key and value serializer classes used by the producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Default batch size in bytes (16KB)
spring.kafka.producer.batch-size=16384
# 32MB total send buffer
spring.kafka.producer.buffer-memory=33554432
# Key and value deserializer classes used by the consumer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group id
spring.kafka.consumer.group-id=spring-kafka-02-consumer
# Whether to auto-commit consumer offsets
spring.kafka.consumer.enable-auto-commit=true
# Commit offsets to the broker every 100ms
spring.kafka.consumer.auto-commit-interval=100
# If no committed offset exists for this consumer group, start from the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
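The producer examples below send to a fixed partition of a topic named topic_1. If the broker does not auto-create topics, the topic can be declared as a bean and Spring Boot's auto-configured KafkaAdmin will create it at startup. This is an optional, minimal sketch that is not part of the original article; the class name KafkaTopicConfig and the partition/replica counts are assumptions.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Optional: declare topic_1 so it exists before the producer sends to partition 0.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic topic1() {
        // topic name, number of partitions, replication factor
        return new NewTopic("topic_1", 1, (short) 1);
    }
}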

3. Write the application entry class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootkafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootkafkaApplication.class, args);
    }
}

4. Producer test methods

Synchronously wait for the broker's response:

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaSyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/sync/{message}")
    public String sendSync(@PathVariable String message) {
        // Send to partition 0 of topic_1 with key 1
        ListenableFuture<SendResult<Integer, String>> future = template.send(
                new ProducerRecord<>("topic_1", 0, 1, message));
        try {
            // Block until the broker acknowledges the record
            SendResult<Integer, String> result = future.get();
            RecordMetadata metadata = result.getRecordMetadata();
            System.out.println(metadata.serializedKeySize());
            System.out.println(metadata.serializedValueSize());
            System.out.println(metadata.topic());
            System.out.println(metadata.partition());
            System.out.println(metadata.offset());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
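The explicit ProducerRecord is only needed because the example pins the partition and key by hand. As a side note not in the original article, KafkaTemplate also has simpler send overloads; a short sketch, reusing the template and message variables from the method above:

// Let the default partitioner pick the partition from the key.
template.send("topic_1", 1, message);

// Or send a value-only record with no key.
template.send("topic_1", message);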

Add a callback and wait for the response asynchronously:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaAsyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    // Use a different path than the sync controller to avoid an ambiguous mapping
    @RequestMapping("send/async/{message}")
    public String sendAsync(@PathVariable String message) {
        ListenableFuture<SendResult<Integer, String>> future = template.send(
                new ProducerRecord<>("topic_1", 0, 1, message));
        // Register a callback instead of blocking on future.get()
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Failed to send the message: " + ex);
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                System.out.println(metadata.serializedKeySize());
                System.out.println(metadata.serializedValueSize());
                System.out.println(metadata.topic());
                System.out.println(metadata.partition());
                System.out.println(metadata.offset());
                System.out.println("Message sent successfully!");
            }
        });
        return "success";
    }
}
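The callback style above matches spring-kafka 2.x, where send() returns a ListenableFuture. On Spring Boot 3 / spring-kafka 3.x, send() returns a CompletableFuture instead. The following is a minimal sketch of the equivalent controller under that assumption; the class name and request path are illustrative and not part of the original article.

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical variant for spring-kafka 3.x, where send() returns a CompletableFuture.
@RestController
public class KafkaCompletableFutureProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/cf/{message}")
    public String sendWithCompletableFuture(@PathVariable String message) {
        template.send(new ProducerRecord<>("topic_1", 0, 1, message))
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        System.out.println("Failed to send the message: " + ex);
                    } else {
                        System.out.println("Message sent, offset = "
                                + result.getRecordMetadata().offset());
                    }
                });
        return "success";
    }
}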

5. Consumer test methods

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    @KafkaListener(topics = "topic_1")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println(record.topic());
        System.out.println(record.offset());
        System.out.println(record.partition());
        System.out.println(record.serializedKeySize());
        System.out.println(record.serializedValueSize());
        System.out.println(record.key());
        System.out.println(record.value());
    }
}
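The listener above takes all of its consumer settings from application.properties. In recent spring-kafka versions, @KafkaListener can also override individual consumer properties per listener through its properties attribute. A minimal sketch of such a listener method; the group id and the overridden values here are illustrative assumptions, not part of the original article.

// Per-listener overrides of consumer properties.
@KafkaListener(
        topics = "topic_1",
        groupId = "spring-kafka-02-consumer-b",
        properties = {
                "max.poll.records=10",
                "auto.offset.reset=latest"
        })
public void onMessageWithOverrides(ConsumerRecord<Integer, String> record) {
    System.out.println(record.value());
}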
