赞
踩
目录
部署完RabbitMQ有两种使用方式:
MQ组成部分:
Java中使用(注解开发):
1.引依赖和配置属性 - 版本选个差不多的 我是继承的springboot的
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- spring:
- rabbitmq:
- host: 192.168.189.189
- port: 5672
- virtual-host: hmallVir
- username: hmall
- password: 123321
2.使用RabbitTemplate发消息
- @SpringBootTest
- public class TestSendMs {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @Test
- void testSend1(){
- String queueName = "hmallQueue1";
- String ms = "hello";
- rabbitTemplate.convertAndSend(queueName, ms);
- }
-
- }
3.使用RabbitListerner接受消息
- @Component
- public class MyListeners {
-
- @RabbitListener(queues = "hmallQueue1")
- public void listenerMs1(String ms) throws InterruptedException {
- System.out.println("消费者1接收到消息 -> " + ms);
- Thread.sleep(20);
- }
-
- }
4.声明队列和交换机的方式二(简单常用)
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "hmallQueue1"),
- exchange = @Exchange(name = "hmall", type = ExchangeTypes.DIRECT),
- key = {"red", "blue"}
- ))
- public void listenDirectQueue1(String msg){
- System.out.println("消费者1接收到消息 -> " + msg);
- }
轮询接受消息问题:
队列读取消息时使用轮询机制,每个队列都读取相同的消息数量,这样不好,我们要针对队列处理消息的能力,需要在配置文件设置属性fetch
- spring:
- rabbitmq:
- host: 192.168.189.189
- port: 5672
- virtual-host: hmallVir
- username: hmall
- password: 123321
- listener:
- simple:
- prefetch: 1 # 每次处理完消息再获取新的消息 性能越好处理消息越多
扩展消息转换器:
默认的消息转换器是直接将对象序列化为Byte[],即读不懂又不安全还占内存
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
- @Bean
- public MessageConverter jacksonMessageConverter(){
- return new Jackson2JsonMessageConverter();
- }
连接重试,注意这里是阻塞式的,意味着连接失败会一直重试其他业务不会执行,所以建议禁用此模式。
- spring:
- rabbitmq:
- host: 192.168.189.189
- port: 5672
- virtual-host: hmallVir
- username: hmall
- password: 123321
- connection-timeout: 1s # 连接失败重试
- template:
- retry:
- enabled: true # 开启重试机制
- initial-interval: 1000ms # 失败后的初始等待时间
- multiplier: 1 # 失败后下次的等待市场倍数
- max-attempts: 3 # 最大重试次数
SpringAMQP生产者确认机制的返回值情况?
服务一旦挂了消息就都没有了,还有就是内存如果满了,会触发阻塞式的强制持久化操作,这会导致这段时间处理消息的能力为0
delivery_mode
属性设置为2。这个设置会将消息标记为持久化,确保消息被写入磁盘而不是仅保存在内存中。durable
属性设置为true。这样,队列的元数据以及队列中的消息都会被持久化到磁盘上。需要注意的是,即使队列被设置为持久化,也不能保证内部所存储的消息不会丢失,因为RabbitMQ默认在消息被消费后立即删除它。要确保消息不被删除,需要在消费时进行相应的设置。durable
属性设置为true,就可以将交换机标记为持久化。如果交换机不设置持久化,那么在RabbitMQ服务重启之后,相关的交换机元数据会丢失,但已发送的消息不会丢失,只是不能再将新的消息发送到这个交换机中。这里就是模拟事务嘛,利用AOP思想
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。