赞
踩
目录
RabbitMQ是一个开源的消息代理和队列服务器,它基于高级消息队列协议(AMQP)设计。RabbitMQ可以作为消息代理、任务队列、事件传输系统等,广泛应用于分布式系统和微服务架构中
1.同步(Synchronous):
在同步模式下,生产者发送消息后,会等待直到消息被确认或者被拒绝,然后才继续执行后
续的操作。
这种方式确保了消息的发送和接收是顺序进行的,适用于需要严格顺序控制的场景
优点:
时效性强,等待结果后返回
缺点:
扩展性差
性能下降
级联失败问题
2.异步(Asynchronous):
在异步模式下,生产者发送消息后,不需要等待消息确认,就可以继续发送下一条消息或者
执行其他任务。
这种方式可以提高消息处理的吞吐量,因为生产者不需要等待每条消息的确认。
异步模式通常用于消息量大、对实时性要求不高的场景,或者当消费者处理速度较慢时,以
避免生产者阻塞。
优点:
耦合度低,扩展性强
异步调用,无需等待,性能好
故障隔离,下游服务故障不影响上游业务
缓存消息,流量削峰填谷
缺点:
不能立即得到调用结果,时效性差
不能确定下游业务是否成功
业务安全依赖于Broker的可靠性
Broker:消息代理
像查询就不需要用到异步调用,因为查询必须是要等查出来才能进行下一步操作的
RabbitMQ也有自己的管理界面,可以也可以在管理界面直接进行交换机,队列的建立
以下是整合RabbitMQ的依赖和一些基本依赖
SpringAMQP是RabbitMQ使用的主要依赖
SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来
SpringAMQP就会把消息传递给当前方法
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <!--单元测试-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- </dependency>
- <!--Jackson-->
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <!-- json转换依赖-->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.73</version>
- </dependency>
- <dependency>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- <version>20210307</version>
- </dependency>
- <!--日志依赖-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-logging</artifactId>
- </dependency>
- <!--quartz框架-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-quartz</artifactId>
- </dependency>
- <!-- mysql驱动依赖-->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.27</version>
- </dependency>
- <!-- 数据库连接池依赖-->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>druid</artifactId>
- <version>1.1.10</version>
- </dependency>
- <!-- lombak,类的简化-->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <!--mybatisPlus-->
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- <version>3.5.3.1</version>
- </dependency>
- spring:
- rabbitmq:
- host: 这里本地就写本地的ip,服务器上的就写外网ip
- port: 5672
- virtual-host: /dahei
- username: 自己设置的账号
- password: 自己设置的密码
这个配置类是避免返回是序列化后的值,消息转换器,将JDK自带的GBK编码变成JSON格式
- @Configuration
- public class MessageConfig {
- @Bean
- public MessageConverter messageConverter() {
- return new Jackson2JsonMessageConverter();
- }
- }
下面这段代码的作用是确保当消息无法被RabbitMQ队列接收时(例如,因为路由键不匹配或队列不存在),应用程序能够收到通知,并记录相关信息以便调试和监控。这是确保消息传递可靠性的一个重要机制。通过这种方式,开发者可以知道哪些消息没有被成功处理,从而采取相应的补救措施。
- @Slf4j
- @Configuration
- @RequiredArgsConstructor
- public class MQConfirmConfig implements ApplicationContextAware {
- private final RabbitTemplate rabbitTemplate;
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- // RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- //配置回调
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- log.debug("exchange:{},message:{}, Code:{}, Text:{}, key:{}",
- returnedMessage.getExchange(), returnedMessage.getMessage(), returnedMessage.getReplyCode(),
- returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
- }
- });
- }
- }
要想接收消息,交换机和队列是必不可少的,相应的队列要绑定到相应的交换机上
- /**
- * @Author: 大黑
- * @Date: 2024/4/1 0:36
- */
- @Configuration
- public class FanoutConfiguration {
- //首先声明一个交换机
- @Bean
- public FanoutExchange fanoutExchange() {
- //ExchangeBuilder.fanoutExchange("dahei.fanout1").build();这种创建方式跟new创建是一样的
- return new FanoutExchange("dahei.fanout1");
- }
-
- //声明一个队列
- @Bean
- public Queue queue() {
- //QueueBuilder.durable()(创建一个持久的队列,将消息写入磁盘中);这种创建方式跟new是一样的
- //return QueueBuilder.durable("fanout.queue3").lazy().build();(创建一个lazy Queue-惰性队列)
- return new Queue("fanout.queue3");//默认持久的
- }
- //绑定交换机和队列
- @Bean
- public Binding binding(FanoutExchange fanoutExchange, Queue queue) {
- return BindingBuilder.bind(queue).to(fanoutExchange);
- }
- //在绑定交换机和队列的时候还可以直接调用这个方法
- //这中方法和上面那种的作用是一样的
- // @Bean
- // public Binding binding() {
- // return BindingBuilder.bind(queue()).to(fanoutExchange());
- // }
- }
这是基于注解的方式进行配置
@RabbitListener:消息监听注解,Spring管理
- /**
- * @Author: 大黑
- * @Date: 2024/3/29 0:07
- */
- @Component
- @Slf4j
- public class ListenerTest {
-
- @RabbitListener(queues = "simple.queue")
- public void listenerMq(String queueMsg) {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("接收到的消息为{}", queueMsg);
- }
-
- /**
- * work模式下的接收
- *
- * @param queueMsg
- * @throws InterruptedException
- */
- @RabbitListener(queues = "work.queue")
- public void workQueueMq1(String queueMsg) throws InterruptedException {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("消费者1" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(20);
- }
-
- @RabbitListener(queues = "work.queue")
- public void workQueueMq2(String queueMsg) throws InterruptedException {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("消费者2" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(200);
- }
-
- /**
- * fanout交换机模式下的接收
- *
- * @param queueMsg
- * @throws InterruptedException
- */
- @RabbitListener(queues = "fanout.queue1")
- public void fanoutQueueMq1(String queueMsg) throws InterruptedException {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("fanoutQueue1" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(20);
- }
-
- @RabbitListener(queues = "fanout.queue2")
- public void fanoutQueueMq2(String queueMsg) throws InterruptedException {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("fanoutQueue2" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(200);
- }
-
- /**
- * direct交换机模式下的接收
- *
- * @param queueMsg
- */
- @RabbitListener(queues = "direct.queue1")
- public void directQueueMq1(String queueMsg) {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("directQueueMq1" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(20);
- }
-
- @RabbitListener(queues = "direct.queue2")
- public void directQueueMq2(String queueMsg) {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("directQueueMq2" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(200);
- }
- /**
- * direct交换机模式下的接收(基于注解模式进行接收)
- *
- * @param queueMsg
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1", durable = "true"),
- exchange = @Exchange(name = "dahei.direct", type = ExchangeTypes.DIRECT), //交换机类型type,有枚举类可供选择,这里一般都会有默认值
- key = {"red", "blue"}//direct交换机key,数组类型,可以声明多个key
- ))
- public void directNewQueueMq1(String queueMsg) {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("directQueueMq1" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(20);
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2", durable = "true"),
- exchange = @Exchange(name = "dahei.direct", type = ExchangeTypes.DIRECT), //交换机类型type,有枚举类可供选择,这里一般都会有默认值
- key = {"yellow", "blue"}//direct交换机key,数组类型,可以声明多个key
- ))
- public void directNewQueueMq2(String queueMsg) {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("directQueueMq2" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(200);
- }
- /**
- * topic交换机模式
- * @param queueMsg
- */
- @RabbitListener(queues = "topic.queue1")
- public void topicQueueMq1(String queueMsg) {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("topicQueueMq1" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(20);
- }
-
- @RabbitListener(queues = "topic.queue2")
- public void topicQueueMq2(String queueMsg) {
- // System.out.println("消费者收到了消息:" + queueMsg);
- log.info("topicQueueMq2" + "接收到的消息为{}", queueMsg);
- // Thread.sleep(200);
- }
- }
- @Slf4j
- @SpringBootTest
- class PublisherApplicationTests {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- void contextLoads() {
- String queueName = "simple.queue";
- String queueMsg = "你好,mq,这是伽有";
- rabbitTemplate.convertAndSend(queueName, queueMsg);
- }
-
- @Test
- //发送到队列
- void workQueue1() throws InterruptedException {
- //队列名称
- String queueName = "work.queue";
- for (int i = 0; i < 50; i++) {
- //发送的消息
- String queueMsg = "你好,workQueue_" + i;
- rabbitTemplate.convertAndSend(queueName, queueMsg);
- Thread.sleep(20);
- }
- }
-
- @Test
- //发送到fanout交换机
- void fanoutQueue() {
- String exeChange = "dahei.fanout";
- String queueMsg = "helle,tow queue";
- rabbitTemplate.convertAndSend(exeChange, null, queueMsg);
- }
-
- @Test
- //发送到direct交换机
- void directQueue() {
- String exeChange = "dahei.fanout";
- String queueMsg = "helle,tow queue";
- String routKet = "red";
- rabbitTemplate.convertAndSend(exeChange, routKet, queueMsg);
- }
-
- @Test
- //发送到topic交换机
- void topicQueue() {
- String exeChange = "dahei.topic";
- String queueMsg = "helle,tow queue";
- String routKet = "china.dahei";
- rabbitTemplate.convertAndSend(exeChange, routKet, queueMsg);
- }
-
- //发送其他类型的数据
- @Test
- void objectQueue() {
- Map<Object, Object> map = new HashMap<Object, Object>();
- map.put("name", "大黑");
- map.put("age", 22);
- rabbitTemplate.convertAndSend("object.queue", map);
- }
-
- //回调消息测试
- @Test
- void returnMessage() throws InterruptedException {
- //创建cd(callback)
- CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
- //添加ConfirmCallback
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable ex) {
- log.error("回调消息失败", ex);
- }
-
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- log.debug("收到confirm back,开始发送消息");
- if (result.isAck()) {
- log.debug("消息发送成功, 收到ack");
- } else {
- log.debug("消息发送失败,收到nack,原因{}", result.getReason());
- }
- }
- });
- rabbitTemplate.convertAndSend("dahei.direct", "reds", "hello 大黑 回调情况如何", cd);
- Thread.sleep(2000);
- }
-
- //在临时消息发送达到内存最大时,会将一部分消息转存在磁盘中,然后会造成消息阻塞的情况
- @Test
- void pageOutTest() {
- Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
- .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();//创建一个非持久化的消息发送
- for (int i = 0; i < 1000000; i++) {
- rabbitTemplate.convertAndSend("simple.queue", message);
- }
- }
- }
基本概念:
高级特性:
工作模式:
点对点(Point-to-Point):消息从生产者发送到单个消费者。
发布/订阅(Publish/Subscribe):消息从生产者发送到多个消费者,通过交换机进行消息分发。
路由(Routing):使用路由键将消息路由到不同的队列。
主题(Topic):通过模式匹配路由键,实现更灵活的消息路由
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。