赞
踩
1、为什么要重试?
如果消费者处理消息失败后不重试,然后发送应答给rabbitmq,rabbitmq就会将队列中的消息删除,从而造成消息的丢失。所以我们要在消费者处理消息失败的时候,重试一定的次数。比如重试3次,如果重试3次之后还是失败,则把这条消息发送到死信队列。
所以我们现在要实现消息的重试,实现效果为:
首先,将消息携带routtingkey的消息发送到正常转发器exchange@normal,exchange@normal将消息发送到正常队列queue@normal,queue@normal得到消息后进行处理,如果处理成功,则给rabbitmq发送应答。如果消息处理失败,判断消息失败的次数:如果失败次数小于3次,则将消息发送到重试转发器exchange@retry,exchange@retry得到消息后,发送到重试队列queue@retry,queue@retry10s后,将该条消息再次发送到正常转发器exchange@normal进行正常的消费;如果失败次数超过3次,则将消息发送到失败转发器exchange@filed,exchange@filed将失败了的消息发送给失败队列queue@filed,然后可以根据业务需求处理失败了的数据。比如保存到失败文件或者数据库等,也可以人工处理后,重新发送给exchange@normal。
思路图如下:
2、重试实现的思路
生产者端发送消息:
1)、声明三个转发器
- //正常的转发器
- private static String NORMAL_EXCHANGE = "exchange@normal";
- //重试转发器
- private static String RETRY_EXCHANGE = "exchange@retry";
- //失败转发器
- private static String FILED_EXCHANGE = "exchange@filed";
-
- ......
-
- //声明正常的转发器
- channel.exchangeDeclare(NORMAL_EXCHANGE,"topic");
- //声明重试的转发器
- channel.exchangeDeclare(RETRY_EXCHANGE,"topic");
- //声明失败的转发器
- channel.exchangeDeclare(FILED_EXCHANGE,"topic");
2)、发送消息到正常的队列
- //发送5条消息到正常的转发器,路由密匙为normal
- for(int i=0;i<5;i++){
- String message = "retry..."+i;
- channel.basicPublish(NORMAL_EXCHANGE,"normal",
- MessageProperties.PERSISTENT_BASIC,message.getBytes());
- }
消费者端接受、处理消息:
1)、定义并声明转发器和队列,需要注意的是queue@normal和queue@filed两个队列就是普通的队列。queue@retry队列需要设置两个参数:x-dead-letter-exchange、x-message-ttl。这两个参数按照我自己的理解是x-dead-letter-exchange指定重试时将消息重发给哪一个转发器、x-message-ttl消息到达重试队列后,多长时间后重发。
- //转发器
- private static String NORMAL_EXCHANGE = "exchange@normal";
- private static String RETRY_EXCHANGE = "exchange@retry";
- private static String FILED_EXCHANGE = "exchange@filed";
- //队列
- private static String NORMAL_QUEUE = "queue@normal";
- private static String RETRY_QUEUE = "queue@retry";
- private static String FILED_QUEUE = "queue@filed";
-
- ......
-
- //声明正常队列
- channel.queueDeclare(NORMAL_QUEUE,true,false,false,null);
-
- //声明重试队列,重试队列比较特殊,需要设置两个参数
- Map<String,Object> arg = new HashMap<String,Object>();
- //参数1:将消息发送到哪一个转发器
- arg.put("x-dead-letter-exchange",NORMAL_EXCHANGE);
- //参数2:多长时间后重发
- arg.put("x-message-ttl",10000);
- channel.queueDeclare(RETRY_QUEUE,true,false,false,arg);
-
- //声明失败队列
- channel.queueDeclare(FILED_QUEUE,true,false,false,null);

2)、将队列绑定转发器和路由密匙
- //将队列绑定转发器和路由密匙
- channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
- channel.queueBind(RETRY_QUEUE,RETRY_EXCHANGE,"normal");
- channel.queueBind(FILED_QUEUE,FILED_EXCHANGE,"normal");
3)、定义consumer,消费消息。因为消息设置的是自动应答,所以不需要手动应答。如果你设置了手动应答,则在消息消费成功或者失败后都要应答。
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //此处处理消息
- try{
- String message = new String(body,"utf-8");
- System.out.println("消费者接受到的消息:"+message);
- //模拟处理消息产生异常
- int i = 1/0;
- }catch(Exception e){
- try{
- //延迟5s
- Thread.sleep(5000);
- //判断失败次数
- long retryCount = getRetryCount(properties);
- if(retryCount>=3){
- //如果失败超过三次,则发送到失败队列
- channel.basicPublish(FILED_EXCHANGE,envelope.getRoutingKey(),MessageProperties.PERSISTENT_BASIC,body);
- System.out.println("消息失败了...");
- }else{
- //发送到重试队列,10s后重试
- channel.basicPublish(RETRY_EXCHANGE,envelope.getRoutingKey(),properties,body);
- System.out.println("消息重试中...");
- }
- }catch (Exception e1){
- e.printStackTrace();
- }
-
- }
- }
- };
- //消费消息
- channel.basicConsume(NORMAL_QUEUE,true,consumer);

判断消息失败次数的方法如下:
- public long getRetryCount(AMQP.BasicProperties properties){
- long retryCount = 0L;
- Map<String,Object> header = properties.getHeaders();
- if(header != null && header.containsKey("x-death")){
- List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
- if(deaths.size()>0){
- Map<String,Object> death = deaths.get(0);
- retryCount = (Long)death.get("count");
- }
- }
- return retryCount;
- }
重试的完整代码如下:
1)、生产者
- @Component
- public class RetryPublisher {
-
- private static String NORMAL_EXCHANGE = "exchange@normal";
- private static String RETRY_EXCHANGE = "exchange@retry";
- private static String FILED_EXCHANGE = "exchange@filed";
- public void send(){
- Connection connection = ConnectionUtil.getInstance();
- Channel channel = null;
- try{
- channel = connection.createChannel();
- //声明正常的转发器
- channel.exchangeDeclare(NORMAL_EXCHANGE,"topic");
- //声明重试的转发器
- channel.exchangeDeclare(RETRY_EXCHANGE,"topic");
- //声明失败的转发器
- channel.exchangeDeclare(FILED_EXCHANGE,"topic");
-
- //发送5条消息到正常的转发器,路由密匙为normal
- for(int i=0;i<5;i++){
- String message = "retry..."+i;
- channel.basicPublish(NORMAL_EXCHANGE,"normal", MessageProperties.PERSISTENT_BASIC,message.getBytes());
- }
- }catch (Exception e){
-
- }
- }
- }

2)、消费者方法和判断消息失败次数的方法
- @Component
- public class RetryReceiver {
-
- //转发器
- private static String NORMAL_EXCHANGE = "exchange@normal";
- private static String RETRY_EXCHANGE = "exchange@retry";
- private static String FILED_EXCHANGE = "exchange@filed";
- //队列
- private static String NORMAL_QUEUE = "queue@normal";
- private static String RETRY_QUEUE = "queue@retry";
- private static String FILED_QUEUE = "queue@filed";
-
-
- public void receiver(){
-
- Connection connection = ConnectionUtil.getInstance();
- final Channel channel;
- try{
- channel = connection.createChannel();
- //声明正常队列
- channel.queueDeclare(NORMAL_QUEUE,true,false,false,null);
-
- //声明重试队列,重试队列比较特殊,需要设置两个参数
- Map<String,Object> arg = new HashMap<String,Object>();
- //参数1:将消息发送到哪一个转发器
- arg.put("x-dead-letter-exchange",NORMAL_EXCHANGE);
- //参数2:多长时间后发送
- arg.put("x-message-ttl",10000);
- channel.queueDeclare(RETRY_QUEUE,true,false,false,arg);
-
- //声明失败队列
- channel.queueDeclare(FILED_QUEUE,true,false,false,null);
-
-
- //将队列绑定转发器和路由密匙
- channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
- channel.queueBind(RETRY_QUEUE,RETRY_EXCHANGE,"normal");
- channel.queueBind(FILED_QUEUE,FILED_EXCHANGE,"normal");
-
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //此处处理消息
- try{
- String message = new String(body,"utf-8");
- System.out.println("消费者接受到的消息:"+message);
- //模拟处理消息是产生异常
- int i = 1/0;
- }catch(Exception e){
- try{
- //延迟5s
- Thread.sleep(5000);
- //判断失败次数
- long retryCount = getRetryCount(properties);
- if(retryCount>=3){
- //如果失败超过三次,则发送到失败队列
- channel.basicPublish(FILED_EXCHANGE,envelope.getRoutingKey(),MessageProperties.PERSISTENT_BASIC,body);
- System.out.println("消息失败了...");
- }else{
- //发送到重试队列,10s后重试
- channel.basicPublish(RETRY_EXCHANGE,envelope.getRoutingKey(),properties,body);
- System.out.println("消息重试中...");
- }
- }catch (Exception e1){
- e.printStackTrace();
- }
-
- }
- }
- };
- //消费消息
- channel.basicConsume(NORMAL_QUEUE,true,consumer);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
-
- /**
- * 获取消息失败次数
- * @param properties
- * @return
- */
- public long getRetryCount(AMQP.BasicProperties properties){
- long retryCount = 0L;
- Map<String,Object> header = properties.getHeaders();
- if(header != null && header.containsKey("x-death")){
- List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
- if(deaths.size()>0){
- Map<String,Object> death = deaths.get(0);
- retryCount = (Long)death.get("count");
- }
- }
- return retryCount;
- }
- }

3)、测试Controlelr
- @Controller
- public class RetryController {
- @Autowired
- private RetryPublisher publisher;
- @Autowired
- private RetryReceiver receiver;
-
- @RequestMapping("/retrySend")
- @ResponseBody
- public void send(){
- publisher.send();
- }
-
- @RequestMapping("/retryReceive")
- @ResponseBody
- public void receive(){
- receiver.receiver();
- }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。