当前位置:   article > 正文

RocketMq的异步发送 重新发送问题解决_rocketmqtemplate.asyncsend发送失败

rocketmqtemplate.asyncsend发送失败
  1. package com.mz.controller;
  2. import cn.hutool.crypto.SecureUtil;
  3. import com.mz.domain.TMq;
  4. import com.mz.service.TMqService;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.rocketmq.client.producer.SendCallback;
  7. import org.apache.rocketmq.client.producer.SendResult;
  8. import org.apache.rocketmq.client.producer.SendStatus;
  9. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.messaging.Message;
  13. import org.springframework.messaging.support.MessageBuilder;
  14. import org.springframework.web.bind.annotation.RequestMapping;
  15. import org.springframework.web.bind.annotation.RestController;
  16. /**
  17. * @author 马震
  18. * @version 1.0
  19. * @date 2023/11/28 10:19
  20. */
  21. @RestController
  22. @Slf4j
  23. @RequestMapping("/mq")
  24. public class MqController {
  25. @Autowired
  26. RocketMQTemplate rocketMQTemplate;
  27. @Autowired
  28. RedisTemplate redisTemplate;
  29. @Autowired
  30. TMqService tMqService;
  31. @RequestMapping("send")
  32. public void mqList(){
  33. String body ="hello world";
  34. extracted(body);//打包下面方法
  35. }
  36. private void extracted(String body) {
  37. Message<String> message = MessageBuilder.withPayload(body).build();
  38. rocketMQTemplate.asyncSend("mz_11_28", message, new SendCallback() {
  39. @Override
  40. public void onSuccess(SendResult sendResult) {
  41. if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
  42. System.out.println("发送很成功"+message);
  43. }else{ //说明没发送成功
  44. log.info(body); //打印日志
  45. TMq tMq = new TMq();
  46. tMq.setBody(body);
  47. tMqService.save(tMq); //持久化到表中
  48. System.out.println("发送失败,没发送的消息是:"+body);
  49. }
  50. }
  51. @Override
  52. public void onException(Throwable throwable) {
  53. System.out.println("连接异常");
  54. String md5 = SecureUtil.md5(body);
  55. Long increment = redisTemplate.opsForValue().increment(md5);//利用redis 里面的自增方法
  56. System.out.println(increment);
  57. //重新发送机制
  58. if(increment<=3){
  59. try {
  60. Thread.sleep(1000*60); //每分钟重新发送一次
  61. extracted(body);
  62. System.err.println("重新发送第"+increment+"次");
  63. } catch (InterruptedException e) {
  64. throw new RuntimeException(e);
  65. }
  66. }else{ //重新发送超过3次了直接持久化到表中
  67. log.info(body);
  68. TMq tMq = new TMq();
  69. tMq.setBody(body);
  70. tMqService.save(tMq);
  71. redisTemplate.delete(md5); //删除redis中那个键
  72. System.out.println("没发送的消息是:"+body);
  73. }
  74. }
  75. });
  76. }
  77. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/603883
推荐阅读
相关标签
  

闽ICP备14008679号