赞
踩
在往期文章中,我们讲了如何在Windows与Linux环境下安装RabbitMq服务,并访问Web管理端。
有很多同学其实并不知道RabbitMq是用来干嘛的,它起到一个什么作用,并且如何在常见的SpringBoot项目中集成mq并实现消息收发,本章就来给大家讲解一下什么是RabbitMq,并对接Java项目实现生产者与消费者。
–分割线–
通常我们服务与服务直接调用时通过Http接口或者Rpc远程调用的方式进行,但是这种方式对服务直接耦合性和依赖性比较高,在使用时,两个服务必须同时在线,否则将无法使用,所以为解决此问题,我们引入了RabbitMq消息中间件,发送者可以直接将数据载体发送至云端上,接收者随时都可以主动在云端读取想要的数据,相互之间不牵制,不影响,降低耦合性。
RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它是一个面向消息的中间件,用于在分布式系统中存储和转发消息。RabbitMQ的主要组件包括生产者、消费者和代理,其中代理负责将消息路由到相应的消费者。这种模型允许应用程序在不直接依赖彼此的情况下进行通信,从而实现异步和解耦。
RabbitMQ支持多种客户端,如Python、Java、PHP等,并且可以在不同平台上运行,包括嵌入式系统、多核心集群以及基于云端的服务器。它具有高可用性、灵活的路由扩展性和易用性等特点,适用于大型软件系统的模块之间高效通信,支持高并发和可扩展性。
通常说的消息队列,简称MQ(Message Queue),指的就是消息中间件。简单理解为一个使用队列来通信的组件,本质上就是个转发器,包含发消息,存消息,消费消息的过程。
此时生产者将消息发送到队列中,由消费者依次进行消费,是一种先进先出的模式,并且所有消息为串行,每次只有一条消息被递出。
在RabbitMq中也是这样的队列,不同的是,RabbitMq中消息既可以直接推送到队列,也可以将消息推送到交换机,由交换机经过一些列策略再推送到队列中。
RabbitMQ 服务器是 RabbitMQ 的核心组件,负责管理所有的交换器和队列。一个 RabbitMQ 实例可以包含多个服务器,每个服务器负责一部分交换器和队列。服务器之间通过 HTTP 协议通信,可以使用多种方式进行部署,如单机、集群、云服务等。
交换器是 RabbitMQ 中的消息传递核心,负责接收、路由、传递消息。RabbitMQ 支持多种交换器类型,如 fanout、direct、topic 等,每种类型的交换器都有不同的消息传递方式和应用场景。
队列是 RabbitMQ 中的消息存储容器,用于存储消息。RabbitMQ 支持多种队列类型,如持久化、非持久化、排他访问等,每种类型的队列都有不同的存储方式和应用场景。
管理用户,与用户权限,所属的host,比如系统默认的guest用户只能在本地服务使用,如果想要其他同事也使用rabbitmq服务就需要新建一个用户,并设置host,与权限。
将所有交换机与队列等进行虚拟隔离,不同的host之间相互独立,无法互通,类似于docker中的容器概念。
实际产生消息的地方,将消息投递至队列或交换机的发送者。
接收队列中消息的地方,消费消息,读取消息并消耗掉。
到这里我们想把消息发送到Mq上还不够,还需要了解它的工作模式,才能更好的使用它。
地址:https://www.rabbitmq.com/tutorials
一个生产者一个队列一个消费者,生产者直接将消息发送至队列并由消费者消费。
一个生产者一个队列多个消费者,此时队列里的消息会随机分配给其中一个消费者,即一个消息只能被一个消费者消费,分工合作。
一个生产者一个交换机多个队列多个消费者,交换机类型为:fanout,此时生产者将消息发送至交换机,由交换机广播至所绑定的队列中,每一个队列都有相同的消息,再由消费者进行消费。
一个生产者一个交换机多个队列多个消费者,交换机类型为:direct、此时生产者发送消息时,会携带一个关键字,专业名词叫(routing key),再由交换机与队列之间的绑定匹配策略进行分发,由消费者进行消费。
此模式与4中模式一致,交换机类型为:topic、在携带关键字的基础上增加了通配符,如:*#等。
第6第7中主要是介绍消息传递机制,可简单理解为消息发布确认与消息消费确认。
好!接下来我们就来具体实现一下在SpringBoot项目中,如何对接RabbitMq并实现消息发送与接收。
<!-- rabbit -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
# rabbitmq
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
我们暂且先使用guest账号,host使用默认。
本次我们采用最常用的路由模式,符合大多数业务。
/**
* rabbitmq配置类
* @author wfeil211@foxmail.com
*/
@Configuration
public class RabbitConfig {
// Direct交换机
@Bean
DirectExchange routingExchange() {
return new DirectExchange("routingExchange", true, false);
}
// queue
@Bean
public Queue routingQueueOne() {
return new Queue("routingQueueOne", true);
}
// 绑定
@Bean
Binding bindingAsDirectOne() {
return BindingBuilder.bind(routingQueueOne()).to(routingExchange()).with("routingKey");
}
// queue
@Bean
public Queue routingQueueTwo() {
return new Queue("routingQueueTwo", true);
}
// 绑定
@Bean
Binding bindingAsDirectTwo() {
return BindingBuilder.bind(routingQueueTwo()).to(routingExchange()).with("routingKey");
}
}
这里创建了一个交换机,两个队列,并使用"routingKey"将队列与交换机进行绑定上。
/**
* 生产者
* @author wfeil211@foxmail.com
*/
@Slf4j
@Service
public class RabbitMqProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
public void sendMessage() {
rabbitTemplate.convertAndSend("routingExchange", "routingKey", "这是生产者发来的消息!!!");
}
}
/**
* 消费者-1
* @author wfeil211@foxmail.com
*/
@Slf4j
@Component
public class ConsumerOne {
/**
* 监听队列
*/
@RabbitListener(queues = "routingQueueOne")
public void listenMessage(Message message) {
log.info("消费者-1收到mq消费请求,message:{}", new String(message.getBody()));
try {
// 业务处理
String content = new String(message.getBody());
} catch (Exception e) {
log.error("mq消费异常,原因:{}", e.toString());
}
}
}
/**
* 消费者-2
* @author wfeil211@foxmail.com
*/
@Slf4j
@Component
public class ConsumerTwo {
/**
* 监听队列
*/
@RabbitListener(queues = "routingQueueTwo")
public void listenMessage(Message message) {
log.info("消费者-2收到mq消费请求,message:{}", new String(message.getBody()));
try {
// 业务处理
String content = new String(message.getBody());
} catch (Exception e) {
log.error("mq消费异常,原因:{}", e.toString());
}
}
}
当我们启动好服务后,在我们的RabbitMq-Web管理端就可以看到创建的交换机与队列。
此时的两个队列已经绑定到交换机中
/**
* 接口调用
* @author wfeil211@foxmail.com
*/
@RestController
@RequestMapping("/rabbitPath")
public class RabbitMqProducerController {
@Autowired
private RabbitMqProducer rabbitMqProducer;
@PostMapping("/send")
public void send() {
rabbitMqProducer.sendMessage();
}
}
本次教程到这里就结束了,希望大家多多关注支持(首席摸鱼师 微信同号),持续跟踪最新文章吧~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。