springboot版本:1.5.19.RELEASE
Gradle版本:4.10
Gradle的build.gradle中增加引入
Groovy代码
compile('org.springframework.boot:spring-boot-starter-amqp')
application.yaml中增加配置
Yaml代码
# Connection settings for the two RabbitMQ brokers.
# The keys are read through the placeholders ${buddie.rabbitmq.consume.*}
# and ${buddie.rabbitmq.produce.*}, so the nesting below must be preserved.
buddie:
  rabbitmq:
    # Broker the application consumes messages from.
    consume:
      host: 127.0.0.1
      port: 5672
      username: admin
      password: admin
    # Broker the application publishes messages to.
    produce:
      host: 127.0.0.1
      port: 5674
      username: admin
      password: admin
增加配置类,配置我们的两个rabbitMQ:
Java代码
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Primary;
-
- @Configuration
- public class RabbitConfig {
-
- public ConnectionFactory rabbitConfiguration(String host, int port, String username, String password) {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setHost(host);
- connectionFactory.setPort(port);
- connectionFactory.setUsername(username);
- connectionFactory.setPassword(password);
- return connectionFactory;
- }
-
- @Bean("consumeRabbitConnectionFactory")
- @Primary
- public ConnectionFactory innerRabbitConfiguration(@Value("${buddie.rabbitmq.consume.host}") String host,
- @Value("${buddie.rabbitmq.consume.port}") int port,
- @Value("${buddie.rabbitmq.consume.username}") String username,
- @Value("${buddie.rabbitmq.consume.password}") String password) {
- return this.rabbitConfiguration(host, port, username, password);
- }
-
- @Bean("consumeRabbitTemplate")
- @Primary
- public RabbitTemplate consumeRabbitTemplate(
- @Qualifier("consumeRabbitConnectionFactory") ConnectionFactory connectionFactory
- ) {
- return new RabbitTemplate(connectionFactory);
- }
-
- public SimpleRabbitListenerContainerFactory rabbitFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- configurer.configure(factory, connectionFactory);
- return factory;
- }
-
- @Bean("consumeRabbitFactory")
- public SimpleRabbitListenerContainerFactory consumeRabbitFactory(
- SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("consumeRabbitConnectionFactory") ConnectionFactory connectionFactory
- ) {
- return this.rabbitFactory(configurer, connectionFactory);
- }
-
- @Bean("produceRabbitConnectionFactory")
- public ConnectionFactory outerRabbitConfiguration(@Value("${buddie.rabbitmq.produce.host}") String host,
- @Value("${buddie.rabbitmq.produce.port}") int port,
- @Value("${buddie.rabbitmq.produce.username}") String username,
- @Value("${buddie.rabbitmq.produce.password}") String password) {
- return this.rabbitConfiguration(host, port, username, password);
- }
-
- @Bean("produceRabbitTemplate")
- public RabbitTemplate produceRabbitTemplate(
- @Qualifier("produceRabbitConnectionFactory") ConnectionFactory connectionFactory
- ) {
- return new RabbitTemplate(connectionFactory);
- }
-
- @Bean("produceRabbitFactory")
- public SimpleRabbitListenerContainerFactory outerRabbitFactory(
- SimpleRabbitListenerContainerFactoryConfigurer configurer,
- @Qualifier("produceRabbitConnectionFactory") ConnectionFactory connectionFactory
- ) {
- return this.rabbitFactory(configurer, connectionFactory);
- }
-
- @Bean
- public Queue topicQueueCreate() {
- return new Queue("topic.task");
- }
-
-
- @Bean
- public TopicExchange topicExchange() {
- return new TopicExchange("topic.exchange");
- }
-
- @Bean
- public Binding topicBindingCreate() {
- return BindingBuilder.bind(this.topicQueueCreate()).to(this.topicExchange()).with("topic.task.#");
- }
-
- }
注意事项:
作为生产者,在启服时,并不会连接rabbitMQ,更不会去创建Topic,Queue及绑定。
而作为消费者,在启服后,会连接rabbitMQ,并检查Queue是否有消息可消费。
所以应该给消费端的rabbitMQ配置加上@Primary,否则当rabbitMQ上没有对应的Queue时会报错,导致服务无法启动。
使用:
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.io.IOException;
-
- @Component
- public class MessageConsumeService {
- private static final Logger logger = LoggerFactory.getLogger(MessageConsumeService.class);
- @Resource(name = "produceRabbitTemplate")
- private RabbitTemplate outerRabbitTemplate;
- @Autowired
- private ObjectMapper objectMapper;
-
- @RabbitListener(queues = PropConstants.TOPIC_TASK, containerFactory = "consumeRabbitFactory")
- public void receiveTopicCreate(String srcMessage) {
- try {
- outerRabbitTemplate.convertAndSend("topic.exchange", "topic.task.#", srcMessage);
- } catch (Exception e) {
- logger.error("MessageConsumeService.receiveTopicCreate error", e);
- }
- }
- }