当前位置:   article > 正文

RabbitMQ基本使用及企业开发中注意事项

RabbitMQ基本使用及企业开发中注意事项

目录

一、基本使用

二、使用注意事项

1. 生产者重连机制 - 保证mq服务是通的

2. 生产者确认机制 - 回调机制

3. MQ的可靠性

4. Lazy Queue模式

5. 消费者确认机制


一、基本使用

部署完RabbitMQ有两种使用方式:

  • 网页客户端
  • Java代码

MQ组成部分:

  • 虚拟主机 -> 进行数据隔离的,好比mysql中的不同数据库
  • 交换机 -> 进行路由转发消息
    • fanout:最普通 相当于广播
    • topic:可依key绑定不同队列,可使用.分割key,并支持模糊匹配
    • direct:可依key绑定不同队列,不支持模糊匹配
  • 队列 -> 接受交换机转发过来的消息

Java中使用(注解开发):

1.引依赖和配置属性 - 版本选个差不多的 我是继承的springboot的

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  1. spring:
  2. rabbitmq:
  3. host: 192.168.189.189
  4. port: 5672
  5. virtual-host: hmallVir
  6. username: hmall
  7. password: 123321

2.使用RabbitTemplate发消息

  1. @SpringBootTest
  2. public class TestSendMs {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @Test
  6. void testSend1(){
  7. String queueName = "hmallQueue1";
  8. String ms = "hello";
  9. rabbitTemplate.convertAndSend(queueName, ms);
  10. }
  11. }

3.使用RabbitListerner接受消息

  1. @Component
  2. public class MyListeners {
  3. @RabbitListener(queues = "hmallQueue1")
  4. public void listenerMs1(String ms) throws InterruptedException {
  5. System.out.println("消费者1接收到消息 -> " + ms);
  6. Thread.sleep(20);
  7. }
  8. }

4.声明队列和交换机的方式二(简单常用)

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "hmallQueue1"),
  3. exchange = @Exchange(name = "hmall", type = ExchangeTypes.DIRECT),
  4. key = {"red", "blue"}
  5. ))
  6. public void listenDirectQueue1(String msg){
  7. System.out.println("消费者1接收到消息 -> " + msg);
  8. }

轮询接受消息问题:

队列读取消息时使用轮询机制,每个队列都读取相同的消息数量,这样不好,我们要针对队列处理消息的能力,需要在配置文件设置属性fetch

  1. spring:
  2. rabbitmq:
  3. host: 192.168.189.189
  4. port: 5672
  5. virtual-host: hmallVir
  6. username: hmall
  7. password: 123321
  8. listener:
  9. simple:
  10. prefetch: 1 # 每次处理完消息再获取新的消息 性能越好处理消息越多

扩展消息转换器:

默认的消息转换器是直接将对象序列化为Byte[],即读不懂又不安全还占内存

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.dataformat</groupId>
  3. <artifactId>jackson-dataformat-xml</artifactId>
  4. </dependency>
  1. @Bean
  2. public MessageConverter jacksonMessageConverter(){
  3. return new Jackson2JsonMessageConverter();
  4. }

二、使用注意事项

1. 生产者重连机制 - 保证mq服务是通的

连接重试,注意这里是阻塞式的,意味着连接失败会一直重试其他业务不会执行,所以建议禁用此模式。

  1. spring:
  2. rabbitmq:
  3. host: 192.168.189.189
  4. port: 5672
  5. virtual-host: hmallVir
  6. username: hmall
  7. password: 123321
  8. connection-timeout: 1s # 连接失败重试
  9. template:
  10. retry:
  11. enabled: true # 开启重试机制
  12. initial-interval: 1000ms # 失败后的初始等待时间
  13. multiplier: 1 # 失败后下次的等待市场倍数
  14. max-attempts: 3 # 最大重试次数
2. 生产者确认机制 - 回调机制

SpringAMQP生产者确认机制的返回值情况?

  1. ack 消息成功投递到mq
    1. 交换机收到了 没有路由转发到队列 ack + return路由异常
    2. 交换机收到并路由成功 只ack
  2. nack 消息丢失 投递失败
3. MQ的可靠性

服务一旦挂了消息就都没有了,还有就是内存如果满了,会触发阻塞式的强制持久化操作,这会导致这段时间处理消息的能力为0

  1. 设置消息的持久化属性:需要将delivery_mode属性设置为2。这个设置会将消息标记为持久化,确保消息被写入磁盘而不是仅保存在内存中。
  2. 声明持久化的队列:在RabbitMQ中声明队列时,需要将durable属性设置为true。这样,队列的元数据以及队列中的消息都会被持久化到磁盘上。需要注意的是,即使队列被设置为持久化,也不能保证内部所存储的消息不会丢失,因为RabbitMQ默认在消息被消费后立即删除它。要确保消息不被删除,需要在消费时进行相应的设置。
  3. 声明持久化的交换机:与队列类似,交换机也可以被设置为持久化。在声明交换机时,将durable属性设置为true,就可以将交换机标记为持久化。如果交换机不设置持久化,那么在RabbitMQ服务重启之后,相关的交换机元数据会丢失,但已发送的消息不会丢失,只是不能再将新的消息发送到这个交换机中。
4. Lazy Queue模式
  1. 降低内存压力:在高负载情况下,大量消息堆积可能导致内存压力剧增。使用Lazy Queue模式,消息直接写入磁盘,可以有效降低内存使用,避免内存溢出等问题。
  2. 支持大量消息存储:由于消息存储在磁盘上,Lazy Queue模式可以支持数百万条消息的存储,满足大规模消息处理的需求。
  3. 提高系统稳定性:通过将消息持久化到磁盘,Lazy Queue模式增强了消息的可靠性,即使RabbitMQ服务器发生故障或重启,消息也不会丢失,保证了系统的稳定性和数据的完整性。


5. 消费者确认机制

这里就是模拟事务嘛,利用AOP思想

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/393725
推荐阅读
相关标签
  

闽ICP备14008679号