赞
踩
A.集成
一:添加依赖
在pom.xml文件中添加spring-boot-starter-amqp依赖,以便使用Spring Boot提供的RabbitMQ支持:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二:配置RabbitMQ连接信息
rabbitmq:
host: 13X.9.1XX.7X
port: 5672 #通过控制台可以查看 记得开启这个端口的防护
username: admin
password: admin
三:创建队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queue() {
//name,名字;durable,是否开启持久化
return new Queue("logs",false);
}
}
启动就可以得到下队列
四:创建控制类来生产数据
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitMQController { private static final Logger logger = LoggerFactory.getLogger(RabbitMQController.class); @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("aaa") public void simpleTest() { logger.info("RabbitMQController开始!"); rabbitTemplate.convertAndSend("logs","hello world!"); logger.info("RabbitMQController结束!"); } }
因为只创建了生产,消费者没有创建,所以在RabbitMQ客户端可以查看,然后点击,消费可得数据
五:创建消费者,获取数据
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumeBean {
private static final Logger logger = LoggerFactory.getLogger(ConsumeBean.class);
@RabbitListener(queues={"logs"})
public void getMsg(String message){
logger.info("消费者:{}",message);
}
}
这样就可以看出,消息自动就被接收,消费掉了
B.消息传递的开放标准协议(AMQP)
AMQP(Advanced Message Queuing Protocol)它定义了一种抽象的消息传递模型,包括以下几个主要组件:
消息
(Message):AMQP中的基本单位,是要在消息队列系统中传递的数据。消息通常包括消息体和消息头,消息体是实际要传递的数据,而消息头包含元数据信息,如消息的路由键、优先级等。
生产者
(Producer):负责创建并发送消息到消息队列中的实体。生产者将消息发布到交换机(Exchange),交换机根据路由规则将消息路由到一个或多个队列中。
消费者
(Consumer):从消息队列中接收并处理消息的实体。消费者订阅一个或多个队列,并在有消息到达时接收并处理它们。
交换机
(Exchange):用于接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列中。AMQP定义了不同类型的交换机,如直连交换机(Direct Exchange)、主题交换机(Topic Exchange)、扇出交换机(Fanout Exchange)等。
队列
(Queue):存储消息的容器,消费者从队列中获取消息进行处理。消息可以被一个或多个消费者订阅,但每条消息只会被一个消费者接收。
绑定
(Binding):用于将交换机和队列之间建立关联关系的规则。绑定定义了消息如何从交换机路由到队列,通常包括交换机名称、路由键等信息。
连接
(Connection):生产者和消费者与消息代理(如RabbitMQ)之间建立的网络连接。连接是长期的、持久的,用于传输消息和管理通信。
通过这些抽象组件,AMQP定义了一个灵活且可扩展的消息传递模型,使得不同的消息队列系统可以遵循相同的协议进行通信和交互。这种抽象模型使得开发者可以更容易地实现消息传递系统,并实现消息的可靠传递和处理。
六大模式
1.简单队列 一个生产者一个队列一个消费者
2.工作队列 一个生产者一个队列多个消费者
3.订阅模式 一个生产者一个交换机 多个队列多个消费者(对与消一对一)
4.路由模式 一个生产者一个交换机 分类进入队列 多个队列多个消费者(对与消一对一)
5.主题模式(通配符模式) 一个生产者一个交换机 通配符分类进入队列 多个队列多个消费者(对与消一对一)
6.RPC 是一种实现远程过程调用的方式,允许客户端应用程序调用远程服务器上的服务,并等待服务端返回结果。
1.简单队列
创建生产者(Producer): import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.convertAndSend("queueName", message); } } //创建消费者 import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MessageConsumer { @RabbitListener(queues = "queueName") public void receiveMessage(String message) { System.out.println("Received message: " + message); } } //队列配置 import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue queue1() { return new Queue("queueName"); } }
2.工作队列
//队列配置 import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig2{ @Bean public Queue taskQueue() { return new Queue("taskQueue"); } } //创建生产者 import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TaskProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendTask(String task) { rabbitTemplate.convertAndSend("taskQueue", task); } } //创建消费者 import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TaskConsumer { @RabbitListener(queues = "taskQueue") public void processTask(String task) { System.out.println("Processing task: " + task); // Simulate task processing try { Thread.sleep(1000); // Simulate task processing time } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("Task processed: " + task); } }
3.订阅模式
//创建生产者(Producer) import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageProducer3 { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.convertAndSend("fanoutExchange", "", message); } } //创建消费者(Consumer) import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MessageConsumerA { @RabbitListener(queues = "queueFanout1") public void receiveMessage(String message) { System.out.println("Consumer 1 received message: " + message); } } @Component public class MessageConsumerB { @RabbitListener(queues = "queueFanout2") public void receiveMessage(String message) { System.out.println("Consumer 2 received message: " + message); } } //配置交换机和队列 import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig3 { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean public Queue queueFanout1() { return new Queue("queueFanout1"); } @Bean public Queue queueFanout2() { return new Queue("queueFanout2"); } @Bean public Binding binding1() { return BindingBuilder.bind(queueFanout1()).to(fanoutExchange()); } @Bean public Binding binding2() { return BindingBuilder.bind(queueFanout2()).to(fanoutExchange()); } }
4.路由模式
//创建生产者(Producer) import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageProducer4 { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message, String routingKey) { rabbitTemplate.convertAndSend("directExchange", routingKey, message); } } //创建消费者(Consumer) import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MessageConsumerly1 { @RabbitListener(queues = "queueDirect1") public void receiveMessage(String message) { System.out.println("Consumer 1 received message: " + message); } } @Component public class MessageConsumerly2 { @RabbitListener(queues = "queueDirect2") public void receiveMessage(String message) { System.out.println("Consumer 2 received message: " + message); } } //配置交换机和队列 import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig4 { @Bean public DirectExchange directExchange() { return new DirectExchange("directExchange"); } @Bean public Queue queueDirect1() { return new Queue("queueDirect1"); } @Bean public Queue queueDirect2() { return new Queue("queueDirect2"); } @Bean public Binding bindingDirect1() { return BindingBuilder.bind(queueDirect1()).to(directExchange()).with("routingDirectKey1"); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(queueDirect2()).to(directExchange()).with("routingDirectKey2"); } }
5.主题模式
//创建生产者(Producer) import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageProducer5 { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message, String routingKey) { rabbitTemplate.convertAndSend("topicExchange", routingKey, message); } } //创建消费者(Consumer) import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MessageConsumer5 { @RabbitListener(queues = "queueTopic5") public void receiveMessage(String message) { System.out.println("Received message: " + message); } } //配置交换机和队列 import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig5 { @Bean public TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } @Bean public Queue queueTopic5() { return new Queue("queueTopic5"); } @Bean public Binding bindingTopic5() { return BindingBuilder.bind(queueTopic5()).to(topicExchange()).with("topic.*"); } }
6.RPC模式
//创建RPC客户端(Client) import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RpcClient { @Autowired private RabbitTemplate rabbitTemplate; public String sendMessageAndReceiveResponse(String message) { return (String) rabbitTemplate.convertSendAndReceive("rpcExchange", "rpcQueue", message); } } //创建RPC服务端(Server) import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RpcServer { @RabbitListener(queues = "rpcQueue") public String processMessage(String message) { // Perform some processing based on the message return "Processed: " + message; } } //配置交换机和队列 import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig6 { @Bean public DirectExchange rpcExchange() { return new DirectExchange("rpcExchange"); } @Bean public Queue rpcQueue() { return new Queue("rpcQueue"); } @Bean public Binding rpcBinding() { return BindingBuilder.bind(rpcQueue()).to(rpcExchange()).with("rpcQueue"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。