赞
踩
作者:后端小肥肠
创作不易,未经允许禁止转载。
RabbitMQ,作为一款高性能、可靠的消息队列软件,已经成为许多企业和开发团队的首选之一。它的灵活性和可扩展性使得它适用于各种应用场景,从简单的任务队列到复杂的分布式系统。本文将深入探讨RabbitMQ的应用场景以及如何在实际项目中构建可靠的RabbitMQ架构体系。
在现代应用中,异步消息处理是提升用户体验和系统效率的关键。RabbitMQ可以有效地用于多种异步处理任务,例如:
RabbitMQ支持多种通信模式,如点对点、发布/订阅等,这些模式帮助系统各部分保持低耦合度,便于独立扩展和维护。例如:
在流量高峰期,如促销或大型活动期间,系统可能会遭遇巨大的访问压力。RabbitMQ可以用来缓冲入站消息,如订单或请求,从而保护后端服务不被过载:
RabbitMQ提供了一个灵活的消息传递系统,可以集成复杂的企业系统。它支持多种协议和广泛的开发语言库,适用于:
RabbitMQ也常用于系统日志处理和监控。它可以聚合各服务产生的日志信息,并传输到日志分析系统:
RabbitMQ 在数据同步中扮演着重要的角色,特别是在分布式系统中,它能够确保数据在多个系统或组件之间保持一致性和最新状态。这对于维护数据的完整性和及时性至关重要。例如:
数据库同步:在多地数据中心运营的情况下,RabbitMQ 可以用来同步不同地点的数据库。通过消息队列,当一个数据中心的数据库更新时,相应的变更可以通过 RabbitMQ 发送到其他数据中心,从而保证所有地点的数据一致。
实时数据复制:在金融服务或电子商务平台,实时数据复制是保证高可用性和灾难恢复的关键。使用 RabbitMQ,可以实现高效的数据复制策略,如将交易数据从主系统复制到备份系统或分析数据库。
缓存刷新:在使用缓存提高应用性能的情况下,RabbitMQ 可以用来在数据更新时自动通知系统刷新缓存。这样,用户总是能够获取到最新的数据,而不是过时的缓存数据。
通过这些应用场景,可以看出RabbitMQ在现代软件架构中扮演的多样化角色,不仅增强了系统的可靠性和伸缩性,还提高了开发和运维的效率。
网上RabbitMQ安装教程很多,本文只简述基于docker安装的核心步骤:
1. 环境准备,准备Cenos虚拟机,我的是7.x版本:
2. 拉取或解压RabbitMQ镜像:
3. 运行docker容器:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /home/docker/rabbitmq/rabbitmq:/var/lib/rabbitmq -v /home/docker/rabbitmq/rabbitmq_conf:/etc/rabbitmq -e RABBITMQ_DEFAULT_VHOST=km_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:latest
4. 进入容器 :
docker exec -it 容器id /bin/bash
5. 运行rabbitmq-plugins enable rabbitmq_management(解决无法访问网页端15672端口问题),即可完成RabbitMQ安装。
本文以异步处理应用场景为例,展示如何构建稳定可靠的RabbitMQ架构体系:
上述流程为异步消息通信的技术流程,在异步消息通信中当消息投递后就立刻返回了结果,我们无法获取消息消费的具体过程,这就导致了虽然我们可以即刻获取程序返回状态,但是程序执行细节或是否失败无法通过程序响应返回的方式获取。
基于以上RabbitMQ异步通信的优缺点,我们要搭建一个可靠的RabbitMQ架构需要从以下几个方面入手:
生产者稳定架构:
1. 消息投递回调监听。创建消息投递回调监听函数,监听生产者投递的消息是否投递成功。
2. 消息确认表创建。创建消息确认表(message_confirmation),记录消息投递状态,其中字段status反应了是否投递成功(0为为投递成功,1为投递成功)。
- CREATE TABLE "public"."message_confirmation" (
- "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
- "status" int4,
- "create_time" timestamp(6),
- "update_time" timestamp(6),
- "message" varchar(255) COLLATE "pg_catalog"."default",
- CONSTRAINT "message_confirmation_pkey" PRIMARY KEY ("id")
- )
- ;
-
- ALTER TABLE "public"."message_confirmation"
- OWNER TO "postgres";
3. 创建定时任务监听消息投递确认表。每隔一段时间遍历消息确认表,筛选出status为0的消息数据,进行重复投递动作。
消费者稳定架构
1. 死信队列运用。由于网络或外部因素导致消息消费失败,可将消息投递至死信队列进行二次消费。
2. 日志表记录。如死信队列也消费失败,可将消息写入日志表(message_error)后进行手动消费,由技术人员获取日志表中消费失败记录,排查消费失败原因。
- CREATE TABLE "public"."message_error" (
- "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
- "message_id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
- "error_log" text COLLATE "pg_catalog"."default",
- "create_time" timestamp(6),
- "update_time" timestamp(6),
- CONSTRAINT "message_error_pkey" PRIMARY KEY ("id")
- )
- ;
-
- ALTER TABLE "public"."message_error"
- OWNER TO "postgres";
依赖 | 版本 |
---|---|
Spring Boot | 2.6.3 |
Java | 1.8以上 |
postgresSQL | 13.12 |
RabbitMQ | 3.9.11 |
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-validation</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.postgresql</groupId>
- <artifactId>postgresql</artifactId>
- </dependency>
- </dependencies>
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- server:
- port: 8873
- spring:
- datasource:
- url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_producer
- username: postgres
- password: postgres
- driver-class-name: org.postgresql.Driver
- rabbitmq:
- port: 5672
- host: 192.168.10.11
- username: admin
- password: admin
- virtual-host: my_vhost
- publisher-confirm-type: correlated
- listener:
- simple:
- acknowledge-mode: manual
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- @PostConstruct
- public void regCallback() {
- // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- log.info("cause:"+cause);
- // 如果ack为true代表消息已经收到
- String messageId = correlationData.getId();
-
- if (!ack) {
- // 这里可能要进行其他的方式进行存储
- log.error("MQ队列应答失败,messageId是:" + messageId);
- return;
- }
-
- try {
- MessageConfirmation messageConfirmation = messageConfirmationMapper.selectById(messageId);
- messageConfirmation.setStatus(1);
- int count=messageConfirmationMapper.updateById(messageConfirmation);
- if (count == 1) {
- log.info("本地消息状态修改成功,消息成功投递到消息队列中...");
- }
- } catch (Exception ex) {
- log.error("本地消息状态修改失败,出现异常:" + ex.getMessage());
- }
- }
- });
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
上述回调函数主要用于监听生产者发送的消息是否发送成功,并将消息发送状态更新至消息确认表中。
- @Configuration
- @EnableScheduling
- @Slf4j
- public class confirmMessageTaskService {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Autowired
- MessageConfirmationMapper messageConfirmationMapper;
-
- @Scheduled(cron = "0 */1 * * * ?")
- public void sendMessage(){
- // 把消息为0的状态消息重新查询出来,投递到MQ中。
- LambdaQueryWrapper<MessageConfirmation> queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(MessageConfirmation::getStatus, 0);
- List<MessageConfirmation> noConfirmMessages = messageConfirmationMapper.selectList(queryWrapper)
- .stream()
- .collect(Collectors.toList());
- noConfirmMessages.forEach((noConfirmMessage)->{
- rabbitTemplate.convertAndSend("xz_push_exchange","", JsonUtil.obj2String(noConfirmMessage),
- new CorrelationData(noConfirmMessage.getId()));
- });
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
上述定时任务为每分钟遍历消息确认表,将status=0的消息筛选出来进行消息投递。
- public void sendMessage(MessageConfirmation messageConfirmation) {
- messageConfirmationMapper.insert(messageConfirmation);
- rabbitTemplate.convertAndSend("xfc_fanout_exchange","", JsonUtil.obj2String(messageConfirmation),
- new CorrelationData(messageConfirmation.getId()));
- }
- server:
- port: 8872
- spring:
- datasource:
- url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_consumer
- username: postgres
- password: postgres
- driver-class-name: org.postgresql.Driver
- rabbitmq:
- port: 5672
- host: 192.168.10.11
- username: admin
- password: admin
- virtual-host: my_vhost
- listener:
- simple:
- acknowledge-mode: manual
- mybatis-plus:
- typeAliasesPackage: com.xfc.consumer.entities
- mapper-locations: classpath:mapper/*.xml
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- @Configuration
- public class RabbitMQConfig {
- /**
- * 死信队列
- * @return
- */
- @Bean
- public FanoutExchange deadExchange() {
- return new FanoutExchange("dead_xfc_fanout_exchange", true, false);
- }
-
- @Bean
- public Queue deadXfcQueue() {
- return new Queue("dead.xfc.queue", true);
- }
- @Bean
- public Binding bindDeadXfc() {
- return BindingBuilder.bind(deadXfcQueue()).to(deadExchange());
- }
-
- /**
- * 队列
- * @return
- */
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange("xfc_fanout_exchange", true, false);
- }
-
- @Bean
- public Queue xfcQueue() {
- Map<String, Object> args = new HashMap<>();
- args.put("x-dead-letter-exchange", "dead_xfc_fanout_exchange");
- return new Queue("xfc.queue", true, false, false, args);
- }
-
- @Bean
- public Binding bindXfc() {
- return BindingBuilder.bind(xfcQueue()).to(fanoutExchange());
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
上述代码为RabbitMQ配置类,用于在项目初始化时生成相应的交换机和队列。
- @Service
- @Slf4j
- public class XfcMqConsumer {
- @RabbitListener(queues = {"xfc.queue"})
- public void messageconsumer(String message, Channel channel,
- CorrelationData correlationData,
- @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
- MessageConfirmation messageConfirmation=null;
- try {
- log.info("收到MQ的消息是: " + message );
- messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
- /**
- * 编写业务逻辑
- */
-
- } catch (Exception e) {
- e.printStackTrace();
- log.error("消息投放到死信队列"+e.getMessage(),e);
- channel.basicNack(tag,false,false);// 死信队列
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- @Service
- @Slf4j
- public class DeadMqConsumer {
- @Autowired
- MessageErrorMapper messageErrorMapper;
- @RabbitListener(queues = {"dead.xfc.queue"})
- public void messageconsumer(String message, Channel channel,
- CorrelationData correlationData,
- @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
- MessageConfirmation messageConfirmation=null;
- try {
- log.info("收到MQ的消息是: " + message );
- messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
- /**
- * 编写业务逻辑
- */
- } catch (Exception e) {
- e.printStackTrace();
- /**
- * 写入message_error
- */
- messageErrorMapper.insert(new MessageError(messageConfirmation.getId(),e.getMessage(),new Date()));
- channel.basicNack(tag,false,false);// 死信队列
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
以上代码编写完成后需要进行架构效果测试,其步骤如下:
1. 消息投递测试
上图调用了消息投递接口。
在消息确认表中,新增了一条消息且status=1,代表该条消息已投递成功。
2. 消费者正常消费测试
3. 消费异常测试
上图可看出消息消费异常投入到了死信队列。
在死信队列中依然消费失败。
消费失败后成功写入了日志表。
本文讲解了RabbitMQ应用场景以及在异步处理场景中如何搭建稳定的RabbitMQ架构体系,逐步详细的给出了生产者及消费者端代码并在文章最后对架构效果进行了测试,感兴趣的同学可根据代码进行实操,有疑问和其他见解也可在评论区留言,我看到都会回复。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。