当前位置:   article > 正文

springboot集成kafka详细步骤(发送及监听消息示例)_java监听kafka消息

java监听kafka消息

1、本机的kafka环境配置,不再赘述

2、添加 pom 文件

  1. <!--kafka依赖-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. <version>2.8.6</version>
  6. </dependency>

3、配置application.yml

  1. spring:
  2. kafka:
  3. bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093
  4. producer:
  5. # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
  6. retries: 1
  7. #一个批次可以使用的内存大小
  8. batch-size: 16384
  9. # 设置生产者内存缓冲区的大小。
  10. buffer-memory: 33554432
  11. # 键的序列化方式
  12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  13. # 值的序列化方式
  14. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  15. #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
  16. acks: all
  17. consumer:
  18. # 默认的消费组ID
  19. group-id: java-group
  20. # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
  21. auto-commit-interval: 1S
  22. # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
  23. auto-offset-reset: latest
  24. # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
  25. enable-auto-commit: false
  26. # 键的反序列化方式
  27. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  28. # 值的反序列化方式
  29. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  30. # 批量一次最大拉取数据量
  31. max-poll-records: 65535
  32. #监测消费端心跳时间
  33. heartbeat-interval: 30000
  34. # 批量拉取间隔,要大于批量拉取数据的处理时间,时间间隔太小会有重复消费
  35. max.poll.interval.ms: 50000
  36. listener:
  37. #手工ack,调用ack后立刻提交offset
  38. ack-mode: MANUAL_IMMEDIATE
  39. #容器运行的线程数
  40. concurrency: 4

4、复写kafka的相关配置类:生产、消费相关配置

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  10. import org.springframework.kafka.core.ConsumerFactory;
  11. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  12. import org.springframework.kafka.listener.ContainerProperties;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. import java.util.Properties;
  16. @Configuration
  17. public class KafkaMQConfig {
  18. // 此处配置代替zk
  19. @Value("${spring.kafka.bootstrap-servers}")
  20. private String servers;
  21. // 消费组标识
  22. @Value("${spring.kafka.consumer.group-id}")
  23. private String groupId;
  24. // 偏移量的起始点
  25. @Value("${spring.kafka.consumer.auto-offset-reset}")
  26. private String autoOffsetReset;
  27. // 偏移量的提交方式
  28. @Value("${spring.kafka.consumer.enable-auto-commit}")
  29. private String enableAutoCommit;
  30. // 一次从kafka服务拉取的数据量
  31. @Value("${spring.kafka.consumer.max-poll-records}")
  32. private String maxPollRecords;
  33. // 监测消费端心跳时间
  34. @Value("${spring.kafka.consumer.heartbeat-interval}")
  35. private String heartbeatInterval;
  36. // 两次拉取数据的最大时间间隔
  37. @Value("${spring.kafka.consumer.max.poll.interval.ms}")
  38. private String maxPollIntervalMs;
  39. //生产者相关配置
  40. @Bean
  41. public KafkaProducer kafkaProducer() {
  42. Properties props = new Properties();
  43. // 设置接入点,请通过控制台获取对应 Topic 的接入点
  44. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  45. // Kafka 消息的序列化方式
  46. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  47. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  48. // 请求的最长等待时间
  49. props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
  50. // 构造 Producer 对象,注意,该对象是线程安全的
  51. // 一般来说,一个进程内一个 Producer 对象即可
  52. // 如果想提高性能,可构造多个对象,但最好不要超过 5
  53. return new KafkaProducer<String, String>(props);
  54. }
  55. // 消费端相关配置
  56. @Bean
  57. public Map<String, Object> consumerConfigs() {
  58. Map<String, Object> props = new HashMap<>();
  59. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  60. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  61. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  62. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  63. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  64. props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
  65. props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
  66. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  67. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  68. return props;
  69. }
  70. @Bean
  71. public ConsumerFactory<String, String> consumerFactory() {
  72. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  73. }
  74. @Bean
  75. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  76. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  77. factory.setConsumerFactory(consumerFactory());
  78. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  79. return factory;
  80. }
  81. }

5、生产、消费的伪代码

  1. import lombok.RequiredArgsConstructor;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.kafka.support.Acknowledgment;
  7. import org.springframework.kafka.support.KafkaHeaders;
  8. import org.springframework.messaging.handler.annotation.Header;
  9. import org.springframework.stereotype.Component;
  10. @Slf4j
  11. @Component
  12. @RequiredArgsConstructor
  13. public class KafkaUtils {
  14. private final KafkaTemplate<String, String> kafkaTemplate;
  15. /**
  16. * 发送消息
  17. */
  18. public void kafkaSendMsg(String topicName, String msg) {
  19. kafkaTemplate.send(topicName, msg);
  20. log.info("kafka成功发送消息给:" + topicName + ",内容为:" + msg);
  21. }
  22. /**
  23. * 监听消息
  24. */
  25. @KafkaListener(topics = {"test"}, groupId = "java-group",containerFactory="kafkaListenerContainerFactory")
  26. public void kafkaListener(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
  27. log.info("这是消费者在消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
  28. ack.acknowledge();
  29. }
  30. }

6、测试消息发送

  1. @RestController
  2. @RequestMapping("/v1")
  3. public class TestController {
  4. @Autowired
  5. private KafkaUtils kafkaUtils;
  6. /**
  7. * 测试卡夫卡消息
  8. * @return 结果
  9. */
  10. @GetMapping("/kafkaTest")
  11. public JSONObject kafkaTest() {
  12. kafkaUtils.kafkaSendMsg("test", "2022年11月10日上午发送的消息!!!");
  13. return null;
  14. }
  15. }

经过测试!

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号