当前位置:   article > 正文

Spring Boot 集成 Kafka:生产者和消费者配置_Spring Boot 同时配置生产者和消费者

Spring Boot 同时配置生产者和消费者
  1. package com.springboot.kafka.business;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Component;
  4. import javax.annotation.Resource;
  5. /**
  6. * 生产者
  7. */
  8. @Component
  9. public class KafkaProducerBusiness {
  10. @Resource
  11. private KafkaTemplate<String, Object> kafkaTemplate;
  12. public void send(String topic,String msg){
  13. kafkaTemplate.send(topic,msg);
  14. }
  15. public void sendCallback(String topic,String msg){
  16. kafkaTemplate.send(topic,msg).addCallback(
  17. success ->{
  18. String topics = success.getRecordMetadata().topic();
  19. int partition = success.getRecordMetadata().partition();
  20. long offset = success.getRecordMetadata().offset();
  21. System.out.println("topic:" + topics + " partition:" + partition + " offset:" + offset);
  22. },
  23. failure ->{
  24. String message = failure.getMessage();
  25. System.out.println(message);
  26. }
  27. );
  28. }
  29. }
  1. import com.springboot.kafka.constant.KafkaConstant;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. import java.lang.reflect.Array;
  6. /**
  7. * 消费者
  8. */
  9. @Component
  10. public class KafkaConsumerBusiness {
  11. /**
  12. * topicGroup1
  13. * @param record
  14. */
  15. @KafkaListener(topics = KafkaConstant.topic,groupId = KafkaConstant.topicGroup1)
  16. public void getMessage1(ConsumerRecord record){
  17. Object values = record.value();
  18. System.out.println("1------------------------------------"+values);
  19. }
  20. /**
  21. * topicGroup2
  22. * @param record
  23. */
  24. @KafkaListener(topics = KafkaConstant.topic,groupId = KafkaConstant.topicGroup2)
  25. public void getMessage2(ConsumerRecord record){
  26. Object values = record.value();
  27. System.out.println("2------------------------------------"+values);
  28. }
  29. /**
  30. * 监听多个topic topic在application.yml配置
  31. * @param record
  32. */
  33. @KafkaListener(topics = {"#{'${spring.kafka.topics}'.split(',')}"},groupId = KafkaConstant.topicGroup)
  34. public void getMessage3(ConsumerRecord record){
  35. Object values = record.value();
  36. System.out.println("3------------------------------------"+values);
  37. }
  38. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/524856
推荐阅读
相关标签
  

闽ICP备14008679号