当前位置:   article > 正文

RabbitMQ的消费者处理消息失败后之重试3次,重试3次仍然失败发送到死信队列。_rabbitmq重试失败后怎么处理

rabbitmq重试失败后怎么处理

1、为什么要重试?

       如果消费者处理消息失败后不重试,然后发送应答给rabbitmq,rabbitmq就会将队列中的消息删除,从而造成消息的丢失。所以我们要在消费者处理消息失败的时候,重试一定的次数。比如重试3次,如果重试3次之后还是失败,则把这条消息发送到死信队列。

       所以我们现在要实现消息的重试,实现效果为:

       首先,将消息携带routtingkey的消息发送到正常转发器exchange@normal,exchange@normal将消息发送到正常队列queue@normal,queue@normal得到消息后进行处理,如果处理成功,则给rabbitmq发送应答。如果消息处理失败,判断消息失败的次数:如果失败次数小于3次,则将消息发送到重试转发器exchange@retry,exchange@retry得到消息后,发送到重试队列queue@retry,queue@retry10s后,将该条消息再次发送到正常转发器exchange@normal进行正常的消费;如果失败次数超过3次,则将消息发送到失败转发器exchange@filed,exchange@filed将失败了的消息发送给失败队列queue@filed,然后可以根据业务需求处理失败了的数据。比如保存到失败文件或者数据库等,也可以人工处理后,重新发送给exchange@normal。

      思路图如下:

           

                 

2、重试实现的思路

      生产者端发送消息:

      1)、声明三个转发器

  1. //正常的转发器
  2. private static String NORMAL_EXCHANGE = "exchange@normal";
  3. //重试转发器
  4. private static String RETRY_EXCHANGE = "exchange@retry";
  5. //失败转发器
  6. private static String FILED_EXCHANGE = "exchange@filed";
  7. ......
  8. //声明正常的转发器
  9. channel.exchangeDeclare(NORMAL_EXCHANGE,"topic");
  10. //声明重试的转发器
  11. channel.exchangeDeclare(RETRY_EXCHANGE,"topic");
  12. //声明失败的转发器
  13. channel.exchangeDeclare(FILED_EXCHANGE,"topic");

     2)、发送消息到正常的队列

  1. //发送5条消息到正常的转发器,路由密匙为normal
  2. for(int i=0;i<5;i++){
  3. String message = "retry..."+i;
  4. channel.basicPublish(NORMAL_EXCHANGE,"normal",
  5. MessageProperties.PERSISTENT_BASIC,message.getBytes());
  6. }

      消费者端接受、处理消息:

      1)、定义并声明转发器和队列,需要注意的是queue@normal和queue@filed两个队列就是普通的队列。queue@retry队列需要设置两个参数:x-dead-letter-exchange、x-message-ttl。这两个参数按照我自己的理解是x-dead-letter-exchange指定重试时将消息重发给哪一个转发器、x-message-ttl消息到达重试队列后,多长时间后重发。

  1. //转发器
  2. private static String NORMAL_EXCHANGE = "exchange@normal";
  3. private static String RETRY_EXCHANGE = "exchange@retry";
  4. private static String FILED_EXCHANGE = "exchange@filed";
  5. //队列
  6. private static String NORMAL_QUEUE = "queue@normal";
  7. private static String RETRY_QUEUE = "queue@retry";
  8. private static String FILED_QUEUE = "queue@filed";
  9. ......
  10. //声明正常队列
  11. channel.queueDeclare(NORMAL_QUEUE,true,false,false,null);
  12. //声明重试队列,重试队列比较特殊,需要设置两个参数
  13. Map<String,Object> arg = new HashMap<String,Object>();
  14. //参数1:将消息发送到哪一个转发器
  15. arg.put("x-dead-letter-exchange",NORMAL_EXCHANGE);
  16. //参数2:多长时间后重发
  17. arg.put("x-message-ttl",10000);
  18. channel.queueDeclare(RETRY_QUEUE,true,false,false,arg);
  19. //声明失败队列
  20. channel.queueDeclare(FILED_QUEUE,true,false,false,null);

     2)、将队列绑定转发器和路由密匙

  1. //将队列绑定转发器和路由密匙
  2. channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
  3. channel.queueBind(RETRY_QUEUE,RETRY_EXCHANGE,"normal");
  4. channel.queueBind(FILED_QUEUE,FILED_EXCHANGE,"normal");

     3)、定义consumer,消费消息。因为消息设置的是自动应答,所以不需要手动应答。如果你设置了手动应答,则在消息消费成功或者失败后都要应答。

  1. Consumer consumer = new DefaultConsumer(channel){
  2. @Override
  3. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  4. //此处处理消息
  5. try{
  6. String message = new String(body,"utf-8");
  7. System.out.println("消费者接受到的消息:"+message);
  8. //模拟处理消息产生异常
  9. int i = 1/0;
  10. }catch(Exception e){
  11. try{
  12. //延迟5s
  13. Thread.sleep(5000);
  14. //判断失败次数
  15. long retryCount = getRetryCount(properties);
  16. if(retryCount>=3){
  17. //如果失败超过三次,则发送到失败队列
  18. channel.basicPublish(FILED_EXCHANGE,envelope.getRoutingKey(),MessageProperties.PERSISTENT_BASIC,body);
  19. System.out.println("消息失败了...");
  20. }else{
  21. //发送到重试队列,10s后重试
  22. channel.basicPublish(RETRY_EXCHANGE,envelope.getRoutingKey(),properties,body);
  23. System.out.println("消息重试中...");
  24. }
  25. }catch (Exception e1){
  26. e.printStackTrace();
  27. }
  28. }
  29. }
  30. };
  31. //消费消息
  32. channel.basicConsume(NORMAL_QUEUE,true,consumer);

     判断消息失败次数的方法如下:

  1. public long getRetryCount(AMQP.BasicProperties properties){
  2. long retryCount = 0L;
  3. Map<String,Object> header = properties.getHeaders();
  4. if(header != null && header.containsKey("x-death")){
  5. List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
  6. if(deaths.size()>0){
  7. Map<String,Object> death = deaths.get(0);
  8. retryCount = (Long)death.get("count");
  9. }
  10. }
  11. return retryCount;
  12. }

重试的完整代码如下:

     1)、生产者

  1. @Component
  2. public class RetryPublisher {
  3. private static String NORMAL_EXCHANGE = "exchange@normal";
  4. private static String RETRY_EXCHANGE = "exchange@retry";
  5. private static String FILED_EXCHANGE = "exchange@filed";
  6. public void send(){
  7. Connection connection = ConnectionUtil.getInstance();
  8. Channel channel = null;
  9. try{
  10. channel = connection.createChannel();
  11. //声明正常的转发器
  12. channel.exchangeDeclare(NORMAL_EXCHANGE,"topic");
  13. //声明重试的转发器
  14. channel.exchangeDeclare(RETRY_EXCHANGE,"topic");
  15. //声明失败的转发器
  16. channel.exchangeDeclare(FILED_EXCHANGE,"topic");
  17. //发送5条消息到正常的转发器,路由密匙为normal
  18. for(int i=0;i<5;i++){
  19. String message = "retry..."+i;
  20. channel.basicPublish(NORMAL_EXCHANGE,"normal", MessageProperties.PERSISTENT_BASIC,message.getBytes());
  21. }
  22. }catch (Exception e){
  23. }
  24. }
  25. }

     2)、消费者方法和判断消息失败次数的方法

  1. @Component
  2. public class RetryReceiver {
  3. //转发器
  4. private static String NORMAL_EXCHANGE = "exchange@normal";
  5. private static String RETRY_EXCHANGE = "exchange@retry";
  6. private static String FILED_EXCHANGE = "exchange@filed";
  7. //队列
  8. private static String NORMAL_QUEUE = "queue@normal";
  9. private static String RETRY_QUEUE = "queue@retry";
  10. private static String FILED_QUEUE = "queue@filed";
  11. public void receiver(){
  12. Connection connection = ConnectionUtil.getInstance();
  13. final Channel channel;
  14. try{
  15. channel = connection.createChannel();
  16. //声明正常队列
  17. channel.queueDeclare(NORMAL_QUEUE,true,false,false,null);
  18. //声明重试队列,重试队列比较特殊,需要设置两个参数
  19. Map<String,Object> arg = new HashMap<String,Object>();
  20. //参数1:将消息发送到哪一个转发器
  21. arg.put("x-dead-letter-exchange",NORMAL_EXCHANGE);
  22. //参数2:多长时间后发送
  23. arg.put("x-message-ttl",10000);
  24. channel.queueDeclare(RETRY_QUEUE,true,false,false,arg);
  25. //声明失败队列
  26. channel.queueDeclare(FILED_QUEUE,true,false,false,null);
  27. //将队列绑定转发器和路由密匙
  28. channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
  29. channel.queueBind(RETRY_QUEUE,RETRY_EXCHANGE,"normal");
  30. channel.queueBind(FILED_QUEUE,FILED_EXCHANGE,"normal");
  31. Consumer consumer = new DefaultConsumer(channel){
  32. @Override
  33. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  34. //此处处理消息
  35. try{
  36. String message = new String(body,"utf-8");
  37. System.out.println("消费者接受到的消息:"+message);
  38. //模拟处理消息是产生异常
  39. int i = 1/0;
  40. }catch(Exception e){
  41. try{
  42. //延迟5s
  43. Thread.sleep(5000);
  44. //判断失败次数
  45. long retryCount = getRetryCount(properties);
  46. if(retryCount>=3){
  47. //如果失败超过三次,则发送到失败队列
  48. channel.basicPublish(FILED_EXCHANGE,envelope.getRoutingKey(),MessageProperties.PERSISTENT_BASIC,body);
  49. System.out.println("消息失败了...");
  50. }else{
  51. //发送到重试队列,10s后重试
  52. channel.basicPublish(RETRY_EXCHANGE,envelope.getRoutingKey(),properties,body);
  53. System.out.println("消息重试中...");
  54. }
  55. }catch (Exception e1){
  56. e.printStackTrace();
  57. }
  58. }
  59. }
  60. };
  61. //消费消息
  62. channel.basicConsume(NORMAL_QUEUE,true,consumer);
  63. }catch (Exception e){
  64. e.printStackTrace();
  65. }
  66. }
  67. /**
  68. * 获取消息失败次数
  69. * @param properties
  70. * @return
  71. */
  72. public long getRetryCount(AMQP.BasicProperties properties){
  73. long retryCount = 0L;
  74. Map<String,Object> header = properties.getHeaders();
  75. if(header != null && header.containsKey("x-death")){
  76. List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
  77. if(deaths.size()>0){
  78. Map<String,Object> death = deaths.get(0);
  79. retryCount = (Long)death.get("count");
  80. }
  81. }
  82. return retryCount;
  83. }
  84. }

     3)、测试Controlelr

  1. @Controller
  2. public class RetryController {
  3. @Autowired
  4. private RetryPublisher publisher;
  5. @Autowired
  6. private RetryReceiver receiver;
  7. @RequestMapping("/retrySend")
  8. @ResponseBody
  9. public void send(){
  10. publisher.send();
  11. }
  12. @RequestMapping("/retryReceive")
  13. @ResponseBody
  14. public void receive(){
  15. receiver.receiver();
  16. }
  17. }

本文的思路图来自博主:https://www.cnblogs.com/itrena/p/9044097.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/974057
推荐阅读
相关标签
  

闽ICP备14008679号