当前位置:   article > 正文

Spring Boot 中使用 Kafka

使用步骤:

1.引入依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

2.编写配置文件

  1. server:
  2. port: 8080
  3. spring:
  4. kafka:
  5. bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:90947
  6. producer: # ⽣产者
  7. retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
  8. batch-size: 16384
  9. buffer-memory: 33554432
  10. acks: 1
  11. # 指定消息key和消息体的编解码⽅式
  12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  14. consumer: # 消费者
  15. group-id: default-group
  16. enable-auto-commit: false
  17. auto-offset-reset: earliest
  18. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  19. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. max-poll-records: 500
  21. listener:
  22. # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
  23. # RECORD
  24. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  25. # BATCH
  26. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
  27. # TIME
  28. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
  29. # COUNT
  30. # TIME | COUNT 有⼀个条件满⾜时提交
  31. # COUNT_TIME
  32. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
  33. # MANUAL
  34. # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
  35. # MANUAL_IMMEDIATE
  36. ack-mode: MANUAL_IMMEDIATE
  37. redis:
  38. host: 172.16.253.21

3.编写消息生产者

  1. package com.qf.kafka.spring.boot.demo.controller;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. @RequestMapping("/msg")
  8. public class MyKafkaController {
  9. private final static String TOPIC_NAME = "my-replicated-topic";
  10. @Autowired
  11. private KafkaTemplate<String,String> kafkaTemplate;
  12. @RequestMapping("/send")
  13. public String sendMessage(){
  14. kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
  15. return "send success!";
  16. }
  17. }

4.编写消费者

  1. package com.qf.kafka.spring.boot.demo.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.kafka.support.Acknowledgment;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class MyConsumer {
  9. @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
  10. public void listenGroup(ConsumerRecord<String, String> record,
  11. Acknowledgment ack) {
  12. String value = record.value();
  13. System.out.println(value);
  14. System.out.println(record);
  15. //⼿动提交offset
  16. ack.acknowledge();
  17. }
  18. }

5.消费者中配置消费主题、分区和偏移量

  1. @KafkaListener(groupId = "testGroup", topicPartitions = {
  2. @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
  3. @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets =
  4. @PartitionOffset(partition = "1", initialOffset = "100")) },concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数
  5. public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack)
  6. {
  7. String value = record.value();
  8. System.out.println(value);
  9. System.out.println(record);
  10. //⼿动提交offset
  11. ack.acknowledge();
  12. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/521809
推荐阅读
相关标签
  

闽ICP备14008679号