当前位置:   article > 正文

RabbitMQ(二)Springboot整合及使用_convertandsend用法

convertandsend用法

目录

一、RabbitMQ的重要概念

二、依赖与配置

1、添加Maven依赖

2、添加相关配置

三、spring操作rabbitMQ的对象(重要)

1、RabbitAdmin

2、RabbitTemplate

四、消息生产者

1、创建消息队列、交换机

2、生产消息并发送 

五、消息消费者(重要)

1、@RabbitListener

六、消息发送确认 (重要)

1、ConfirmCallback

2、ReturnCallback

七、消息接收确认(重要)

1、确认消息(局部方法处理消息)

2、确认消息(全局处理消息)

消息可靠总结


 

一、RabbitMQ的重要概念

RabbitMQ是一种基于amq协议的消息队列,本文主要记录一下使用 spring-boot-starter-amqp 操作 rabbitmq。

a) 虚拟主机(vhost)

 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。虚拟主机的作用在于进行权限管控,rabbitmq默认有一个虚拟主机"/"。可以使用rabbitmqctl add_vhost命令添加虚拟主机,然后使用rabbitmqctl set_permissions命令设置指定用户在指定虚拟主机下的权限,以此达到权限管控的目的。

b) 消息通道(channel)

消息通道: 在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

c) 交换机(exchange)

交换机: exchange的功能是用于消息分发,它负责接收消息并转发到与之绑定的队列,exchange不存储消息,如果一个exchange没有binding任何Queue,那么当它会丢弃生产者发送过来的消息,在启用ACK机制后,如果exchange找不到队列,则会返回错误。一个exchange可以和多个Queue进行绑定。

交换机有四种类型:

  • 路由模式(Direct):默认

direct 是 rabbitmq 的默认交换机类型。根据 routingKey 完全匹配

​direct 类型的行为是"先匹配, 再投送"。即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去。

  • 主题模式\通配符模式(Topic):

根据绑定关键字通配符规则匹配、比较灵活

​类似路由模式,但是 routing_key 支持模糊匹配,按规则转发消息(最灵活)。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。

  • 发布订阅模式(Fanout):

不需要指定 routingkey,相当于群发

​转发消息到所有绑定队列,忽略 routing_key

  • Headers:

不太常用,可以自定义匹配规则

设置header attribute参数类型的交换机。相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型,忽略routing_key。在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列。
​ 在绑定Queue与Exchange时指定一组键值对,当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配。如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。

二、依赖与配置

1、添加Maven依赖

spring-boot-starter-amqp

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-test</artifactId>
  12. <scope>test</scope>
  13. </dependency>

2、添加相关配置

application.yml 中添加 rabbitmq 连接信息

  1. spring:
  2. application:
  3. name: homepage-rabbitMQ
  4. rabbitmq:
  5. host: localhost
  6. port: 5672
  7. username: springcloud
  8. password: 123456
  9. virtual-host: /spring_cloud
  10. server:
  11. port: 8400
  12. eureka:
  13. client:
  14. service-url:
  15. #将自己注册进下面这个地址的服务注册中心
  16. defaultZone: http://admin:admin@localhost:8000/eureka/

三、spring操作rabbitMQ的对象(重要)

spring-boot-starter-amqp依赖为我们提供了两个jar

  • spring-amqp.jar
  • spring-rabbit.jar
操作对象 类型描述
AmqpTemplateinterface

所属jar包:spring-amqp.jar  --》org.springframework.amqp.core

AmqpAdmininterface所属jar包:spring-amqp.jar  --》org.springframework.amqp.core
RabbitTemplateclass

实现了 AmqpTemplate 接口

所属jar包:spring-rabbit.jar --》org.springframework.amqp.rabbit.core

RabbitAdminclass

实现了 AmqpAdmin 接口

所属jar包:spring-rabbit.jar --》org.springframework.amqp.rabbit.core

问:rabbitTemplate 和 amqpTemplate 有什么关系?

答:源码中会发现 rabbitTemplate 实现自 amqpTemplate 接口,使用起来并无区别,需引入spring-boot-starter-amqp依赖

下面文字来自官方文档:

与Spring框架和相关项目提供的许多其他高级抽象一样,Spring AMQP提供了一个“template”,它扮演着核心角色。定义主要操作的接口称为 AmqpTemplate。这些操作涵盖了发送和接收消息的一般行为。换句话说,它们对于任何实现都不是惟一的,因此名称中有“AMQP”。另一方面,该接口的一些实现与AMQP协议的实现绑定在一起。与JMS本身是接口级API不同,AMQP是一个线级协议。该协议的实现提供了自己的客户机库,因此模板接口的每个实现都依赖于特定的客户机库。目前,只有一个实现:RabbitTemplate。在接下来的示例中,您将经常看到“AmqpTemplate”的用法,但是当您查看配置示例,或者调用模板实例化和/或setter的任何代码摘录时,您将看到实现类型(例如,“RabbitTemplate”)。 

1、RabbitAdmin

该类封装了对 RabbitMQ 的管理操作

主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等

  1. @Bean
  2. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
  3. return new RabbitAdmin(connectionFactory);
  4. }
  5. @Autowired
  6. private RabbitAdmin rabbitAdmin;

Exchange 操作

  1. //创建四种类型的 Exchange,均为持久化,不自动删除
  2. rabbitAdmin.declareExchange(new DirectExchange("direct.exchange",true,false));
  3. rabbitAdmin.declareExchange(new TopicExchange("topic.exchange",true,false));
  4. rabbitAdmin.declareExchange(new FanoutExchange("fanout.exchange",true,false));
  5. rabbitAdmin.declareExchange(new HeadersExchange("header.exchange",true,false));
  6. //删除 Exchange
  7. rabbitAdmin.deleteExchange("header.exchange");

Queue 操作

  1. //定义队列,均为持久化
  2. rabbitAdmin.declareQueue(new Queue("debug",true));
  3. rabbitAdmin.declareQueue(new Queue("info",true));
  4. rabbitAdmin.declareQueue(new Queue("error",true));
  5. //删除队列
  6. rabbitAdmin.deleteQueue("debug");
  7. //将队列中的消息全消费掉
  8. rabbitAdmin.purgeQueue("info",false);

Binding 绑定

  1. //绑定队列到交换器,通过路由键
  2. rabbitAdmin.declareBinding(new Binding("debug",Binding.DestinationType.QUEUE,
  3. "direct.exchange","key.1",new HashMap()));
  4. rabbitAdmin.declareBinding(new Binding("info",Binding.DestinationType.QUEUE,
  5. "direct.exchange","key.2",new HashMap()));
  6. rabbitAdmin.declareBinding(new Binding("error",Binding.DestinationType.QUEUE,
  7. "direct.exchange","key.3",new HashMap()));
  8. //进行解绑
  9. rabbitAdmin.removeBinding(BindingBuilder.bind(new Queue("info")).
  10. to(new TopicExchange("direct.exchange")).with("key.2"));
  11. //使用BindingBuilder进行绑定
  12. rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("info")).
  13. to(new TopicExchange("topic.exchange")).with("key.#"));
  14. //声明topic类型的exchange
  15. rabbitAdmin.declareExchange(new TopicExchange("exchange1",true,false));
  16. rabbitAdmin.declareExchange(new TopicExchange("exchange2",true,false));
  17. //exchange与exchange绑定
  18. rabbitAdmin.declareBinding(new Binding("exchange1",Binding.DestinationType.EXCHANGE,
  19. "exchange2","key.4",new HashMap()));

个人案例:

  1. package com.mq;
  2. import homepage.ApplicationMQ;
  3. import org.junit.Test;
  4. import org.junit.runner.RunWith;
  5. import org.springframework.amqp.core.*;
  6. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.context.SpringBootTest;
  9. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  10. /**
  11. * RabbitAdmin用于创建、绑定、删除队列与交换机,发送消息等
  12. */
  13. @RunWith(SpringJUnit4ClassRunner.class)
  14. @SpringBootTest(classes = ApplicationMQ.class)
  15. public class RabbitAdminTest {
  16. @Autowired
  17. private RabbitAdmin rabbitAdmin;
  18. /**
  19. * 创建绑定Direct路由模式
  20. * routingKey 完全匹配
  21. * Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
  22. * Binding(目的地, 目的地类型, exchange, routingKey, 参数)
  23. */
  24. @Test
  25. public void testDirect() {
  26. //切记命名不能重复复
  27. final String QUEUE_NAME="test.direct.queue";
  28. final String EXCHANGE_NAME="test.direct";
  29. //创建队列
  30. Queue directQueue=new Queue(QUEUE_NAME);
  31. rabbitAdmin.declareQueue(directQueue);
  32. //创建Direct交换机
  33. DirectExchange directExchange=new DirectExchange(EXCHANGE_NAME);
  34. rabbitAdmin.declareExchange(directExchange);
  35. //绑定交换机和队列(注意:绑定的时候,一定要确认绑定的双方都是存在的,否则会报IO异常,NOT_FOUND)
  36. Binding directBinding=new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "mq.direct", null);
  37. rabbitAdmin.declareBinding(directBinding);
  38. }
  39. /**
  40. * 创建绑定Topic主题模式\通配符模式
  41. * routingKey 模糊匹配
  42. * BindingBuilder.bind(queue).to(exchange).with(routingKey)
  43. */
  44. @Test
  45. public void testTopic() {
  46. rabbitAdmin.declareQueue(new Queue("test.topic.queue", true, false, false));
  47. rabbitAdmin.declareExchange(new TopicExchange("test.topic", true, false));
  48. //如果注释掉上面两句实现声明,直接进行下面的绑定竟然不行,该版本amqp-client采用的是5.1.2,将上面两行代码放开,则运行成功
  49. rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", true, false, false)).to(new TopicExchange("test.topic", true, false)).with("mq.topic"));
  50. }
  51. /**
  52. * 创建绑定Fanout发布订阅模式
  53. * BindingBuilder.bind(queue).to(FanoutExchange)
  54. */
  55. @Test
  56. public void testFanout() {
  57. rabbitAdmin.declareQueue(new Queue("test.fanout.queue", true, false, false, null));
  58. rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", true, false, null));
  59. rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", true, false, false)).to(new FanoutExchange("test.fanout", true, false)));
  60. rabbitAdmin.purgeQueue("test.direct.queue", false);//清空队列消息
  61. }
  62. }

2、RabbitTemplate

Spring AMQP 提供了 RabbitTemplate 来简化 RabbitMQ 发送和接收消息操作

RabbitTemplate 初始化

设置 RabbitTemplate 的默认交换器、默认路由键、默认队列

send数据以Message类型传入,自定义消息 Message
convertAndSend数据以Object类型传入,自动将 Java 对象序列化包装成 Message 对象,Java 对象需要实现 Serializable 序列化接口
receive数据以Message类型返回,返回 Message 对象
receiveAndConvert数据以Object类型返回,会自动将返回的 Message 反序列化转换成 Java 对象

发送消息

(1)send (自定义消息 Message)

  1. Message message = new Message("hello".getBytes(),new MessageProperties());
  2. // 发送消息到默认的交换器,默认的路由键
  3. rabbitTemplate.send(message);
  4. // 发送消息到指定的交换器,指定的路由键
  5. rabbitTemplate.send("direct.exchange","key.1",message);
  6. // 发送消息到指定的交换器,指定的路由键
  7. rabbitTemplate.send("direct.exchange","key.1",message,new CorrelationData(UUID.randomUUID().toString()));

(2)convertAndSend(自动 Java 对象包装成 Message 对象,Java 对象需要实现 Serializable 序列化接口)

  1. User user = new User("linyuan");
  2. // 发送消息到默认的交换器,默认的路由键
  3. rabbitTemplate.convertAndSend(user);
  4. // 发送消息到指定的交换器,指定的路由键,设置消息 ID
  5. rabbitTemplate.convertAndSend("direct.exchange","key.1",user,new CorrelationData(UUID.randomUUID().toString()));
  6. // 发送消息到指定的交换器,指定的路由键,在消息转换完成后,通过 MessagePostProcessor 来添加属性
  7. rabbitTemplate.convertAndSend("direct.exchange","key.1",user,mes -> {
  8. mes.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
  9. return mes;
  10. });

接收消息

(1)receive(返回 Message 对象)

  1. // 接收来自指定队列的消息,并设置超时时间
  2. Message msg = rabbitTemplate.receive("debug",2000l);

(2)receiveAndConvert(将返回 Message 转换成 Java 对象)

User user = (User) rabbitTemplate.receiveAndConvert();

四、消息生产者

1、创建消息队列、交换机

我们在发送消息之前需要做一些准备,比如我们需要保证发送到的消息队列、交换机是存在的,不然我们发送给谁?

如果不存在我们就需要创建这些,为了保证肯定存在,我们也可以每次执行前都进行创建(已经存在的重复创建似乎没有影响具体我也没有详细研究过)

创建发送的对象有两种方式

(1)@Configuration和@Bean配置队列conf,指定

rabbitConfig.properties:

  1. learn.direct.queue=learn.direct.queue
  2. learn.topic.queue=learn.topic.queue
  3. learn.fanout.queue=learn.fanout.queue
  4. learn.direct.exchange=learn.direct.exchange
  5. learn.topic.exchange=learn.topic.exchange
  6. learn.fanout.exchange=learn.fanout.exchange

RabbitConfig :

  1. package com.marvin.demo.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.context.annotation.PropertySource;
  8. @Configuration
  9. @PropertySource("classpath:rabbitConfig.properties")
  10. public class RabbitConfig {
  11. @Value("${learn.direct.queue}")
  12. private String directQueue;
  13. @Value("${learn.topic.queue}")
  14. private String topicQueue;
  15. @Value("${learn.fanout.queue}")
  16. private String fanoutQueue;
  17. @Value("${learn.direct.exchange}")
  18. private String directExchange;
  19. @Value("${learn.topic.exchange}")
  20. private String topicExchange;
  21. @Value("${learn.fanout.exchange}")
  22. private String fanoutExchange;
  23. //创建队列
  24. @Bean("vipDirectQueue")
  25. public Queue getDirectQueue(){
  26. return new Queue(directQueue);
  27. }
  28. @Bean("vipTopicQueue")
  29. public Queue getTopicQueue(){
  30. return new Queue(topicQueue);
  31. }
  32. @Bean("vipFanoutQueue")
  33. public Queue getFanoutQueue(){
  34. return new Queue(fanoutQueue);
  35. }
  36. //创建交换机
  37. @Bean("vipDirectExchange")
  38. public DirectExchange getDirectExchange(){
  39. return new DirectExchange(directExchange);
  40. }
  41. @Bean("vipTopicExchange")
  42. public TopicExchange getTopicExchange(){
  43. return new TopicExchange(topicExchange);
  44. }
  45. @Bean("vipFanoutExchange")
  46. public FanoutExchange getFanoutExchange(){
  47. return new FanoutExchange(fanoutExchange);
  48. }
  49. //绑定
  50. @Bean
  51. public Binding bindingDirectQueue(@Qualifier("vipDirectQueue") Queue queue, @Qualifier("vipDirectExchange")DirectExchange exchange){
  52. return BindingBuilder.bind(queue).to(exchange).with("test");
  53. }
  54. }

 springboot启动类:

  1. package com.marvin.demo;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class ApplicationConsumer {
  6. public static void main(String[] args) {
  7. SpringApplication.run(ApplicationConsumer.class);
  8. }
  9. }

(2)RabbitMQ管理界面手动添加

2、生产消息并发送 

AmqpTemplate : spring 封装的MQ的模版,直接调用即可

  • Send():一般传递数据是Message类型
  • convertAndSend():一般传递数据是Object类型,会自动序列化后传递

MessageConvert

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
  • RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
  • 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
  • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
  • 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能

pojo对象

  1. package com.marvin.demo.entity;
  2. import java.io.Serializable;
  3. public class UserBean implements Serializable {
  4. private Integer id;
  5. private String username;
  6. private String pwd;
  7. public UserBean(Integer id, String username, String pwd) {
  8. this.id = id;
  9. this.username = username;
  10. this.pwd = pwd;
  11. }
  12. //此处省略get、set方法。。。
  13. @Override
  14. public String toString() {
  15. return "UserBean{" +
  16. "id=" + id +
  17. ", username='" + username + '\'' +
  18. ", pwd='" + pwd + '\'' +
  19. '}';
  20. }
  21. }

convertAndSend类 

  1. public class HelloSender {
  2. // spring boot 为我们提供的包装类,此处个人就不写SET方法
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. /**
  6. * 直接使用convertAndSend,会序列化对象,在接值的一方参数类型一定要一致
  7. */
  8. public void testDirect3() {
  9. UserBean userBean=new UserBean(3,"cc","cc");
  10. // 调用 发送消息的方法
  11. amqpTemplate.convertAndSend("learn_annotation_DirectExchange","directQueue3",userBean);
  12. }
  13. }

监听接收类

  1. /**
  2. * 使用convertAndSend传参时会自动序列化
  3. * 监听接值的方法参数一定要一致才会自动转换
  4. * @param userBean
  5. */
  6. @RabbitListener(bindings = @QueueBinding(
  7. value = @Queue(value = "learn_annotation_DirectQueue3"),
  8. exchange = @Exchange(value = "learn_annotation_DirectExchange",type = "direct"),
  9. key = "directQueue3"
  10. ))
  11. public void processDirect3(UserBean userBean){
  12. log.info("enter ConsumerDirect-->processDirect3()~~~~~~~~~~~~~~~~~~~");
  13. //接收参数自动转换反序列化
  14. System.out.println("ConsumerDirect queue3 msg:"+userBean);
  15. System.out.println("ConsumerDirect Object3 UserBean:"+userBean.toString());
  16. }

结果:

其他例子 

  1. @Autowired
  2. private AmqpTemplate amqpTemplate; 注入模板
  3. /**
  4. * 封装发送到消息队列的方法
  5. *
  6. * @param id
  7. * @param type 发送消息的类型
  8. */
  9. private void sendMessage(Long id, String type) {
  10. log.info("发送消息到mq");
  11. try {
  12. amqpTemplate.convertAndSend("item." + type, id);
  13. } catch (Exception e) {
  14. log.error("{}商品消息发送异常,商品ID:{}", type, id, e);
  15. }
  16. }

五、消息消费者(重要)

添加 @RabbitListener 注解来指定某方法作为消息消费的方法,例如监听某 Queue 里面的消息

注解描述
@RabbitListener

该注解指定目标方法来作为消费消息的方法

通过注解参数指定所监听的队列或者Binding

也可以标注在类上面,但需配合 @RabbitHandler 注解一起使用

@QueueBinding

将交换机和队列绑定

例:@QueueBinding(
            exchange = @Exchange("myOrder"),
            value = @Queue("computerOrder"),

            key = "computer"
    )

@Queue

声明队列 (durable = "true" 表示持久化的)

例:@Queue(name = "ly.search.insert.queue", durable = "true")

@Exchange

声明交换机(type = ExchangeTypes.TOPIC 表示交换机类型)

例:@Exchange(name = "ly.item.exchange", type = ExchangeTypes.TOPIC)

@RabbitHandler

如果 @RabbitListener 标注在类上面,就表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理

具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

注意:最好类都存在一个兜底方法,防止参数类型找不到

例:

@RabbitHandler(isDefault = true)
private void myMethodName(Object o){
    log.error("没有找到对应转换的方法入参,进入兜底方法 --> {}",o );
}

@Payload

@Headers、@Header

案例:

  1. 直接使用@RabbitListener(queues = "myQueue")  不能自动创建队列
  2. 自动创建队列 @RabbitListener(queuesToDeclare = @Queue("myQueue"))
  3. bindings :属性自动创建, Exchange和Queue绑定
  4. ExchangeTypes:可以从这个抽象类里获取类型,避免手写出错
  5. key:是String[]数组,可以设置多个key
  1. //1. @RabbitListener(queues = "myQueue") // 不能自动创建队列
  2. //2. 自动创建队列 @RabbitListener(queuesToDeclare = @Queue("myQueue"))
  3. //3. 自动创建, Exchange和Queue绑定
  4. @RabbitListener(bindings = @QueueBinding(
  5. value = @Queue("myQueue"),
  6. exchange = @Exchange("myExchange")
  7. ))
  8. public void process(String message) {
  9. log.info("MqReceiver: {}", message);
  10. }
  11. /**
  12. * 数码供应商服务 接收消息
  13. * @param message
  14. */
  15. @RabbitListener(bindings = @QueueBinding(
  16. exchange = @Exchange("myOrder"),
  17. key = "computer",
  18. value = @Queue("computerOrder")
  19. ))
  20. public void processComputer(String message) {
  21. log.info("computer MqReceiver: {}", message);
  22. }
  23. /**
  24. * 水果供应商服务 接收消息
  25. * @param message
  26. */
  27. @RabbitListener(bindings = @QueueBinding(
  28. exchange = @Exchange("myOrder"),
  29. key = "fruit",
  30. value = @Queue("fruitOrder")
  31. ))
  32. public void processFruit(String message) {
  33. log.info("fruit MqReceiver: {}", message);
  34. }

  1. /**
  2. * 1、ExchangeTypes:可以从这个抽象类里获取类型,避免手写出错
  3. * 2、key:是String[]数组,可以设置多个key
  4. */
  5. @RabbitListener(bindings = @QueueBinding(
  6. value = @Queue(name = "ly.search.insert.queue", durable = "true"),
  7. exchange = @Exchange(name = "ly.item.exchange", type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"),
  8. key = {"item.insert", "item.update"}
  9. ))
  10. public void process1(Long id) {
  11. //需要做的动作。。。
  12. }
  13. @RabbitListener(
  14. bindings = @QueueBinding(
  15. value = @Queue(value = "order-queue", durable = "true"),
  16. exchange = @Exchange(value = "order-exchange", durable = "true", type = "topic"),
  17. key = "order.*"
  18. )
  19. )
  20. public void process2(String message) {
  21. //需要做的动作。。。
  22. }

1、@RabbitListener

下面是注解的源码

queues :不能自动创建队列

queuesToDeclare :自动创建队列

bindings 属性自动创建, Exchange和Queue绑定

  1. @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @MessageMapping
  4. @Documented
  5. @Repeatable(RabbitListeners.class)
  6. public @interface RabbitListener {
  7. String id() default "";
  8. String containerFactory() default "";
  9. String[] queues() default {};
  10. Queue[] queuesToDeclare() default {};
  11. boolean exclusive() default false;
  12. String priority() default "";
  13. String admin() default "";
  14. QueueBinding[] bindings() default {};
  15. String group() default "";
  16. String returnExceptions() default "";
  17. String errorHandler() default "";
  18. String concurrency() default "";
  19. String autoStartup() default "";
  20. }

六、消息发送确认 (重要)

默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除

如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除 

通过 ConfirmCallback ReturnCallback 来保证消息发送成功

区别:

ConfirmCallback :保证生产者到 Exchange 的发送

ReturnCallback :保证 Exchange 到 Queue 的发送

使用场景:

  • 如果消息没有到 exchange ,则 confirm 回调, ack = false
  • 如果消息到达 exchange ,则 confirm 回调, ack = true
  • exchange 到 queue 成功,则不回调 return
  • exchange 到 queue 失败,则回调 return (需设置mandatory=true,否则不会回调,消息就丢了)

问:发送的消息怎么样才算失败或成功?如何确认?

答:当消息无法路由到队列时,确认消息路由失败。消息成功路由时,当需要发送的队列都发送成功后,进行确认消息,对于持久化队列意味着写入磁盘,对于镜像队列意味着所有镜像接收成功

1、ConfirmCallback

判断消息发送到 Exchange 是否成功

通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中

  1. @Component
  2. public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @PostConstruct
  6. public void init(){
  7. rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
  8. }
  9. /**
  10. * 当消息发送到交换机(exchange)时,该方法被调用.
  11. * 1.如果消息没有到exchange,则 ack=false
  12. * 2.如果消息到达exchange,则 ack=true
  13. * @param correlationData:唯一标识
  14. * @param ack:确认结果
  15. * @param cause:引起原因
  16. */
  17. @Override
  18. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  19. System.out.println("消息唯一标识:"+correlationData);
  20. System.out.println("确认结果:"+ack);
  21. System.out.println("引起原因:"+cause);
  22. if(ack){
  23. //如果confirm返回成功 则进行更新
  24. System.out.println("confirm消息确认成功");
  25. } else {
  26. //(nack)失败则进行具体的后续操作:重试 或者补偿等手段
  27. System.out.println("confirm消息确认失败,异常处理...");
  28. }
  29. }
  30. }

还需要在配置文件添加配置

  1. spring:
  2. rabbitmq:
  3. publisher-confirms: true

2、ReturnCallback

判断消息从 Exchange 发送到 Queue 是否成功,失败调用该方法(成功不调用)

通过实现 ReturnCallback 接口,启动消息失败返回,比如路由不到队列时触发回调

  1. @Component
  2. public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @PostConstruct
  6. public void init(){
  7. rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback
  8. }
  9. /**
  10. * 当消息从交换机到队列失败时,该方法被调用。(若成功,则不调用)
  11. * 需要注意的是:该方法调用后,MsgSendConfirmCallBack中的confirm方法也会被调用,且ack = true
  12. * @param message:传递的消息主体
  13. * @param replyCode:问题状态码
  14. * @param replyText:问题描述
  15. * @param exchange:使用的交换器
  16. * @param routingKey:使用的路由键
  17. */
  18. @Override
  19. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  20. //失败则进行具体的后续操作:重试 或者补偿等手段。。。
  21. System.out.println("消息主体 message : "+message);
  22. System.out.println("问题状态码 : "+replyCode);
  23. System.out.println("问题描述:"+replyText);
  24. System.out.println("消息使用的交换器 exchange : "+exchange);
  25. System.out.println("消息使用的路由键 routing : "+routingKey);
  26. }
  27. }

 还需要在配置文件添加配置

  1. spring:
  2. rabbitmq:
  3. publisher-returns: true

七、消息接收确认(重要)

消息消费者如何通知 Rabbit 消息消费成功?

答:

  • 消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK
  • 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
  • 如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失
  • 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者
  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限
  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟
  • 消息确认模式有:
    • AcknowledgeMode.NONE:自动确认
    • AcknowledgeMode.AUTO:根据情况确认
    • AcknowledgeMode.MANUAL:手动确认

1、确认消息(局部方法处理消息)

默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. acknowledge-mode: manual

或在 RabbitListenerContainerFactory 中进行开启手动 ack

  1. @Bean
  2. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
  3. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  4. factory.setConnectionFactory(connectionFactory);
  5. factory.setMessageConverter(new Jackson2JsonMessageConverter());
  6. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
  7. return factory;
  8. }

1.1、确认消息

  1. @RabbitHandler
  2. public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
  3. System.out.println(message);
  4. try {
  5. channel.basicAck(tag,false); // 确认消息
  6. } catch (IOException e) {
  7. e.printStackTrace();
  8. }
  9. }

需要注意的 basicAck 方法需要传递两个参数

  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
  • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

1.2、手动否认、拒绝消息

发送一个 header 中包含 error 属性的消息

消费者获取消息时检查到头部包含 error 则 nack 消息

  1. @RabbitHandler
  2. public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {
  3. System.out.println(message);
  4. if (map.get("error")!= null){
  5. System.out.println("错误的消息");
  6. try {
  7. channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //否认消息
  8. return;
  9. } catch (IOException e) {
  10. e.printStackTrace();
  11. }
  12. }
  13. try {
  14. channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //确认消息
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. }

此时控制台重复打印,说明该消息被 nack 后一直重新入队列然后一直重新消费

  1. hello
  2. 错误的消息
  3. hello
  4. 错误的消息
  5. hello
  6. 错误的消息
  7. hello
  8. 错误的消息

也可以拒绝该消息,消息会被丢弃,不会重回队列

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        //拒绝消息

2、确认消息(全局处理消息)

自动确认涉及到一个问题就是如果在处理消息的时候抛出异常,消息处理失败,但是因为自动确认而导致 Rabbit 将该消息删除了,造成消息丢失

  1. @Bean
  2. public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  4. container.setConnectionFactory(connectionFactory);
  5. container.setQueueNames("consumer_queue"); // 监听的队列
  6. container.setAcknowledgeMode(AcknowledgeMode.NONE); // NONE 代表自动确认
  7. container.setMessageListener((MessageListener) message -> { //消息监听处理
  8. System.out.println("====接收到消息=====");
  9. System.out.println(new String(message.getBody()));
  10. //相当于自己的一些消费逻辑抛错误
  11. throw new NullPointerException("consumer fail");
  12. });
  13. return container;
  14. }

手动确认消息

  1. @Bean
  2. public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  4. container.setConnectionFactory(connectionFactory);
  5. container.setQueueNames("consumer_queue"); // 监听的队列
  6. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
  7. container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { //消息处理
  8. System.out.println("====接收到消息=====");
  9. System.out.println(new String(message.getBody()));
  10. if(message.getMessageProperties().getHeaders().get("error") == null){
  11. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  12. System.out.println("消息已经确认");
  13. }else {
  14. //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
  15. channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
  16. System.out.println("消息拒绝");
  17. }
  18. });
  19. return container;
  20. }

AcknowledgeMode 除了 NONE 和 MANUAL 之外还有 AUTO ,它会根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)

  • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
  • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)
  • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
  • 其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置
  1. @Bean
  2. public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  4. container.setConnectionFactory(connectionFactory);
  5. container.setQueueNames("consumer_queue"); // 监听的队列
  6. container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 根据情况确认消息
  7. container.setMessageListener((MessageListener) (message) -> {
  8. System.out.println("====接收到消息=====");
  9. System.out.println(new String(message.getBody()));
  10. //抛出NullPointerException异常则重新入队列
  11. //throw new NullPointerException("消息消费失败");
  12. //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false
  13. //throw new AmqpRejectAndDontRequeueException("消息消费失败");
  14. //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
  15. throw new ImmediateAcknowledgeAmqpException("消息消费失败");
  16. });
  17. return container;
  18. }

消息可靠总结

  • 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  • 消息确认
    • 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)
    • 生产者和Server(broker)之间的消息确认
    • 消费者和Server(broker)之间的消息确认
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/399288
推荐阅读
相关标签
  

闽ICP备14008679号