赞
踩
基本消息模型就是:
一个生产者丶默认交换机丶一个队列丶一个消费者。
work消息模型就是:
一个生产者丶默认交换机丶一个队列丶多个消费者。
fanout消息模型就是:
多个消费者,每一个消费这都有自己的队列,每个队列都绑定到交换机
生产者发送消息到交换机-交换机发送到哪个队列
Routing路由模式模型就是:
在某种场景下,我们希望不同的消息被不同的队列消费
这个时候我们就要用到direct类型的exchange
生产者向交换机发送消息—交换机根据路由key发送给队列-队列的消费者接收消息
Routing路由模式模型就是:
Topics模式和direct路由模式类似,
区别在于Topic类型的交换机可以匹配通配符
符号(通配符):#表示匹配一个或者多个词
*表示匹配一个词
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--工具类-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.17</version>
</dependency>
server:
port: 7001
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: username
password: password
virtualHost: /
创建rabbit工具类
package com.example.rabbitmq_topic.util; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class RabbitUtil { @Autowired private AmqpAdmin amqpAdmin; @Autowired private AmqpTemplate rabbitTemplate; /** * 创建交换机 * * @param changeName 交换机名称 */ public void createExchange(String changeName) { TopicExchange exchange = new TopicExchange(changeName, true, false); amqpAdmin.declareExchange(exchange); } /** * 创建队列 * * @param queueName 队列名称 */ public void createQueue(String queueName) { Queue queue = new Queue(queueName, true, false, false); amqpAdmin.declareQueue(queue); } /** * 交换机绑定 * * @param changeName 交换机名称 * @param routingKey 路由key * @param queueName 队列名称 */ public void bindExchange(String changeName, String routingKey, String queueName) { Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, changeName, routingKey, null); amqpAdmin.declareBinding(binding); } /** * 发送信息到交换机 * * @param changeName 交换机名称 * @param routingKey 路由key * @param message 消息 */ public void sendMessage(String changeName, String routingKey, String message) { rabbitTemplate.convertAndSend(changeName, routingKey, message); } /** * 删除交换机 * * @param changeName 交换机名称 */ public void deleteExchange(String changeName) { amqpAdmin.deleteExchange(changeName); } /** * 删除队列 * * @param queueName 队列名称 */ public void deleteQueue(String queueName) { amqpAdmin.deleteQueue(queueName); } }
创建controller
package com.example.rabbitmq_topic.controlller; import com.example.rabbitmq_topic.util.RabbitUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired private RabbitUtil rabbitUtil; @GetMapping("/send") public String sendMessage(@RequestParam("message") String message){ rabbitUtil.createExchange("test-exchange"); rabbitUtil.createQueue("test-queue"); rabbitUtil.createQueue("hello-queue"); //#.test采用通配符绑定 rabbitUtil.bindExchange("test-exchange","#.test","test-queue"); rabbitUtil.bindExchange("test-exchange","#.hello","hello-queue"); //只允许hello队列接收信息 rabbitUtil.sendMessage("test-exchange","hello",message); return "发送成功"; } }
package com.example.rabbitmq_topic.rabbitListener; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class TestRabbitListener { //接收test-queue,hello-queue队列信息,如果每个队列都有信息,那么就会执行多次方法 //如果只想接收hello-queue队列信息 如: @RabbitListener(queues={"test-queue"}) @RabbitListener(queues={"test-queue","hello-queue"}) public void onMessage(String message){ log.info("收到信息:{}",message); } }
package com.example.rabbitmq_topic.rabbitListener; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class TestRabbitListener { //QueueBinding: 交换机绑定队列 //Exchange: 创建交换机 //key: 路由key,采用通配符设置key //queue: 创建队列 //如果只创建一个交换机和一个队列可删除一个@QueueBinding对象 @RabbitListener( bindings ={ @QueueBinding(exchange = @Exchange(value = "bind-exchange",type = "topic",declare = "true",autoDelete = "false"), key = "#.queue", value = @Queue(value = "bind-queue",autoDelete = "false",declare = "true") ), @QueueBinding(exchange = @Exchange(value = "bind-exchange",type = "topic",declare = "true",autoDelete = "false"), key = "#.queue", value = @Queue(value = "bindTest-queue",autoDelete = "false",declare = "true") ) } ) public void onBindMessage(String message){ log.info("收到信息:{}",message); } }
发送消息代码
rabbitUtil.sendMessage("bind-exchange","queue",message);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。