当前位置:   article > 正文

SpringBoot:初探-RabbitMQ-消息队列,【MySQL_springboot rabbitmq中未ack的消息

springboot rabbitmq中未ack的消息

spring.rabbitmq.host=192.168.0.133
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none

spring.rabbitmq.listener.simple.acknowledge-mode=manual

# [](
)具体编码

## [](
)定义队列

如果手动创建过或者`RabbitMQ`中已经存在该队列那么也可以省略下述代码…
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

package com.battcn.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**

  • RabbitMQ配置

  • @author Levin

  • @since 2018/4/11 0011
    */
    @Configuration
    public class RabbitConfig {

    public static final String DEFAULT_BOOK_QUEUE = “dev.book.register.default.queue”;
    public static final String MANUAL_BOOK_QUEUE = “dev.book.register.manual.queue”;

    @Bean
    public Queue defaultBookQueue() {
    // 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理
    return new Queue(DEFAULT_BOOK_QUEUE, true);
    }

    @Bean
    public Queue manualBookQueue() {
    // 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理
    return new Queue(MANUAL_BOOK_QUEUE, true);
    }
    }

## [](
)实体类

创建一个`Book`类
  • 1
  • 2
  • 3
  • 4

public class Book implements java.io.Serializable {

private static final long serialVersionUID = -2164058270260403154L;

private String id;
private String name;
// 省略get set ...
  • 1
  • 2
  • 3
  • 4
  • 5

}

## [](
)控制器

编写一个`Controller`类,用于消息发送工作
  • 1
  • 2
  • 3
  • 4

package com.battcn.controller;

import com.battcn.config.RabbitConfig;
import com.battcn.entity.Book;
import com.battcn.handler.BookHandler;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**

  • @author Levin

  • @since 2018/4/2 0002
    */
    @RestController
    @RequestMapping(value = “/books”)
    public class BookController {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public BookController(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
    }

    /**

    • this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerAutoAck}
    • this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerManualAck}
      */
      @GetMapping
      public void defaultMessage() {
      Book book = new Book();
      book.setId(“1”);
      book.setName(“一起来学Spring Boot”);
      this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book);
      this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book);
      }
      }
## [](
)消息消费者

默认情况下 `spring-boot-data-amqp` 是自动`ACK`机制,就意味着 MQ 会在消息消费完毕后自动帮我们去ACK,这样依赖就存在这样一个问题:**如果报错了,消息不会丢失,会无限循环消费,很容易就吧磁盘空间耗完,虽然可以配置消费的次数但这种做法也有失优雅。目前比较推荐的就是我们手动ACK然后将消费错误的消息转移到其它的消息队列中,做补偿处理**
  • 1
  • 2
  • 3
  • 4

package com.battcn.handler;

import com.battcn.config.RabbitConfig;
import com.battcn.entity.Book;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**

  • BOOK_QUEUE 消费者

  • @author Levin

  • @since 2018/4/11 0011
    */
    @Component
    public class BookHandler {

    private static final Logger log = LoggerFactory.getLogger(BookHandler.class);

    /**

    • TODO 该方案是 spring-boot-data-amqp 默认的方式,不太推荐。具体推荐使用 listenerManualAck()

    • 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
    • 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完
    • 解决方案:手动ACK,或者try-catch 然后在 catch 里面讲错误的消息转移到其它的系列中去
    • spring.rabbitmq.listener.simple.acknowledge-mode=manual
    • @param book 监听的内容
      */
      @RabbitListener(queues = {RabbitConfig.DEFAULT_BOOK_QUEUE})
      public void listenerAutoAck(Book book, Message message, Channel channel) {
      // TODO 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
      final long deliveryTag = message.getMessageProperties().getDeliveryTag();
      try {
      log.info("[listenerAutoAck 监听的消息] - [{}]", book.toString());
      // TODO 通知 MQ 消息已被成功消费,可以ACK了
      channel.basicAck(deliveryTag, false);
      } catch (IOException e) {
      try {
      // TODO 处理失败,重新压入MQ
      channel.basicRecover();
      } catch (IOException e1) {
      e1.printStackTrace();
      }
      }
      }

    @RabbitListener(queues = {RabbitConfig.MANUAL_BOOK_QUEUE})
    public void listenerManualAck(Book book, Message message, Channel channel) {
    log.info("[listenerManualAck 监听的消息] - [{}]", book.toString());
    try {
    // TODO 通知 MQ 消息已被成功消费,可以ACK了
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (IOException e) {
    // TODO 如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列
    }
    }
    }

## 主函数
  • 1

package com.battcn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**

  • @author Levin
    */
    @SpringBootApplication
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/662689
推荐阅读
相关标签
  

闽ICP备14008679号