赞
踩
目录
RabbitMQ是一种基于amq协议的消息队列,本文主要记录一下使用 spring-boot-starter-amqp 操作 rabbitmq。
a) 虚拟主机(vhost)
虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。虚拟主机的作用在于进行权限管控,rabbitmq默认有一个虚拟主机"/"。可以使用rabbitmqctl add_vhost
命令添加虚拟主机,然后使用rabbitmqctl set_permissions
命令设置指定用户在指定虚拟主机下的权限,以此达到权限管控的目的。
b) 消息通道(channel)
消息通道: 在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
c) 交换机(exchange)
交换机: exchange的功能是用于消息分发,它负责接收消息并转发到与之绑定的队列,exchange不存储消息,如果一个exchange没有binding任何Queue,那么当它会丢弃生产者发送过来的消息,在启用ACK机制后,如果exchange找不到队列,则会返回错误。一个exchange可以和多个Queue进行绑定。
交换机有四种类型:
direct 是 rabbitmq 的默认交换机类型。根据 routingKey 完全匹配
direct 类型的行为是"先匹配, 再投送"。即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去。
根据绑定关键字通配符规则匹配、比较灵活
类似路由模式,但是 routing_key 支持模糊匹配,按规则转发消息(最灵活)。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
不需要指定 routingkey,相当于群发
转发消息到所有绑定队列,忽略 routing_key。
不太常用,可以自定义匹配规则
设置header attribute参数类型的交换机。相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型,忽略routing_key。在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列。
在绑定Queue与Exchange时指定一组键值对,当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配。如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。
spring-boot-starter-amqp
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
在 application.yml 中添加 rabbitmq 连接信息
- spring:
- application:
- name: homepage-rabbitMQ
- rabbitmq:
- host: localhost
- port: 5672
- username: springcloud
- password: 123456
- virtual-host: /spring_cloud
-
- server:
- port: 8400
-
- eureka:
- client:
- service-url:
- #将自己注册进下面这个地址的服务注册中心
- defaultZone: http://admin:admin@localhost:8000/eureka/
spring-boot-starter-amqp依赖为我们提供了两个jar
操作对象 | 类型 | 描述 |
---|---|---|
AmqpTemplate | interface | 所属jar包:spring-amqp.jar --》org.springframework.amqp.core |
AmqpAdmin | interface | 所属jar包:spring-amqp.jar --》org.springframework.amqp.core |
RabbitTemplate | class | 实现了 AmqpTemplate 接口 所属jar包:spring-rabbit.jar --》org.springframework.amqp.rabbit.core |
RabbitAdmin | class | 实现了 AmqpAdmin 接口 所属jar包:spring-rabbit.jar --》org.springframework.amqp.rabbit.core |
问:rabbitTemplate 和 amqpTemplate 有什么关系?
答:源码中会发现 rabbitTemplate 实现自 amqpTemplate 接口,使用起来并无区别,需引入spring-boot-starter-amqp依赖
下面文字来自官方文档:
与Spring框架和相关项目提供的许多其他高级抽象一样,Spring AMQP提供了一个“template”,它扮演着核心角色。定义主要操作的接口称为 AmqpTemplate。这些操作涵盖了发送和接收消息的一般行为。换句话说,它们对于任何实现都不是惟一的,因此名称中有“AMQP”。另一方面,该接口的一些实现与AMQP协议的实现绑定在一起。与JMS本身是接口级API不同,AMQP是一个线级协议。该协议的实现提供了自己的客户机库,因此模板接口的每个实现都依赖于特定的客户机库。目前,只有一个实现:RabbitTemplate。在接下来的示例中,您将经常看到“AmqpTemplate”的用法,但是当您查看配置示例,或者调用模板实例化和/或setter的任何代码摘录时,您将看到实现类型(例如,“RabbitTemplate”)。
该类封装了对 RabbitMQ 的管理操作
主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等
- @Bean
- public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
- return new RabbitAdmin(connectionFactory);
- }
-
- @Autowired
- private RabbitAdmin rabbitAdmin;
Exchange 操作
- //创建四种类型的 Exchange,均为持久化,不自动删除
- rabbitAdmin.declareExchange(new DirectExchange("direct.exchange",true,false));
- rabbitAdmin.declareExchange(new TopicExchange("topic.exchange",true,false));
- rabbitAdmin.declareExchange(new FanoutExchange("fanout.exchange",true,false));
- rabbitAdmin.declareExchange(new HeadersExchange("header.exchange",true,false));
- //删除 Exchange
- rabbitAdmin.deleteExchange("header.exchange");
Queue 操作
- //定义队列,均为持久化
- rabbitAdmin.declareQueue(new Queue("debug",true));
- rabbitAdmin.declareQueue(new Queue("info",true));
- rabbitAdmin.declareQueue(new Queue("error",true));
- //删除队列
- rabbitAdmin.deleteQueue("debug");
- //将队列中的消息全消费掉
- rabbitAdmin.purgeQueue("info",false);
Binding 绑定
- //绑定队列到交换器,通过路由键
- rabbitAdmin.declareBinding(new Binding("debug",Binding.DestinationType.QUEUE,
- "direct.exchange","key.1",new HashMap()));
-
- rabbitAdmin.declareBinding(new Binding("info",Binding.DestinationType.QUEUE,
- "direct.exchange","key.2",new HashMap()));
-
- rabbitAdmin.declareBinding(new Binding("error",Binding.DestinationType.QUEUE,
- "direct.exchange","key.3",new HashMap()));
-
- //进行解绑
- rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("info")).
- to(new TopicExchange("direct.exchange")).with("key.2"));
-
- //使用BindingBuilder进行绑定
- rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("info")).
- to(new TopicExchange("topic.exchange")).with("key.#"));
-
- //声明topic类型的exchange
- rabbitAdmin.declareExchange(new TopicExchange("exchange1",true,false));
- rabbitAdmin.declareExchange(new TopicExchange("exchange2",true,false));
-
- //exchange与exchange绑定
- rabbitAdmin.declareBinding(new Binding("exchange1",Binding.DestinationType.EXCHANGE,
- "exchange2","key.4",new HashMap()));
个人案例:
- package com.mq;
-
- import homepage.ApplicationMQ;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
- /**
- * RabbitAdmin用于创建、绑定、删除队列与交换机,发送消息等
- */
- @RunWith(SpringJUnit4ClassRunner.class)
- @SpringBootTest(classes = ApplicationMQ.class)
- public class RabbitAdminTest {
-
- @Autowired
- private RabbitAdmin rabbitAdmin;
-
- /**
- * 创建绑定Direct路由模式
- * routingKey 完全匹配
- * Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
- * Binding(目的地, 目的地类型, exchange, routingKey, 参数)
- */
- @Test
- public void testDirect() {
- //切记命名不能重复复
- final String QUEUE_NAME="test.direct.queue";
- final String EXCHANGE_NAME="test.direct";
- //创建队列
- Queue directQueue=new Queue(QUEUE_NAME);
- rabbitAdmin.declareQueue(directQueue);
- //创建Direct交换机
- DirectExchange directExchange=new DirectExchange(EXCHANGE_NAME);
- rabbitAdmin.declareExchange(directExchange);
- //绑定交换机和队列(注意:绑定的时候,一定要确认绑定的双方都是存在的,否则会报IO异常,NOT_FOUND)
- Binding directBinding=new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "mq.direct", null);
- rabbitAdmin.declareBinding(directBinding);
- }
-
- /**
- * 创建绑定Topic主题模式\通配符模式
- * routingKey 模糊匹配
- * BindingBuilder.bind(queue).to(exchange).with(routingKey)
- */
- @Test
- public void testTopic() {
- rabbitAdmin.declareQueue(new Queue("test.topic.queue", true, false, false));
- rabbitAdmin.declareExchange(new TopicExchange("test.topic", true, false));
- //如果注释掉上面两句实现声明,直接进行下面的绑定竟然不行,该版本amqp-client采用的是5.1.2,将上面两行代码放开,则运行成功
- rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", true, false, false)).to(new TopicExchange("test.topic", true, false)).with("mq.topic"));
- }
-
- /**
- * 创建绑定Fanout发布订阅模式
- * BindingBuilder.bind(queue).to(FanoutExchange)
- */
- @Test
- public void testFanout() {
- rabbitAdmin.declareQueue(new Queue("test.fanout.queue", true, false, false, null));
- rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", true, false, null));
- rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", true, false, false)).to(new FanoutExchange("test.fanout", true, false)));
- rabbitAdmin.purgeQueue("test.direct.queue", false);//清空队列消息
- }
- }
Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作
RabbitTemplate 初始化
设置 RabbitTemplate 的默认交换器、默认路由键、默认队列
send | 数据以Message类型传入,自定义消息 Message |
---|---|
convertAndSend | 数据以Object类型传入,自动将 Java 对象序列化包装成 Message 对象,Java 对象需要实现 Serializable 序列化接口 |
receive | 数据以Message类型返回,返回 Message 对象 |
receiveAndConvert | 数据以Object类型返回,会自动将返回的 Message 反序列化转换成 Java 对象 |
发送消息
(1)send (自定义消息 Message)
- Message message = new Message("hello".getBytes(),new MessageProperties());
- // 发送消息到默认的交换器,默认的路由键
- rabbitTemplate.send(message);
- // 发送消息到指定的交换器,指定的路由键
- rabbitTemplate.send("direct.exchange","key.1",message);
- // 发送消息到指定的交换器,指定的路由键
- rabbitTemplate.send("direct.exchange","key.1",message,new CorrelationData(UUID.randomUUID().toString()));
(2)convertAndSend(自动 Java 对象包装成 Message 对象,Java 对象需要实现 Serializable 序列化接口)
- User user = new User("linyuan");
- // 发送消息到默认的交换器,默认的路由键
- rabbitTemplate.convertAndSend(user);
- // 发送消息到指定的交换器,指定的路由键,设置消息 ID
- rabbitTemplate.convertAndSend("direct.exchange","key.1",user,new CorrelationData(UUID.randomUUID().toString()));
- // 发送消息到指定的交换器,指定的路由键,在消息转换完成后,通过 MessagePostProcessor 来添加属性
- rabbitTemplate.convertAndSend("direct.exchange","key.1",user,mes -> {
- mes.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- return mes;
- });
接收消息
(1)receive(返回 Message 对象)
- // 接收来自指定队列的消息,并设置超时时间
- Message msg = rabbitTemplate.receive("debug",2000l);
(2)receiveAndConvert(将返回 Message 转换成 Java 对象)
User user = (User) rabbitTemplate.receiveAndConvert();
我们在发送消息之前需要做一些准备,比如我们需要保证发送到的消息队列、交换机是存在的,不然我们发送给谁?
如果不存在我们就需要创建这些,为了保证肯定存在,我们也可以每次执行前都进行创建(已经存在的重复创建似乎没有影响具体我也没有详细研究过)
创建发送的对象有两种方式
(1)@Configuration和@Bean配置队列conf,指定
rabbitConfig.properties:
- learn.direct.queue=learn.direct.queue
- learn.topic.queue=learn.topic.queue
- learn.fanout.queue=learn.fanout.queue
-
- learn.direct.exchange=learn.direct.exchange
- learn.topic.exchange=learn.topic.exchange
- learn.fanout.exchange=learn.fanout.exchange
RabbitConfig :
- package com.marvin.demo.config;
-
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.PropertySource;
-
- @Configuration
- @PropertySource("classpath:rabbitConfig.properties")
- public class RabbitConfig {
-
- @Value("${learn.direct.queue}")
- private String directQueue;
- @Value("${learn.topic.queue}")
- private String topicQueue;
- @Value("${learn.fanout.queue}")
- private String fanoutQueue;
-
-
- @Value("${learn.direct.exchange}")
- private String directExchange;
- @Value("${learn.topic.exchange}")
- private String topicExchange;
- @Value("${learn.fanout.exchange}")
- private String fanoutExchange;
-
- //创建队列
- @Bean("vipDirectQueue")
- public Queue getDirectQueue(){
- return new Queue(directQueue);
- }
- @Bean("vipTopicQueue")
- public Queue getTopicQueue(){
- return new Queue(topicQueue);
- }
- @Bean("vipFanoutQueue")
- public Queue getFanoutQueue(){
- return new Queue(fanoutQueue);
- }
-
- //创建交换机
- @Bean("vipDirectExchange")
- public DirectExchange getDirectExchange(){
- return new DirectExchange(directExchange);
- }
- @Bean("vipTopicExchange")
- public TopicExchange getTopicExchange(){
- return new TopicExchange(topicExchange);
- }
- @Bean("vipFanoutExchange")
- public FanoutExchange getFanoutExchange(){
- return new FanoutExchange(fanoutExchange);
- }
-
- //绑定
- @Bean
- public Binding bindingDirectQueue(@Qualifier("vipDirectQueue") Queue queue, @Qualifier("vipDirectExchange")DirectExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with("test");
- }
-
- }
springboot启动类:
- package com.marvin.demo;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class ApplicationConsumer {
- public static void main(String[] args) {
- SpringApplication.run(ApplicationConsumer.class);
- }
- }
-
(2)RabbitMQ管理界面手动添加
AmqpTemplate : spring 封装的MQ的模版,直接调用即可
MessageConvert
- 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
- RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
- 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
- SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
- 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
pojo对象
- package com.marvin.demo.entity;
-
- import java.io.Serializable;
-
- public class UserBean implements Serializable {
- private Integer id;
- private String username;
- private String pwd;
-
- public UserBean(Integer id, String username, String pwd) {
- this.id = id;
- this.username = username;
- this.pwd = pwd;
- }
-
- //此处省略get、set方法。。。
-
- @Override
- public String toString() {
- return "UserBean{" +
- "id=" + id +
- ", username='" + username + '\'' +
- ", pwd='" + pwd + '\'' +
- '}';
- }
- }
convertAndSend类
- public class HelloSender {
-
- // spring boot 为我们提供的包装类,此处个人就不写SET方法
- @Autowired
- private AmqpTemplate amqpTemplate;
-
- /**
- * 直接使用convertAndSend,会序列化对象,在接值的一方参数类型一定要一致
- */
- public void testDirect3() {
- UserBean userBean=new UserBean(3,"cc","cc");
- // 调用 发送消息的方法
- amqpTemplate.convertAndSend("learn_annotation_DirectExchange","directQueue3",userBean);
- }
-
- }
监听接收类
- /**
- * 使用convertAndSend传参时会自动序列化
- * 监听接值的方法参数一定要一致才会自动转换
- * @param userBean
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "learn_annotation_DirectQueue3"),
- exchange = @Exchange(value = "learn_annotation_DirectExchange",type = "direct"),
- key = "directQueue3"
- ))
- public void processDirect3(UserBean userBean){
- log.info("enter ConsumerDirect-->processDirect3()~~~~~~~~~~~~~~~~~~~");
- //接收参数自动转换反序列化
- System.out.println("ConsumerDirect queue3 msg:"+userBean);
- System.out.println("ConsumerDirect Object3 UserBean:"+userBean.toString());
- }
结果:
其他例子
- @Autowired
- private AmqpTemplate amqpTemplate; 注入模板
-
- /**
- * 封装发送到消息队列的方法
- *
- * @param id
- * @param type 发送消息的类型
- */
- private void sendMessage(Long id, String type) {
- log.info("发送消息到mq");
- try {
- amqpTemplate.convertAndSend("item." + type, id);
- } catch (Exception e) {
- log.error("{}商品消息发送异常,商品ID:{}", type, id, e);
- }
- }
添加 @RabbitListener 注解来指定某方法作为消息消费的方法,例如监听某 Queue 里面的消息
注解 | 描述 |
---|---|
@RabbitListener | 该注解指定目标方法来作为消费消息的方法 通过注解参数指定所监听的队列或者Binding 也可以标注在类上面,但需配合 @RabbitHandler 注解一起使用 |
@QueueBinding | 将交换机和队列绑定 例:@QueueBinding( key = "computer" |
@Queue | 声明队列 (durable = "true" 表示持久化的) 例:@Queue(name = "ly.search.insert.queue", durable = "true") |
@Exchange | 声明交换机(type = ExchangeTypes.TOPIC 表示交换机类型) 例:@Exchange(name = "ly.item.exchange", type = ExchangeTypes.TOPIC) |
@RabbitHandler | 如果 @RabbitListener 标注在类上面,就表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理 具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型 注意:最好类都存在一个兜底方法,防止参数类型找不到 例: @RabbitHandler(isDefault = true) |
@Payload | |
@Headers、@Header |
案例:
- //1. @RabbitListener(queues = "myQueue") // 不能自动创建队列
- //2. 自动创建队列 @RabbitListener(queuesToDeclare = @Queue("myQueue"))
- //3. 自动创建, Exchange和Queue绑定
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue("myQueue"),
- exchange = @Exchange("myExchange")
- ))
- public void process(String message) {
- log.info("MqReceiver: {}", message);
- }
-
- /**
- * 数码供应商服务 接收消息
- * @param message
- */
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange("myOrder"),
- key = "computer",
- value = @Queue("computerOrder")
- ))
- public void processComputer(String message) {
- log.info("computer MqReceiver: {}", message);
- }
-
-
- /**
- * 水果供应商服务 接收消息
- * @param message
- */
- @RabbitListener(bindings = @QueueBinding(
- exchange = @Exchange("myOrder"),
- key = "fruit",
- value = @Queue("fruitOrder")
- ))
- public void processFruit(String message) {
- log.info("fruit MqReceiver: {}", message);
- }
- /**
- * 1、ExchangeTypes:可以从这个抽象类里获取类型,避免手写出错
- * 2、key:是String[]数组,可以设置多个key
- */
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "ly.search.insert.queue", durable = "true"),
- exchange = @Exchange(name = "ly.item.exchange", type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"),
- key = {"item.insert", "item.update"}
- ))
- public void process1(Long id) {
- //需要做的动作。。。
- }
-
-
- @RabbitListener(
- bindings = @QueueBinding(
- value = @Queue(value = "order-queue", durable = "true"),
- exchange = @Exchange(value = "order-exchange", durable = "true", type = "topic"),
- key = "order.*"
- )
- )
- public void process2(String message) {
- //需要做的动作。。。
-
- }
下面是注解的源码
queues :不能自动创建队列
queuesToDeclare :自动创建队列
bindings :属性自动创建, Exchange和Queue绑定
- @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
- @Retention(RetentionPolicy.RUNTIME)
- @MessageMapping
- @Documented
- @Repeatable(RabbitListeners.class)
- public @interface RabbitListener {
- String id() default "";
-
- String containerFactory() default "";
-
- String[] queues() default {};
-
- Queue[] queuesToDeclare() default {};
-
- boolean exclusive() default false;
-
- String priority() default "";
-
- String admin() default "";
-
- QueueBinding[] bindings() default {};
-
- String group() default "";
-
- String returnExceptions() default "";
-
- String errorHandler() default "";
-
- String concurrency() default "";
-
- String autoStartup() default "";
- }
默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除
如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除
通过 ConfirmCallback 和 ReturnCallback 来保证消息发送成功
区别:
ConfirmCallback :保证生产者到 Exchange 的发送
ReturnCallback :保证 Exchange 到 Queue 的发送
使用场景:
- 如果消息没有到 exchange ,则 confirm 回调, ack = false
- 如果消息到达 exchange ,则 confirm 回调, ack = true
- exchange 到 queue 成功,则不回调 return
- exchange 到 queue 失败,则回调 return (需设置mandatory=true,否则不会回调,消息就丢了)
问:发送的消息怎么样才算失败或成功?如何确认?
答:当消息无法路由到队列时,确认消息路由失败。消息成功路由时,当需要发送的队列都发送成功后,进行确认消息,对于持久化队列意味着写入磁盘,对于镜像队列意味着所有镜像接收成功
判断消息发送到 Exchange 是否成功
通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
- @Component
- public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init(){
- rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
- }
-
- /**
- * 当消息发送到交换机(exchange)时,该方法被调用.
- * 1.如果消息没有到exchange,则 ack=false
- * 2.如果消息到达exchange,则 ack=true
- * @param correlationData:唯一标识
- * @param ack:确认结果
- * @param cause:引起原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-
- System.out.println("消息唯一标识:"+correlationData);
- System.out.println("确认结果:"+ack);
- System.out.println("引起原因:"+cause);
-
- if(ack){
- //如果confirm返回成功 则进行更新
- System.out.println("confirm消息确认成功");
- } else {
- //(nack)失败则进行具体的后续操作:重试 或者补偿等手段
- System.out.println("confirm消息确认失败,异常处理...");
- }
-
- }
- }
还需要在配置文件添加配置
- spring:
- rabbitmq:
- publisher-confirms: true
判断消息从 Exchange 发送到 Queue 是否成功,失败调用该方法(成功不调用)
通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调
- @Component
- public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init(){
- rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback
- }
-
- /**
- * 当消息从交换机到队列失败时,该方法被调用。(若成功,则不调用)
- * 需要注意的是:该方法调用后,MsgSendConfirmCallBack中的confirm方法也会被调用,且ack = true
- * @param message:传递的消息主体
- * @param replyCode:问题状态码
- * @param replyText:问题描述
- * @param exchange:使用的交换器
- * @param routingKey:使用的路由键
- */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
-
- //失败则进行具体的后续操作:重试 或者补偿等手段。。。
-
- System.out.println("消息主体 message : "+message);
- System.out.println("问题状态码 : "+replyCode);
- System.out.println("问题描述:"+replyText);
- System.out.println("消息使用的交换器 exchange : "+exchange);
- System.out.println("消息使用的路由键 routing : "+routingKey);
- }
- }
还需要在配置文件添加配置
- spring:
- rabbitmq:
- publisher-returns: true
消息消费者如何通知 Rabbit 消息消费成功?
答:
默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: manual
或在 RabbitListenerContainerFactory 中进行开启手动 ack
- @Bean
- public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setMessageConverter(new Jackson2JsonMessageConverter());
- factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
- return factory;
- }
1.1、确认消息
- @RabbitHandler
- public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
- System.out.println(message);
- try {
- channel.basicAck(tag,false); // 确认消息
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
需要注意的 basicAck 方法需要传递两个参数
1.2、手动否认、拒绝消息
发送一个 header 中包含 error 属性的消息
消费者获取消息时检查到头部包含 error 则 nack 消息
- @RabbitHandler
- public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {
- System.out.println(message);
- if (map.get("error")!= null){
- System.out.println("错误的消息");
- try {
- channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //否认消息
- return;
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- try {
- channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //确认消息
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
此时控制台重复打印,说明该消息被 nack 后一直重新入队列然后一直重新消费
- hello
- 错误的消息
- hello
- 错误的消息
- hello
- 错误的消息
- hello
- 错误的消息
也可以拒绝该消息,消息会被丢弃,不会重回队列
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //拒绝消息
自动确认涉及到一个问题就是如果在处理消息的时候抛出异常,消息处理失败,但是因为自动确认而导致 Rabbit 将该消息删除了,造成消息丢失
- @Bean
- public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames("consumer_queue"); // 监听的队列
- container.setAcknowledgeMode(AcknowledgeMode.NONE); // NONE 代表自动确认
- container.setMessageListener((MessageListener) message -> { //消息监听处理
- System.out.println("====接收到消息=====");
- System.out.println(new String(message.getBody()));
- //相当于自己的一些消费逻辑抛错误
- throw new NullPointerException("consumer fail");
- });
- return container;
- }
手动确认消息
- @Bean
- public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames("consumer_queue"); // 监听的队列
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
- container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { //消息处理
- System.out.println("====接收到消息=====");
- System.out.println(new String(message.getBody()));
- if(message.getMessageProperties().getHeaders().get("error") == null){
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- System.out.println("消息已经确认");
- }else {
- //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
- channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
- System.out.println("消息拒绝");
- }
-
- });
- return container;
- }
AcknowledgeMode 除了 NONE 和 MANUAL 之外还有 AUTO ,它会根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)
- @Bean
- public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames("consumer_queue"); // 监听的队列
- container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 根据情况确认消息
- container.setMessageListener((MessageListener) (message) -> {
- System.out.println("====接收到消息=====");
- System.out.println(new String(message.getBody()));
- //抛出NullPointerException异常则重新入队列
- //throw new NullPointerException("消息消费失败");
- //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false
- //throw new AmqpRejectAndDontRequeueException("消息消费失败");
- //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
- throw new ImmediateAcknowledgeAmqpException("消息消费失败");
- });
- return container;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。