当前位置:   article > 正文

SpringBoot集成kafka全面实战_spring boot kafka 实践

spring boot kafka 实践

本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》《秒懂kafka HA(高可用)》两篇文章。

一、生产者实践

  • 普通生产者

  • 带回调的生产者

  • 自定义分区器

  • kafka事务提交

二、消费者实践

  • 简单消费

  • 指定topic、partition、offset消费

  • 批量消费

  • 监听异常处理器

  • 消息过滤器

  • 消息转发

  • 定时启动/停止监听器

一、前戏

1、在项目中连接kafka,因为是外网,首先要开放kafka配置文件中的如下配置(其中IP为公网IP),

advertised.listeners=PLAINTEXT://112.126.74.249:9092

2、在开始前我们先创建两个topic:topic1、topic2,其分区和副本数都设置为2,用来测试,

  1. [root@iZ2zegzlkedbo3e64vkbefZ ~]# cd /usr/local/kafka-cluster/kafka1/bin/
  2. [root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic1
  3. Created topic topic1.
  4. [root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic2
  5. Created topic topic2.

当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send("topic1", normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下,

  1. @Configuration
  2. public class KafkaInitialConfiguration {
  3. // 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
  4. @Bean
  5. public NewTopic initialTopic() {
  6. return new NewTopic("testtopic",8, (short) 2 );
  7. }
  8. // 如果要修改分区数,只需修改配置值重启项目即可
  9. // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
  10. @Bean
  11. public NewTopic updateTopic() {
  12. return new NewTopic("testtopic",10, (short) 2 );
  13. }
  14. }

3、新建SpringBoot项目

① 引入pom依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

② application.propertise配置(本文用到的配置项这里全列了出来)

  1. ###########【Kafka集群】###########
  2. spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
  3. ###########【初始化生产者配置】###########
  4. # 重试次数
  5. spring.kafka.producer.retries=0
  6. # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01all/-1)
  7. spring.kafka.producer.acks=1
  8. # 批量大小
  9. spring.kafka.producer.batch-size=16384
  10. # 提交延时
  11. spring.kafka.producer.properties.linger.ms=0
  12. # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
  13. # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
  14. # 生产端缓冲区大小
  15. spring.kafka.producer.buffer-memory = 33554432
  16. # Kafka提供的序列化和反序列化类
  17. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  18. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  19. # 自定义分区器
  20. # spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
  21. ###########【初始化消费者配置】###########
  22. # 默认的消费组ID
  23. spring.kafka.consumer.properties.group.id=defaultConsumerGroup
  24. # 是否自动提交offset
  25. spring.kafka.consumer.enable-auto-commit=true
  26. # 提交offset延时(接收到消息后多久提交offset)
  27. spring.kafka.consumer.auto.commit.interval.ms=1000
  28. # 当kafka中没有初始offset或offset超出范围时将自动重置offset
  29. # earliest:重置为分区中最小的offset;
  30. # latest:重置为分区中最新的offset(消费分区中新产生的数据);
  31. # none:只要有一个分区不存在已提交的offset,就抛出异常;
  32. spring.kafka.c
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/142505?site
推荐阅读
相关标签
  

闽ICP备14008679号