当前位置:   article > 正文

springboot集成整合kafka实现消息的发送与接收、同步发送和异步发送自定义kafkaTemplate、监控消息发送成功失败、消费指定的分区_springboot kafaka 发送topic 失败会一致发

springboot kafaka 发送topic 失败会一致发

写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!

 

先大体说一下本篇要完成的事:

  1. 自定义KafkaTemplate
  2. 本地连阿里服务器上的kafka向指定topic发送消息
  3. 项目中创建一个监听类,接收发送的消息,同时在阿里服务器上创建一个消费者接收消息
  4. 使用kafkaTemplate提供的addCallback方法监控消息是否发送成功,并进行下一步处理
  5. 消费topic的指定分区数据
  6. 监听类使用ConsumerRecord接收消息
  7. 同步发送消息

直在此之前,首先要安装并配置好zookeeper和kafka,并了解基本的kafka命令,可以参考之前的文章,有详细的安装配置步骤 Linux系统安装kafka步骤 Linux系统安装zookeeper步骤 linux系统kafka常用命令

直接贴代码记录一下:

引入依赖:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>spring-kafka</artifactId>
  8. </dependency>

发送消息我们使用自定义的KafkaTemplate,当然你可以直接使用默认的,自定义是为了以后方便添加自定义的分区策略啥的,贴下代码:

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaProducerConfig {
  4. @Value("${kafka.producer.servers}")
  5. private String servers;
  6. @Value("${kafka.producer.retries}")
  7. private int retries;
  8. public Map<String,Object> producerConfigs(){
  9. Map<String,Object> props = new HashMap<>();
  10. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  11. props.put(ProducerConfig.RETRIES_CONFIG,retries);
  12. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  13. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  14. return props;
  15. }
  16. public ProducerFactory<String,String> producerFactory(){
  17. return new DefaultKafkaProducerFactory<>(producerConfigs());
  18. }
  19. @Bean
  20. public KafkaTemplate<String,String> kafkaTemplate(){
  21. return new KafkaTemplate<>(producerFactory());
  22. }
  23. }

yml文件中配置kafka服务的连接地址:

  1. spring:
  2. kafka:
  3. bootstrap-servers: your ip:9092
  4. consumer:
  5. group-id: test-consumer-group
  6. kafka:
  7. producer:
  8. servers: your ip:9092
  9. retries: 3

对于上面的配置,其实有一些不太对的地方,上面bootstrap-servers已经配置了kafka的地址,下面我又单独定义了生产者的地址,有点重复,但是这是为了自定义kafkatemplate时使用,以后边学边优化。

在kafka服务器创建一个topic:mytopic

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --relication-factor 1 --partitions 5 --topic mytopic

创建一个消息发送类向mytopic发送消息:

  1. @RestController
  2. public class SendMessageController {
  3. @Autowired
  4. private KafkaTemplate<String, String> kafkaTemplate;
  5. private static final String TOPIC = "mytopic";
  6. @GetMapping("/send")
  7. public String send(String message) {
  8. kafkaTemplate.send(TOPIC, message);
  9. return "success";
  10. }
  11. }

在kafka服务器创建一个消费者消费mytopic的消息:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic

启动项目,访问http://localhost:8080/send?message=test  可以看到kafak服务端的消费者已经接收到了发送的消息test

kafka消费者接收到了消息,说明咱们的环境是没问题的了,接下来,在项目创建一个消息监听类监听发送到mytopic上的消息,接收到并打印到控制台:

  1. @Component
  2. public class MessageListener {
  3. @KafkaListener(topics = {"mytopic"})
  4. public void listener1(String message){
  5. System.out.println("接收到的消息:" + message);
  6. }
  7. }

需要指定一个分组,可以在@KafkaListener注解之上,也可以在yml文件中进行配置,不配置会报错,本篇中配置在了yml中,此时再次访问http://localhost:8080/send?message=test 可以看到kafka服务端的消费者打印了消息,并且项目的控制台也打印了消息:“接收到的消息:test”

上面我们只是接收到了消息,而通常的情况是,我接收到消息通知之后,要根据收到的消息进行下一步的处理,例如:接收到通知之后,给某个人发一条短信,或者其它处理,不管进行什么处理,我都需要知道发送的这条消息是成功了还是失败了,成功失败对应的处理肯定也不一样,其实KafkaTemplate为我们提供了一个带回调的方法addCallback,可以在回调方法中监控消息是发送成功还是失败,失败时做补偿处理,贴一下代码:

  1. @GetMapping("/send1")
  2. public String send1(String message) {
  3. kafkaTemplate.send(topic, message).addCallback(
  4. success ->{
  5. String topic = success.getRecordMetadata().topic();
  6. int partition = success.getRecordMetadata().partition();
  7. long offset = success.getRecordMetadata().offset();
  8. System.out.println("topic:" + topic + " partition:" + partition + " offset:" + offset);
  9. },
  10. failure ->{
  11. String message1 = failure.getMessage();
  12. System.out.println(message1);
  13. }
  14. );
  15. return "success";
  16. }

访问http://localhost:8080/send1?message=test1 只贴控制台的截图了之后,kafka消费者的截图就不贴了,控制台打印如下:

addCallback有两个重载的方法,除了上面的写法,你还可以像下面这样:

  1. @GetMapping("/send2")
  2. public String send2(String message){
  3. kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
  4. @Override
  5. public void onFailure(@NonNull Throwable throwable) {
  6. System.out.println(throwable.getMessage());
  7. }
  8. @Override
  9. public void onSuccess(SendResult<String, String> result) {
  10. System.out.println("topic:" + result.getRecordMetadata().topic() );
  11. }
  12. });
  13. return "success";
  14. }

咱们在创建topic的时候指定了5个分区,那么怎么将消息发送到指定分区,怎么监听指定分区的消息呢?这篇先说一下监听指定分区消息,@KafkaListener注解有个属性topicPartitions可以指定要监听的topic分区,代码如下:

  1. @KafkaListener(topics = {"mytopic"},topicPartitions = {@TopicPartition(topic = "mytopic",partitions = {"1","3"})})
  2. public void test1(String message){
  3. System.out.println("接收到的消息:" + message);
  4. }

访问http://localhost:8080/send1?message=test1 多次,打印结果如下,只有消息发送到分区1,3的时候监听类中才打印消息。

前面我们在发送消息的时候使用了带回调的方法,可以判断消息是否发送成功,但是监听的时候只是接收消息,并不知道更详细的信息,如partition、offset等,可以使用ConsumerRecord参数作为监听方法的参数,代码如下:

  1. @KafkaListener(topics = {"test5"})
  2. public void consumer(ConsumerRecord<?,?> consumerRecord){
  3. //判断是否为null
  4. Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
  5. if(kafkaMessage.isPresent()){
  6. //得到Optional实例中的值
  7. Object message = kafkaMessage.get();
  8. System.err.println("接收到的消息:"+message);
  9. }
  10. }

consumerRecord中包含的信息由很多,截个图看下:

最后记录一下消息的同步发送,默认情况下 KafkaTemplate 发送消息是采取异步方式,如果希望同步发送消息只需要在 send 方法后面调用 get 方法即可,get 方法返回的即为结果(如果发送失败则抛出异常),测试代码如下:

  1. @GetMapping("/send7")
  2. public String send7(String message) {
  3. try {
  4. SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
  5. System.out.println("消息发送到的分区:" + result.getRecordMetadata().topic());
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. } catch (ExecutionException e) {
  9. e.printStackTrace();
  10. System.out.println("发送消息失败");
  11. }
  12. return "success";
  13. }

get 方法还有一个重载方法 get(long timeout, TimeUnit unit),当 send 方法耗时大于 get 方法所设定的参数时会抛出一个超时异常,不贴测试代码了。

我有几个个疑问:

  1. 配置的这个groupId为什么随便写都可以,跟consumer.properties配置文件中的group.id有什么关系吗?
  2. 当指定监听的分区后,yml中的配置文件只能与consumer.properties配置文件中的相同才行,否则报Commit cannot be completed since the group has already rebalanced的错误,为什么,什么时候会触发Rebalance呢?

以上几个问题希望知道的的小伙伴帮忙解答下,谢谢!!!  看到但不知道的小伙伴欢迎关注我,在以后的学习中我会慢慢完善,希望对您有用!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/914581
推荐阅读
相关标签
  

闽ICP备14008679号