当前位置:   article > 正文

Kafka消息发送,不丢失,不重复_kafkatemplate.send和ack不冲突吗

kafkatemplate.send和ack不冲突吗

一、引入依赖

1、pom.xml中
  1. <!-- kafka依赖 -->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
2、application.yml中配置
  1. kafka:
  2. bootstrap-servers: 自己的ip:端口号
  3. producer: #生产者序列化器
  4. retries: 10
  5. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  6. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. ack-mode: manual #设置生产者的消息确认模式。manual表示手动确认
  8. consumer: #消费者序列化器
  9. group-id: ${spring.application.name}-test
  10. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  11. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12. listener: #监听器相关配置
  13. ack-mode: manual #消息确认模式。manual表示手动确认

二、创建一个配置类,使用 (@Configuration)注解

创建消息队列

  1. @Bean
  2. public NewTopic logTopic() {
  3. // 1表示分区数为1,(short)1 表示副本数为1
  4. return new NewTopic("消息队列", 1, (short) 1);
  5. }

三、消息发送

在业务层发送消息先注入KafkaTemplate

  1. @Autowried
  2. KafkaTemplate kafkaTemplate;

消息发送(字符串消息直接发送,对象消息转化成json字符串后在发送)

发送失败后重新发送消息,保证消息不丢失

  1. String s = JSON.toJSONString(messageVo);
  2. //s就是要发送的消息
  3. kafkaTemplate.send("logTopic",s) //发型消息
  4. .addCallback(new SuccessCallback() { //.addcallback(),添加一个回调函数,用来处理发送消息的结果
  5. @Override
  6. public void onSuccess(Object o) {
  7. //发送成功
  8. // 把消息的唯一ID设置到Redis中,为了解决重复消费的问题
  9. stringRedisTemplate.opsForValue().set("key","消息已经发送");
  10. }
  11. },
  12. new FailureCallback() {
  13. @Override
  14. public void onFailure(Throwable throwable) {
  15. //发送失败,重新发送消息
  16. kafkaTemplate.send("logTopic",s)
  17. }
  18. }
  19. );

四、消息接收,使用(@KafkaListener()注解);

  1. @KafkaListener(topics = {"logTopic"})
  2. public void recvLogMessage(String message,Acknowledgment acknowledgment){
  3. try {
  4. //将消息转换成对象
  5. JSON.parseObject(message, “对象”.class);
  6. //获取唯一标识,查看消息,是否已经被接收
  7. Boolean flag = stringRedisTemplate.hasKey(key);
  8. if(flag){
  9. //消息还没有接收,
  10. 业务处理,进行CRUD操作
  11. //处理完成之后从列表中将消息删除
  12. acknowledgment.acknowledge();
  13. //删除redis记录,解决重复消费问题
  14. stringRedisTemplate.delete(key);
  15. return;
  16. }
  17. //redis中没有记录,消息已经消费完成,将消息从队列中删除
  18. acknowledgment.acknowledge();
  19. }catch(Exception e){
  20. e.printStackTrace();
  21. }
  22. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/982101
推荐阅读
相关标签
  

闽ICP备14008679号