当前位置:   article > 正文

浅谈Spring结合RabbitMQ的使用_spring 使用rabbitmq

spring 使用rabbitmq

一、消息的持久化

确保消息在 RabbitMQ 中安全保存,必须开启消息持久化机制,即交换机持久化,队列持久化,消息持久化 默认情况下,SpringAmqp 声明的交换机,队列,消息都是持久化的,并不需要我们特意指定,即 Durability 属性都为 Durable。

1.RabbitMQ 客户端持久化(三步缺一不可)

  1. 1.交换器的持久化
  2. // 参数1 exchange:交换器名
  3. // 参数2 type:交换器类型
  4. // 参数3 durable:是否持久化
  5. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  6. 2.队列的持久化
  7. // 参数1 queue:队列名
  8. // 参数2 durable:是否持久化
  9. // 参数3 exclusive:仅创建者可以使用的私有队列,断开后自动删除
  10. // 参数4 autoDelete:当所有消费客户端连接断开后,是否自动删除队列
  11. // 参数5 arguments
  12. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  13. 3.消息的持久化
  14. // 参数1 exchange:交换器
  15. // 参数2 routingKey:路由键
  16. // 参数3 props:消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
  17. // 参数4 body:消息体
  18. channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

2.Spring AMQP 持久化(两步缺一不可)

  1. 1.交换器持久化
  2. // 参数1 name:交互器名
  3. // 参数2 durable:是否持久化
  4. // 参数3 autoDelete:当所有消费客户端连接断开后,是否自动删除队列
  5. new TopicExchange(name, durable, autoDelete);
  6. 2.队列持久化
  7. // 参数1 name:队列名
  8. // 参数2 durable:是否持久化
  9. // 参数3 exclusive:仅创建者可以使用的私有队列,断开后自动删除
  10. // 参数4 autoDelete:当所有消费客户端连接断开后,是否自动删除队列
  11. new Queue(name, durable, exclusive, autoDelete);

二、RabbitMq的保证消息可靠机制

1.生产者确认机制
2.失败重试机制
3.消费者确认机制(默认自动确认)
  1. spring:
  2. rabbitmq:
  3. # ...前面配置省略..
  4. # 消息确认(ACK)
  5. publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
  6. publisher-returns: true #确认消息已发送到队列(Queue)
  7. listener:
  8. simple:
  9. acknowledge-mode: manual # 手动确认
  10. direct:
  11. acknowledge-mode: manual # 手动确认
  12. # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
  13. addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5673

1.生产者确认机制

通过发送后的回调函数确认消息是否发送成功。

ConfirmCallback(): 消息到达交换机的回调。
ReturnCallback(): 消息从交换机到队列后的回调。

1、全局方式

  1. @Bean
  2. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  3. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  4. rabbitTemplate.setConnectionFactory(connectionFactory);
  5. //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
  6. rabbitTemplate.setMandatory(true);
  7. //确认消息送到交换机(Exchange)回调
  8. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  9. System.out.println("\n确认消息送到交换机(Exchange)结果:");
  10. System.out.println("相关数据:" + correlationData);
  11. System.out.println("是否成功:" + ack);
  12. System.out.println("错误原因:" + cause);
  13. });
  14. //确认消息送到队列(Queue)回调
  15. rabbitTemplate.setReturnsCallback(returnedMessage -> {
  16. System.out.println("\n确认消息送到队列(Queue)结果:");
  17. System.out.println("发生消息:" + returnedMessage.getMessage());
  18. System.out.println("回应码:" + returnedMessage.getReplyCode());
  19. System.out.println("回应信息:" + returnedMessage.getReplyText());
  20. System.out.println("交换机:" + returnedMessage.getExchange());
  21. System.out.println("路由键:" + returnedMessage.getRoutingKey());
  22. });
  23. return rabbitTemplate;
  24. }

2.局部方式

  1. @Service
  2. public class SendMessageService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
  3. private static Logger logger = LoggerFactory.getLogger(SendMessageService.class);
  4. @Autowired
  5. public RabbitTemplate rabbitTemplate;
  6. public void sendMessage(String str){
  7. rabbitTemplate.setMandatory(true);
  8. rabbitTemplate.setReturnsCallback(this);
  9. rabbitTemplate.setConfirmCallback(this);
  10. // CorrelationData构造函数中的id可以随便写,但是必须要非null而且是唯一的
  11. rabbitTemplate.convertAndSend("exchange","routingKey", str,new CorrelationData(UUID.randomUUID().toString()));
  12. }
  13. @Override
  14. public void returnedMessage(ReturnedMessage returnedMessage) {
  15. System.out.println("sender return success" + returnedMessage.toString());
  16. }
  17. @Override
  18. public void confirm(CorrelationData correlationData, boolean b, String s) {
  19. logger.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  20. if (!b) {
  21. logger.error("消息发送异常!");
  22. // 进行处理
  23. } else {
  24. logger.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s);
  25. }
  26. }
  27. }

2、失败重试机制

  1. # 开启重试,默认是false
  2. spring.rabbitmq.listener.simple.retry.enabled=true
  3. # 重试次数,默认为3次
  4. spring.rabbitmq.listener.simple.retry.max-attempts=5
  5. # 重试最大间隔时间
  6. spring.rabbitmq.listener.simple.retry.max-interval=10000

3、消费者确认机制(默认自动确认)

1.消息确认模式有:
AcknowledgeMode.NONE:自动确认
AcknowledgeMode.AUTO:根据情况确认
AcknowledgeMode.MANUAL:手动确认
2.消费者手动调用方式
Basic.Ack: 用于确认当前消息
Basic.Nack: 用于否定当前消息(批量拒绝)
Basic.Reject: 用于拒绝当前消息(单量拒绝)
3.手动确认与拒绝
  1. # 确认收到一个或多个消息。false,只确认当前一个消息;如果为true,则确认所有小于等于deliveryTag的消息。
  2. channel.basicAck(deliveryTag,true);
  3. # 拒绝一个或多个消息。multiple为false,拒绝当前一个消息。multiple为true,拒绝所有小于等于deliveryTag的消息。
  4. # requeue为true 把消费失败的消息从新添加到队列的尾端,requeue为false不会重新回到队列。
  5. channel.basicNack(deliveryTag, multiple, requeue) ;
  6. # 拒绝一个消息:true: 重新放回队列,false: 不再重新入队,配置了死信队列则进入死信队列。
  7. channel.basicReject(deliveryTag, true);
  8. # 让其自动重新发送消息
  9. channel.basicAck(deliveryTag,false);

 4.代码实现

  1. @Component
  2. public class myConsumer {
  3. @RabbitListener(queues = "order.queue")
  4. public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
  5. try {
  6. System.out.println("消息:" + orderMsg);
  7. System.out.println(1 / 0); // 出现异常
  8. // 手动确认
  9. channel.basicAck(tag, false);
  10. } catch (Exception e) {
  11. // 如果出现异常的情况下 根据实际情况重发
  12. // 重发一次后,丢失
  13. // 参数1:消息的tag
  14. // 参数2:多条处理
  15. // 参数3:重发
  16. // false 不会重发,会把消息打入到死信队列
  17. // true 重发,建议不使用try/catch 否则会死循环
  18. // 手动拒绝消息
  19. channel.basicNack(tag, false, false);
  20. }
  21. }
  22. }

二、延迟队列与死信队列

 

1、队列的延时

Time To Live(TTL) :
TTL 指的是消息的存活时间,RabbitMQ可以通过x-message-ttl参数来设置指定Queue上消息的存活时间,非负整数,单位为微秒。
RabbitMQ 可以从两种维度设置消息过期时间,分别为队列和消息本身。
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。
注1:如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。
注2:队列过期后,队列内的所有消息全部变为死信。
注3:消息过期后,只有消息位于队列的顶端,才会判断其是否过期,过期的消息变为死信Dead Letter

2、死信队列

Queue 中的某些消息无法被消费也没有后续的处理,就变成了死信。
死信的来源
1.消息在队列的存活时间超过设置的TTL(Time To Live(存活时间))时间。
2.队列达到最大长度(队列满了,无法再添加数据到 mq 中)。
3.消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队)。
配置死信队列
1.配置业务队列,绑定到业务交换机上。
2.为业务队列配置死信交换机和路由key。
3.为死信交换机配置死信队列。

3、使用案例

1.配置参数

  1. spring:
  2. rabbitmq:
  3. host: localhost
  4. password: guest
  5. username: guest
  6. listener:
  7. type: simple
  8. simple:
  9. default-requeue-rejected: false
  10. acknowledge-mode: manual

2.配置类

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.core.QueueBuilder;
  7. import org.springframework.beans.factory.annotation.Qualifier;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. @Configuration
  13. public class RabbitMQConfig {
  14. // 业务交换机
  15. public static final String BUSINESS_EXCHANGE_NAME = "simple.business.exchange";
  16. // 死信交换机
  17. public static final String DEAD_LETTER_EXCHANGE = "simple.dead.exchange";
  18. // 业务队列
  19. public static final String BUSINESS_QUEUEA_NAME = "simple.business.queuea";
  20. // 死信队列
  21. public static final String DEAD_LETTER_QUEUEA_NAME = "simple.dead.queuea";
  22. // 死信队列路由Key
  23. public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "simple.dead.queuea.routingkey";
  24. /**
  25. * 声明业务 交换机(Exchange)
  26. */
  27. @Bean("businessExchange")
  28. public FanoutExchange businessExchange(){
  29. return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
  30. }
  31. /**
  32. * 声明死信 交换机(Exchange)
  33. */
  34. @Bean("deadExchange")
  35. public DirectExchange deadLetterExchange(){
  36. return new DirectExchange(DEAD_LETTER_EXCHANGE);
  37. }
  38. /**
  39. * 声明业务队列A
  40. */
  41. @Bean("businessQueueA")
  42. public Queue businessQueueA(){
  43. Map<String, Object> args = new HashMap<>(3);
  44. // 延时 设置消息发送到队列之后多久被丢弃,单位:毫秒
  45. args.put("x-message-ttl", 30*60*1000);
  46. // x-dead-exchange 这里声明当前队列绑定的死信交换机
  47. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
  48. // x-dead-routing-key 这里声明当前队列的死信路由key
  49. args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
  50. return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
  51. }
  52. /**
  53. * 声明死信队列A
  54. */
  55. @Bean("deadQueueA")
  56. public Queue deadLetterQueueA(){
  57. return new Queue(DEAD_LETTER_QUEUEA_NAME);
  58. }
  59. /**
  60. * 声明业务队列A绑定关系
  61. */
  62. @Bean
  63. public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){
  64. return BindingBuilder.bind(queue).to(exchange);
  65. }
  66. /**
  67. * 声明死信队列A绑定关系
  68. */
  69. @Bean
  70. public Binding deadLetterBindingA(@Qualifier("deadQueueA") Queue queue, @Qualifier("deadExchange") DirectExchange exchange){
  71. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
  72. }
  73. }

3.发送者

  1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class BusinessSender {
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. public void sendMsg(String msg){
  9. rabbitTemplate.convertSendAndReceive(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg);
  10. rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg,ms->{
  11. ms.getMessageProperties().setExpiration("100");
  12. return ms;
  13. });
  14. }
  15. }

4.正常接收者

  1. import com.rabbitmq.client.Channel;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. @Slf4j
  8. @Component
  9. public class BusinessReceiver {
  10. @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUEA_NAME)
  11. public void receiveA(Message message, Channel channel) throws IOException {
  12. String msg = new String(message.getBody());
  13. log.info("收到业务消息A:{}", msg);
  14. boolean ack = true;
  15. Exception exception = null;
  16. try {
  17. if (msg.contains("deadletter")){
  18. throw new RuntimeException("dead letter exception");
  19. }
  20. } catch (Exception e){
  21. ack = false;
  22. exception = e;
  23. }
  24. if (!ack){
  25. log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
  26. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  27. } else {
  28. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  29. }
  30. }
  31. }

5.死信接收者

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.io.IOException;
  6. @Component
  7. public class DeadReceiver {
  8. @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
  9. public void receiveA(Message message, Channel channel) throws IOException {
  10. System.out.println("收到死信消息A:" + new String(message.getBody()));
  11. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  12. }
  13. }

三、集群模式

1、普通模式(默认的集群模式)。

每个节点只放一部分数据,每个节点都同步queue的元数据(存放含queue数据的真正实例位置)。
消费时实际上如果连接到别的节点,会从queue所在节点上拉取数据过来。

2、镜像模式

数据同时存储在多台机器上,实现了高可用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/728232
推荐阅读
相关标签
  

闽ICP备14008679号