赞
踩
确保消息在 RabbitMQ 中安全保存,必须开启消息持久化机制,即交换机持久化,队列持久化,消息持久化。 默认情况下,SpringAmqp 声明的交换机,队列,消息都是持久化的,并不需要我们特意指定,即 Durability 属性都为 Durable。
- 1.交换器的持久化
- // 参数1 exchange:交换器名
- // 参数2 type:交换器类型
- // 参数3 durable:是否持久化
- channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
- 2.队列的持久化
- // 参数1 queue:队列名
- // 参数2 durable:是否持久化
- // 参数3 exclusive:仅创建者可以使用的私有队列,断开后自动删除
- // 参数4 autoDelete:当所有消费客户端连接断开后,是否自动删除队列
- // 参数5 arguments
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- 3.消息的持久化
- // 参数1 exchange:交换器
- // 参数2 routingKey:路由键
- // 参数3 props:消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
- // 参数4 body:消息体
- channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

- 1.交换器持久化
- // 参数1 name:交互器名
- // 参数2 durable:是否持久化
- // 参数3 autoDelete:当所有消费客户端连接断开后,是否自动删除队列
- new TopicExchange(name, durable, autoDelete);
- 2.队列持久化
- // 参数1 name:队列名
- // 参数2 durable:是否持久化
- // 参数3 exclusive:仅创建者可以使用的私有队列,断开后自动删除
- // 参数4 autoDelete:当所有消费客户端连接断开后,是否自动删除队列
- new Queue(name, durable, exclusive, autoDelete);
1.生产者确认机制 2.失败重试机制 3.消费者确认机制(默认自动确认)
- spring:
- rabbitmq:
- # ...前面配置省略..
- # 消息确认(ACK)
- publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
- publisher-returns: true #确认消息已发送到队列(Queue)
- listener:
- simple:
- acknowledge-mode: manual # 手动确认
- direct:
- acknowledge-mode: manual # 手动确认
- # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
- addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5673
通过发送后的回调函数确认消息是否发送成功。
ConfirmCallback(): 消息到达交换机的回调。
ReturnCallback(): 消息从交换机到队列后的回调。
1、全局方式
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate();
- rabbitTemplate.setConnectionFactory(connectionFactory);
- //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
- rabbitTemplate.setMandatory(true);
-
- //确认消息送到交换机(Exchange)回调
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- System.out.println("\n确认消息送到交换机(Exchange)结果:");
- System.out.println("相关数据:" + correlationData);
- System.out.println("是否成功:" + ack);
- System.out.println("错误原因:" + cause);
- });
-
- //确认消息送到队列(Queue)回调
- rabbitTemplate.setReturnsCallback(returnedMessage -> {
- System.out.println("\n确认消息送到队列(Queue)结果:");
- System.out.println("发生消息:" + returnedMessage.getMessage());
- System.out.println("回应码:" + returnedMessage.getReplyCode());
- System.out.println("回应信息:" + returnedMessage.getReplyText());
- System.out.println("交换机:" + returnedMessage.getExchange());
- System.out.println("路由键:" + returnedMessage.getRoutingKey());
- });
- return rabbitTemplate;
- }

2.局部方式
- @Service
- public class SendMessageService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
-
- private static Logger logger = LoggerFactory.getLogger(SendMessageService.class);
-
- @Autowired
- public RabbitTemplate rabbitTemplate;
-
- public void sendMessage(String str){
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setReturnsCallback(this);
- rabbitTemplate.setConfirmCallback(this);
- // CorrelationData构造函数中的id可以随便写,但是必须要非null而且是唯一的
- rabbitTemplate.convertAndSend("exchange","routingKey", str,new CorrelationData(UUID.randomUUID().toString()));
- }
-
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- System.out.println("sender return success" + returnedMessage.toString());
- }
-
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- logger.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
- if (!b) {
- logger.error("消息发送异常!");
- // 进行处理
- } else {
- logger.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s);
- }
- }
- }

- # 开启重试,默认是false
- spring.rabbitmq.listener.simple.retry.enabled=true
- # 重试次数,默认为3次
- spring.rabbitmq.listener.simple.retry.max-attempts=5
- # 重试最大间隔时间
- spring.rabbitmq.listener.simple.retry.max-interval=10000
1.消息确认模式有: AcknowledgeMode.NONE:自动确认 AcknowledgeMode.AUTO:根据情况确认 AcknowledgeMode.MANUAL:手动确认 2.消费者手动调用方式 Basic.Ack: 用于确认当前消息 Basic.Nack: 用于否定当前消息(批量拒绝) Basic.Reject: 用于拒绝当前消息(单量拒绝) 3.手动确认与拒绝
- # 确认收到一个或多个消息。false,只确认当前一个消息;如果为true,则确认所有小于等于deliveryTag的消息。
- channel.basicAck(deliveryTag,true);
- # 拒绝一个或多个消息。multiple为false,拒绝当前一个消息。multiple为true,拒绝所有小于等于deliveryTag的消息。
- # requeue为true 把消费失败的消息从新添加到队列的尾端,requeue为false不会重新回到队列。
- channel.basicNack(deliveryTag, multiple, requeue) ;
- # 拒绝一个消息:true: 重新放回队列,false: 不再重新入队,配置了死信队列则进入死信队列。
- channel.basicReject(deliveryTag, true);
- # 让其自动重新发送消息
- channel.basicAck(deliveryTag,false);
4.代码实现
- @Component
- public class myConsumer {
-
- @RabbitListener(queues = "order.queue")
- public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
- try {
- System.out.println("消息:" + orderMsg);
- System.out.println(1 / 0); // 出现异常
- // 手动确认
- channel.basicAck(tag, false);
- } catch (Exception e) {
- // 如果出现异常的情况下 根据实际情况重发
- // 重发一次后,丢失
- // 参数1:消息的tag
- // 参数2:多条处理
- // 参数3:重发
- // false 不会重发,会把消息打入到死信队列
- // true 重发,建议不使用try/catch 否则会死循环
- // 手动拒绝消息
- channel.basicNack(tag, false, false);
- }
- }
- }

Time To Live(TTL) : TTL 指的是消息的存活时间,RabbitMQ可以通过x-message-ttl参数来设置指定Queue上消息的存活时间,非负整数,单位为微秒。 RabbitMQ 可以从两种维度设置消息过期时间,分别为队列和消息本身。 设置队列过期时间,那么队列中所有消息都具有相同的过期时间。 设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。 注1:如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。 注2:队列过期后,队列内的所有消息全部变为死信。 注3:消息过期后,只有消息位于队列的顶端,才会判断其是否过期,过期的消息变为死信Dead Letter
Queue 中的某些消息无法被消费也没有后续的处理,就变成了死信。 死信的来源 1.消息在队列的存活时间超过设置的TTL(Time To Live(存活时间))时间。 2.队列达到最大长度(队列满了,无法再添加数据到 mq 中)。 3.消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队)。 配置死信队列 1.配置业务队列,绑定到业务交换机上。 2.为业务队列配置死信交换机和路由key。 3.为死信交换机配置死信队列。
1.配置参数
- spring:
- rabbitmq:
- host: localhost
- password: guest
- username: guest
- listener:
- type: simple
- simple:
- default-requeue-rejected: false
- acknowledge-mode: manual
2.配置类
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.QueueBuilder;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- @Configuration
- public class RabbitMQConfig {
-
- // 业务交换机
- public static final String BUSINESS_EXCHANGE_NAME = "simple.business.exchange";
- // 死信交换机
- public static final String DEAD_LETTER_EXCHANGE = "simple.dead.exchange";
- // 业务队列
- public static final String BUSINESS_QUEUEA_NAME = "simple.business.queuea";
- // 死信队列
- public static final String DEAD_LETTER_QUEUEA_NAME = "simple.dead.queuea";
- // 死信队列路由Key
- public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "simple.dead.queuea.routingkey";
-
- /**
- * 声明业务 交换机(Exchange)
- */
- @Bean("businessExchange")
- public FanoutExchange businessExchange(){
- return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
- }
-
- /**
- * 声明死信 交换机(Exchange)
- */
- @Bean("deadExchange")
- public DirectExchange deadLetterExchange(){
- return new DirectExchange(DEAD_LETTER_EXCHANGE);
- }
-
- /**
- * 声明业务队列A
- */
- @Bean("businessQueueA")
- public Queue businessQueueA(){
- Map<String, Object> args = new HashMap<>(3);
- // 延时 设置消息发送到队列之后多久被丢弃,单位:毫秒
- args.put("x-message-ttl", 30*60*1000);
- // x-dead-exchange 这里声明当前队列绑定的死信交换机
- args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
- // x-dead-routing-key 这里声明当前队列的死信路由key
- args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
- return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
- }
-
- /**
- * 声明死信队列A
- */
- @Bean("deadQueueA")
- public Queue deadLetterQueueA(){
- return new Queue(DEAD_LETTER_QUEUEA_NAME);
- }
-
- /**
- * 声明业务队列A绑定关系
- */
- @Bean
- public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){
- return BindingBuilder.bind(queue).to(exchange);
- }
-
- /**
- * 声明死信队列A绑定关系
- */
- @Bean
- public Binding deadLetterBindingA(@Qualifier("deadQueueA") Queue queue, @Qualifier("deadExchange") DirectExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
- }
-
- }

3.发送者
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- @Component
- public class BusinessSender {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void sendMsg(String msg){
- rabbitTemplate.convertSendAndReceive(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg);
- rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg,ms->{
- ms.getMessageProperties().setExpiration("100");
- return ms;
- });
-
- }
- }

4.正常接收者
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- @Slf4j
- @Component
- public class BusinessReceiver {
-
- @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME)
- public void receiveA(Message message, Channel channel) throws IOException {
- String msg = new String(message.getBody());
- log.info("收到业务消息A:{}", msg);
- boolean ack = true;
- Exception exception = null;
- try {
- if (msg.contains("deadletter")){
- throw new RuntimeException("dead letter exception");
- }
- } catch (Exception e){
- ack = false;
- exception = e;
- }
- if (!ack){
- log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- } else {
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }
-
- }

5.死信接收者
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
-
- @Component
- public class DeadReceiver {
-
- @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
- public void receiveA(Message message, Channel channel) throws IOException {
- System.out.println("收到死信消息A:" + new String(message.getBody()));
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }

每个节点只放一部分数据,每个节点都同步queue的元数据(存放含queue数据的真正实例位置)。
消费时实际上如果连接到别的节点,会从queue所在节点上拉取数据过来。
数据同时存储在多台机器上,实现了高可用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。