当前位置:   article > 正文

SpringBoot整合Kafka发送事物_kafkatemplate.send().addcallback()是哪个版本

kafkatemplate.send().addcallback()是哪个版本

1.pom文件需要引入(Spring Boot 版本2.5.12-SNAPSHOT)

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

2.yml配置

  1. spring:
  2. kafka:
  3. bootstrap-servers: ip:port,ip:port,ip:port,ip:port
  4. producer:
  5. # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0 如果不配置事物 retries: 0 要放开
  6. #retries: 1
  7. #一个批次可以使用的内存大小
  8. batch-size: 16384
  9. # 设置生产者内存缓冲区的大小。
  10. buffer-memory: 33554432
  11. # 键的序列化方式
  12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  13. # 值的序列化方式
  14. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  15. #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all 如果不配置事物要放开
  16. #acks: all
  17. #事务id 如果不配置事物要注释掉
  18. transaction-id-prefix: tran
  19. consumer:
  20. # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
  21. auto-commit-interval: 1S
  22. # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
  23. auto-offset-reset: earliest
  24. # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
  25. enable-auto-commit: false
  26. # 键的反序列化方式
  27. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  28. # 值的反序列化方式
  29. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  30. listener:
  31. # 在侦听器容器中运行的线程数。
  32. concurrency: 4
  33. #listner负责ack,手动调用Acknowledgment.acknowledge()后立即提交
  34. ack-mode: manual_immediate
  35. #避免出现主题未创建报错
  36. missing-topics-fatal: false

3.发送消息(例子)

  1. private static final String TOPIC_NAME = "***";
  2. //注入KafkaTemplate
  3. @Autowired
  4. private KafkaTemplate<String, Object> kafkaTemplate;
  5. //注入式
  6. @GetMapping("/testInjection")
  7. @Transactional(rollbackFor = RuntimeException.class)
  8. public void testInjection(int num) {
  9. kafkaTemplate.send(TOPIC_NAME, "发送消息:" + num);
  10. if (num == 0) {
  11. throw new RuntimeException();
  12. }
  13. kafkaTemplate.send(TOPIC_NAME, "发送消息:" + num);
  14. }
  15. //声明式
  16. @GetMapping("/testStatement")
  17. public void testStatement(int num) {
  18. kafkaTemplate.executeInTransaction((KafkaOperations.OperationsCallback<String, Object, Object>) kafkaOperations -> {
  19. kafkaOperations.send(TOPIC_NAME, "发送消息:" + num);
  20. if (num == 0) {
  21. throw new RuntimeException();
  22. }
  23. kafkaOperations.send(TOPIC_NAME, "发送消息:" + num);
  24. return true;
  25. });
  26. }
  1. kafkaTemplate.send(TOPIC_NAME, "发送消息:" + num).addCallback(success -> {
  2. //成功回调topic, partition, offset, success.getProducerRecord().value());
  3. }, failure -> {
  4. //失败回调
  5. });

4.客户端接收消息

  1. @Component
  2. public class MQListener {
  3. @KafkaListener(topics = {"topic自定义"}, groupId = "自定义")
  4. public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
  5. System.out.println("消费收到消息:" + record.topic() + "-" + record.partition() + "-消息体:{" + record.value() + "}");
  6. //提交事物
  7. ack.acknowledge();
  8. }
  9. // @KafkaListener(topics = {"topic自定义"}, groupId = "自定义")
  10. // public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
  11. // System.out.println("消费7:" + record.topic() + "-" + record.partition() + "-" + record.value());
  12. // //提交事物
  13. // ack.acknowledge();
  14. // }
  15. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/794078
推荐阅读
相关标签
  

闽ICP备14008679号