Set up the RabbitMQ consumer and producer modules.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.atang.rabbitmq</groupId>
    <artifactId>springboot-rabbitmq-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>
server:
  port: 8088
logging:
  config: classpath:logging-config.xml
  level:
    org.springframework: info
    org.springframework.amqp.rabbit: debug
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
    dynamic: true
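The RabbitMQ auto-configuration detects the exchange, queue, and binding beans in the Spring context and dynamically declares the corresponding elements on the RabbitMQ server.
This is implemented mainly by the org.springframework.amqp.rabbit.core.RabbitAdmin#initialize method. The RabbitAdmin bean is registered by the RabbitAutoConfiguration auto-configuration class, and setting spring.rabbitmq.dynamic to false disables the feature.
The following code creates a topic exchange named test_topic_exchange. Messages whose routing key matches test.# are routed to the test_queue queue.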
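import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    public static final String TEST_TOPIC_EXCHANGE = "test_topic_exchange";
    public static final String TEST_QUEUE = "test_queue";

    // Durable topic exchange
    @Bean("testExchange")
    public Exchange testExchange() {
        return ExchangeBuilder.topicExchange(TEST_TOPIC_EXCHANGE).durable(true).build();
    }

    // Durable queue
    @Bean("testQueue")
    public Queue testQueue() {
        return QueueBuilder.durable(TEST_QUEUE).build();
    }

    // Bind the queue to the exchange with the routing pattern test.#
    @Bean
    public Binding getBinding(@Qualifier("testQueue") Queue queue,
                              @Qualifier("testExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}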
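To make the test.# binding pattern concrete, here is a minimal sketch of how a topic routing key is matched, assuming the standard AMQP topic rules: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. TopicPatternSketch and its matches method are hypothetical names used only for illustration. The real matching is done inside the RabbitMQ broker, not in application code.

```java
/** Illustrative re-implementation of topic pattern matching; not RabbitMQ's own code. */
final class TopicPatternSketch {

    /** Returns true when the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.", -1);
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"test.date", "test", "test.a.b", "other.date"}) {
            System.out.println(key + " -> " + matches("test.#", key));
        }
    }
}
```

Under these rules the routing key test.date, which the producer uses later in this article, matches test.#, while a key such as other.date does not.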
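If we start the application at this point, we find that these elements have not been declared yet. They are actually declared only when a RabbitMQ connection is created. See org.springframework.amqp.rabbit.core.RabbitAdmin#afterPropertiesSet.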
final AtomicBoolean initializing = new AtomicBoolean(false);
this.connectionFactory.addConnectionListener(connection -> {
    if (!initializing.compareAndSet(false, true)) {
        // If we are already initializing, we don't need to do it again...
        return;
    }
    try {
        /*
         * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
         * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
         * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
         * declared for every connection. If anyone has a problem with it: use auto-startup="false".
         */
        if (this.retryTemplate != null) {
            this.retryTemplate.execute(c -> {
                initialize();
                return null;
            });
        }
        else {
            initialize();
        }
    }
    finally {
        initializing.compareAndSet(true, false);
    }
});
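The connection listener above uses AtomicBoolean#compareAndSet as a guard, so a declaration pass that is already running is not started a second time, while a later connection can still trigger a new pass. The sketch below isolates that pattern in plain Java. InitGuardSketch, runGuarded, and runs are hypothetical names, and the Runnable merely stands in for RabbitAdmin#initialize. It is not Spring AMQP code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the guard logic in the connection listener. */
final class InitGuardSketch {

    private final AtomicBoolean initializing = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    /** Runs the task unless a call is already in progress; returns whether it ran. */
    boolean runGuarded(Runnable task) {
        if (!initializing.compareAndSet(false, true)) {
            return false; // a pass is already in progress, so skip this one
        }
        try {
            task.run();
            runs.incrementAndGet();
            return true;
        } finally {
            initializing.set(false); // let later connections trigger a new pass
        }
    }

    /** Number of passes that actually executed. */
    int runs() {
        return runs.get();
    }

    public static void main(String[] args) {
        InitGuardSketch guard = new InitGuardSketch();
        // The nested call arrives while the outer pass is still running.
        guard.runGuarded(() -> System.out.println("nested ran: " + guard.runGuarded(() -> { })));
        System.out.println("later call ran: " + guard.runGuarded(() -> { }));
        System.out.println("total passes: " + guard.runs());
    }
}
```

Because the flag is reset in the finally block, the guard only suppresses overlapping passes. It does not turn the declaration into a one-time operation, which fits the source comment noting that the declarations are idempotent.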
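import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@RabbitListener(queues = "test_queue")
@Component
public class TestListener {

    private final Logger logger = LoggerFactory.getLogger(TestListener.class);

    @RabbitHandler
    public void logMessage(Message message) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info("receiving body {}.", body);
    }
}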
Start the application again, and the startup log now shows RabbitMQ's auto-configuration process.
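import java.util.Date;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {

    public static final Logger logger = LoggerFactory.getLogger(SendMessageController.class);

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMessage")
    public void sendMessage() {
        String context = "Current Date: " + new Date();
        logger.info("Sender : {}", context);
        // The routing key test.date matches the test.# binding, so the message reaches test_queue
        this.rabbitTemplate.convertAndSend("test_topic_exchange", "test.date", context);
    }
}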
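The RabbitMQ management UI shows that one message was received but not acknowledged.
The consumer log gives the reason: the consumer has no handler that can process String messages from the test_queue queue.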
@RabbitHandler
public void logMessage(String message) {
    logger.info("receiving body {}.", message);
}
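Why a String parameter fixes the problem can be modelled roughly as dispatch by payload type. The sketch below is a simplified model, assuming the handler method is chosen by the type of the converted payload, so a String payload finds no match when the only handler accepts a raw message object. HandlerDispatchSketch, RawMessage, register, and dispatch are hypothetical names. This is not how Spring AMQP is actually implemented, only an illustration of the idea.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Simplified model of picking a handler by payload type; not Spring AMQP code. */
final class HandlerDispatchSketch {

    /** Hypothetical stand-in for a raw message wrapper. */
    static final class RawMessage {
        final byte[] body;

        RawMessage(byte[] body) {
            this.body = body;
        }
    }

    // Handlers keyed by the payload type they accept, kept in registration order.
    private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

    <T> void register(Class<T> type, Consumer<T> handler) {
        handlers.put(type, payload -> handler.accept(type.cast(payload)));
    }

    /** Invokes the first handler whose type accepts the payload, or fails. */
    void dispatch(Object payload) {
        for (Map.Entry<Class<?>, Consumer<Object>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                entry.getValue().accept(payload);
                return;
            }
        }
        throw new IllegalStateException("No handler for payload type " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(RawMessage.class, m -> System.out.println("raw message, " + m.body.length + " bytes"));
        try {
            dispatcher.dispatch("hello"); // only a RawMessage handler exists at this point
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        dispatcher.register(String.class, s -> System.out.println("receiving body " + s + "."));
        dispatcher.dispatch("hello"); // now handled by the String handler
    }
}
```

In this model the first dispatch fails because no registered type accepts a String, which mirrors the unacknowledged message seen earlier. Once a String handler is registered, the same payload is delivered.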
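Send the message again. This time the consumer receives it and prints the log.
There are two connections: one for the producer and one for the consumer.
Likewise, there are two channels: one for the producer and one for the consumer.
Because every message was consumed successfully, no messages are backlogged in the queue.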