赞
踩
启动mq并初始化用户名,密码;使用卷挂载插件目录(mq自带很多插件,此处谨慎使用目录挂载);暴露控制台端口15672及服务端口5672;指定网络为mynet;
docker run -d --name mq -e RABBITMQ_DEFAULT_USER=xiesijie -e RABBITMQ_DEFAULT_PASS=xiesijie -v /home/mq/mq-plugins:/plugins --hostname mq -p 15672:15672 -p 5672:5672 --network mynet rabbitmq:3.8-management
1、producer:生产者,通常指发送消息的一方,即为发送消息至队列。
2、consumer:消费者,通常指消息接收方。
3、virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机中的queue,exchange相互独立,通常每个项目配置一个虚拟主机。
4、queue:队列,存储消息。生产者发送的消息会存储到队列中,等待消费者消费。
可配置持久化、消费后删除、或消费确认后删除(支持按需配置);
5、exchange:交换机,负责消息路由,生产者发送的消息由exchange决定投递到哪个队列。
常用的交换机有三种类型:
*:有且只匹配一个词。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。
#:匹配一个或多个词。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。
6、channel:生产者消费者于mq服务之间的AMQP信道,TCP连接。
7、borker:可理解为是一个代理的中央角色,负责管理和控制消息的流动,确保消息能够准确、可靠地传递到目标消费者。
- <!-- rabbmitMq 依赖 springboot默认支持的是rabbitMq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- #rabbitMq配置
- spring:
- rabbitmq:
- host: xxx.xxx.xxx.xx
- port: 5672
- #虚拟主机名称
- virtual-host: root
- username: root
- password: root
- connection-timeout: 1s
mq提供RabbitTemplate类执行对mq服务的操作。spring容器会自动初始化,使用时候只需要注入即可。
- @Autowired
- private RabbitTemplate rabbitTemplate;
如下是一个最简单的发送消息示例:发送消息至指定队列,可先于15672可视化界面创建队列。
- @Test
- void sendMsg2Queue() {
- String queueName = "demo-queue";
- String msg = "hello wolaile11";
- rabbitTemplate.convertAndSend(queueName, msg);
- }
接收示例:使用@RabbitListner注解,其中queues参数指定监听(接收)消息队列的名称,可指定多个使用逗号分割。(即:一个消费者可以监听多个生产者)
- @RabbitListener(queues = {"demo-queue"})
- public void listenDemoQueue(String msg) {
- System.out.println("收到了:" + msg);
- }
反之,一个生产者可以被多个消费者监听,默认消费者会分配到相同数量的消息。实际可能不同消息接收服务的性能各异,通常可做如下配置,让每个消费者每次仅可获取一或N(取决于如下配置)条消息,完成处理后获取下一条。这种处理方式被称为work模型,解决消息堆积问题。
- listener:
- simple:
- prefetch: 1
Queue: 用与声明队列,可以用QueueBuilder构建,或直接new,构建的Queue需要作为一个bean交给Spring容器管理。如下三种方式分别创建持久化(new关键字创建不指定持久化方式默认持久化3.6版本以后)、非持久化、持久化的Queue。此处持久化仅针对队列,不包含队列中的消息。
- Queue direct = new Queue("direct");
- Queue queueDurable = QueueBuilder.nonDurable("queueDurable").build();
- Queue queueNonDurable = QueueBuilder.durable("queueNonDurable").build();
Exchange: 声明交换机,可以用ExchangeBuilder或new创建。创建可创建fanout、direct、topic三类交换机。
- ExchangeBuilder.directExchange("direct.demo1").build();
- new DirectExchange("direct.demo1");
Binding:用于绑定交换机和队列。
- BindingBuilder.bind(fanoutQueue1).to(fanoutExchange)
-
- //指定绑定的key 绑定多个key需要写多次
- BindingBuilder.bind(directQueue1).to(directExchange).with("white");
可以基于@RabbitListener注解声明:
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "annotation.queue", durable = "true"),
- exchange = @Exchange(name = "annotation.exchange",
- type = ExchangeTypes.DIRECT), key = {"red", "yellow"}))
- public void bingByAnnotation(String msg) {
- System.out.println("收到了annotation.queue消息:" + msg);
- }
如上代码意为:声明且通过red、yellow两个key绑定了持久化队列annotation.queue与direct类型交换机annotation.exchange,并进行消息监听。
消息对象在spring中默认会被JDK自带的序列化转换为字节进行传输,导致消息内容出现“乱码”。可配置JSON消息转换器,指定消息使用JSON格式传输。
引入依赖
- <!--json转换器,用于mq对象类型消息使用json传输,而非jdk自带的序列化-->
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
声明消息转换器MessageConverter交给spring容器
- @Bean
- public MessageConverter jacksonMessageConvertor() {
- Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
- return jsonMessageConverter;
- }
以上内容基本满足rabbitmq的基础使用
下文是确保消息可靠性相关内容
生产者重连:此处仅仅是连接mq服务失败重连。
添加配置如下
- connection-timeout: 1s
- template:
- #开启mq连接超时重试
- retry:
- enabled: true
- #失败后等待时间
- initial-interval: 1s
- #重试等待时长是上次的多少倍
- multiplier: 2
- #最大重试次数
- max-attempts: 3
测试 :docker容器关闭mq服务之后,测试生产者发消息:
控制台打印三次连接信息,且下一次的等待时间时间根据配置有所递增,此时的等待是阻塞式,生产环境切记合理配置等待时间减少性能影响。
生产者确认:存在网络及cpu资源开销,根据实际确定是否使用
Publish Confirm:每次消息都会返回,接收成功返回ACK,失败返回NACK,需配置且发送消息时传递CorrelationData对象,接收callback返回的ACK。
Publish Return: 仅仅到达交换机后路由失败,找不到队列才会返回,通常不开启;若开启需要开启配置且声明ReturnsCallback。一般不使用
配置如下:
Publish Return接收路由失败返回信息声明如下
- @Configuration
- @Slf4j
- public class MqConfirmConfig {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void initReturnsCallback() {
- rabbitTemplate.setReturnsCallback(returnedMessage ->
- log.debug("收到消息的return" + "callback,exchange:{},key:{},msg:{},cide:{},text:{}",
- returnedMessage.getExchange(),
- returnedMessage.getRoutingKey(),
- returnedMessage.getMessage(),
- returnedMessage.getReplyCode(),
- returnedMessage.getReplyText()));
- }
-
-
- }
Publish Confirm接收ACK返回如下:传递不存在的路由则会返回NACK,传值不存在的key则能接收到Publish Return的返回信息。
- @Test
- public void testSyncAnnotation() {
- CorrelationData cd = new CorrelationData();
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable ex) {
- System.out.println("spring处理失败");
- }
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- if (result.isAck()){
- log.info("消息发送成功,返回ack");
- }else {
- //todo 重发消息
- log.error("消息发送失败,返回nack,原因{}",result.getReason());
- }
- }
- });
- rabbitTemplate.convertAndSend("annotation.exchange111","red","消息来了",cd);
- }
具体返回情况可参阅下图
本人使用的3.9版本默认交换机、队列、消息都默认是持久化的。若较低版本可能需要发消息时候指定持久化属性。如下:使用构造器创建消息,同时指定属性MessageDeliveryMode.PERSISTENT表示持久消息
- Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
- rabbitTemplate.convertAndSend("work-queue", message);
持久化和非持久化的区别:
1、持久化的消息不会随着mq服务重启或者宕机而丢失。
2、持久化的消息mq会存储到磁盘中,同时为了保证效率,内存中也会保留部分消息,不会出现mq内存阻塞。
3、非持久化的消息会全部堆积到内存,当内存满的时候,才会写入磁盘,该操作成为pageout,这个过程mq会出现阻塞。
3.12版本后默认是lazy模式,消息直接写入磁盘,即便不经过内存直接写入磁盘但内部使用一些io优化写入速度快。
低版本可创建队列时声明:
消费者消息确认机制,springAMQP底层已经实现,做相关配置即可 。
- listener:
- simple:
- prefetch: 1
- # 消费者消息确认机制
- # none无需确认 发送完成即删除
- # manual:需要手动处理
- # auto:自动,类似事务回滚
- # 接受完成 就删除返回ack 接受失败则重新发送 返回nack 拒绝 返回reject消息删除
- acknowledge-mode: none
acknowledge-mode默认是auto,消息接收失败,比如抛出异常后会返回nack,进行无限次的重试。当抛出MessageConversionException类异常时,会返回reject,队列中的消息会被删除。可进行如下配置进行设置重试机制。
如上配置表示开启本地(消费者端)重试机制,初始等待时间为1s,每次重试等待时间是上次一倍,最大重试三次, stateless表示每一次重试都是一次新的,不携带任何状态信息。当重试次数耗尽之后消息会从队列删除(默认策略)。
重试策略有以下三种:
默认是上图第一种,重试耗尽丢弃消息;第二种意为本地重试之后,消息重新入队在进行发送,基本等于上文描述的返回nack,消息无限重试。常用的是第三种,指定一个错误消息的交换机,本地重试失败后将消息发送给一个特定的处理异常的消费机,绑定该交换机的队列可进行异常日志记录等操作,供开发、运维人员排查异常。代码如下:
- @Configuration
- @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled",havingValue = "true")
- public class ReceiveErrorConfig {
-
- @Bean
- public DirectExchange errorExchange(){
- return new DirectExchange("error.direct");
- }
-
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue");
- }
-
- @Bean
- public Binding errorBinding(Queue errorQueue,DirectExchange errorExchange){
- return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
- }
-
- @Bean
- public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
- return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
- }
- }
基本思路为当开启重试机制配置后,创建一个MessageRecoverer 类放入Spring容器。MessageRecoverer对象指定交换机和队列,这里通常使用Direct类型的交换机。配置之后一场消息会被投入到指定的队列,控制台打印如下:
唯一消息id方案:给每条消息都设置一个唯一的id,消费者消费的时候据这个id查询数据库,判断是否存在,存在则表示为重复消息。不存在则消费消息并将唯一id存入数据库。这个id可以在传输过程中自定义传入,可以参考下问,在消息转换器中开启相关配置。
springAMQP默认使用Jackson2JsonMessageConverter进行消息转换,消息会被转换为十六进制的字节进行传输。通常可以配置JSON类型的消息转换器,指定消息以JSON格式传输。只需要创建Jackson2JsonMessageConverter bean交给spring容器即可:
- @Bean
- public MessageConverter jacksonMessageConvertor() {
- Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
- //给消息附带id唯一标识 防止重复消费
- jsonMessageConverter.setCreateMessageIds(true);
- return jsonMessageConverter;
- }
setCreateMessageIds(true)表示消息发送过程中会给消息设置一个MessageId属性,消费者可通过这个id避免一个消息被重复消费。
死信交换机,接收绑定该交换机的所有死信消息。
当一个队列的消息满足下列情况之一时,就会成为死信dead letter
1、消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false;
2、消息是一个过期消息,设置消息的expiration属性(达到了队列或消息本身设置的过期时间);
3、要投递的队列消息堆满了,最早的消息可能成为死信。
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列的所有死信消息都会投递到这个交换机中,这个交换机成为死信交换机。简称DLX dead letter exchange。
使用死信交换机的基本逻辑是给一个未绑定消费者的但绑定死信交换机的队列,发送一条携带过期时间的消息,当这条消息到达过期时间未被消费,则会自动进入死信交换机,以此达到延时消息的效果。适用场景如:下单30分钟后未支付,订单自动取消。
执行流程如上
- /**
- * 创建没有消费者的队列 且绑定死信交换机
- *
- * @return queue001
- */
- @Bean
- public Queue queue001() {
- return QueueBuilder.durable("queue.001").deadLetterExchange("dlx.001").build();
- }
绑定死信交换机如上
支持发消息的时候传入一个延时时间,交换机收到消息后到达指定时间后消息才会发送给队列。
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
注意插件的使用版本需要和mq版本相匹配。将下载的.ez后缀的插件放置服务器mq插件目录挂载的文件夹中输入命令开启插件使用:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
延迟交换机创建方式如下
设置Delay()属性发送延迟消息
- /**
- * 发送延迟消息
- */
- @Test
- void sendDelayMessage() {
- String msg = "hello everyone1111";
- Message message = MessageBuilder.withBody(msg.getBytes()).build();
- message.getMessageProperties().setDelay(10000);
- String exchange = "delay.exchange";
- rabbitTemplate.convertAndSend(exchange, "red", message);
- }
每个延迟消息内部都会定义一个始终,会占用性能。若存在很多、且延迟时间很长的消息要考虑性能消耗。如实际生产场景中存在类似下单30分钟后未支付自动取消订单的功能。可参考如下实践思路:通常下单后前几分钟完成订单支付的概率是较大的,所以我们可以创建一个包含时间数组的对象作为消息对象,数组中的元素1分钟、2分钟、5分钟、8分钟、14分钟递增,总共等于30分钟。没发送一次延迟消息的时候就从数组中取出第一个元素(并删除),作为延迟时间发送延迟消息。当检测到用户完成支付之后就修改订单状态。未完成支付就从包含时间数组取出下一个延迟时间再次发送延迟消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。