The RabbitAdmin class makes it easy to manage RabbitMQ (exchanges, queues, and bindings); in Spring you simply inject it.
Note: the configuration in this section calls setAutoStartup(true) on the RabbitAdmin bean.
2.1.1 Add the POM file
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cp</groupId>
    <artifactId>rabbitmq-spring</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitmq-spring</name>
    <description>rabbitmq-spring</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.14.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
```
2.1.2 Configure the beans
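```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan({"com.cp.spring.*"})
public class RabbitMQConfig {

    // Equivalent to <bean id="connectionFactory"></bean> in XML configuration
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("user_cp");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/vhost_cp");
        return connectionFactory;
    }

    // Keep the parameter name identical to the bean method name above.
    // Injection is by type; the name only disambiguates when several
    // ConnectionFactory beans exist.
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
```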
2.1.3 Test class
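```java
import java.util.HashMap;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void testAdmin() throws Exception {
        // Exchanges: name, durable = false, autoDelete = false
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

        // Queues: name, durable = false
        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

        // Arguments: 1) destination (queue name), 2) destination type,
        // 3) exchange, 4) routing key, 5) binding arguments
        rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct", "direct", new HashMap<>()));

        // Same thing with the fluent BindingBuilder API
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.topic.queue", false))          // queue to bind
                        .to(new TopicExchange("test.topic", false, false))   // exchange to bind it to
                        .with("user.#"));                                    // routing key pattern

        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.fanout.queue", false))
                        .to(new FanoutExchange("test.fanout", false, false)));

        // Remove all messages from the queue (noWait = false)
        rabbitAdmin.purgeQueue("test.topic.queue", false);
    }
}
```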
Run the code above and check the result for yourself.
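To reason about what the three bindings declared in the test would do with a published message, the following is a minimal in-memory sketch of direct, topic, and fanout routing. It is plain Java, not Spring AMQP or broker code: `RoutingSketch`, `BindingRule`, `route`, and `topicMatches` are hypothetical names introduced only for illustration. The topic rules it models are the standard AMQP ones: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model of exchange routing; hypothetical types, not the Spring AMQP classes.
final class RoutingSketch {

    enum ExchangeType { DIRECT, TOPIC, FANOUT }

    // One binding: which queue is attached to which exchange, and with what key.
    static final class BindingRule {
        final String exchange;
        final ExchangeType type;
        final String bindingKey;
        final String queue;

        BindingRule(String exchange, ExchangeType type, String bindingKey, String queue) {
            this.exchange = exchange;
            this.type = type;
            this.bindingKey = bindingKey;
            this.queue = queue;
        }
    }

    // Topic matching over dot-separated words: '*' = exactly one word, '#' = zero or more words.
    static boolean topicMatches(String pattern, String routingKey) {
        return matchWords(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matchWords(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word (stay on it).
            return matchWords(p, pi + 1, k, ki)
                    || (ki < k.length && matchWords(p, pi, k, ki + 1));
        }
        if (ki == k.length) {
            return false;
        }
        boolean wordMatches = p[pi].equals("*") || p[pi].equals(k[ki]);
        return wordMatches && matchWords(p, pi + 1, k, ki + 1);
    }

    // Returns the queues that would receive a message published to the given exchange.
    static List<String> route(List<BindingRule> bindings, String exchange, String routingKey) {
        List<String> queues = new ArrayList<>();
        for (BindingRule b : bindings) {
            if (!b.exchange.equals(exchange)) {
                continue;
            }
            boolean delivered;
            switch (b.type) {
                case FANOUT:
                    delivered = true; // the routing key is ignored
                    break;
                case DIRECT:
                    delivered = b.bindingKey.equals(routingKey); // exact match
                    break;
                default:
                    delivered = topicMatches(b.bindingKey, routingKey); // pattern match
                    break;
            }
            if (delivered) {
                queues.add(b.queue);
            }
        }
        return queues;
    }

    // The same three bindings that the test class declares.
    static List<BindingRule> testBindings() {
        return Arrays.asList(
                new BindingRule("test.direct", ExchangeType.DIRECT, "direct", "test.direct.queue"),
                new BindingRule("test.topic", ExchangeType.TOPIC, "user.#", "test.topic.queue"),
                new BindingRule("test.fanout", ExchangeType.FANOUT, "", "test.fanout.queue"));
    }

    public static void main(String[] args) {
        List<BindingRule> rules = testBindings();
        System.out.println(route(rules, "test.topic", "user.order.created")); // [test.topic.queue]
        System.out.println(route(rules, "test.topic", "order.created"));      // []
        System.out.println(route(rules, "test.direct", "direct"));            // [test.direct.queue]
        System.out.println(route(rules, "test.fanout", "anything"));          // [test.fanout.queue]
    }
}
```

The sketch only models which queue a message would land in; the real routing is done by the RabbitMQ broker, so the management UI or a consumer remains the way to confirm the actual result.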
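RabbitAdmin implements the InitializingBean interface, which means its own setup runs only after the bean's properties have been set. The most important part of its afterPropertiesSet() method is the initialize() method.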
```java
this.applicationContext.getBeansOfType(Collection.class, false, false).values()
```
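As the source shows, the three types Exchange, Queue, and Binding are all looked up from the Spring container and loaded into the three collections defined earlier in the method: contextExchanges, contextQueues, and contextBindings. The rest of the source shows that RabbitAdmin first filters the RabbitMQ-related beans out of the Spring container and only then connects to the RabbitMQ server. In short, the configuration is registered in the Spring container through @Bean methods, RabbitAdmin reads it back from the container, and then the connection is established.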
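The collect-then-declare flow described above can be modelled in a few lines of plain Java. This is a simplified sketch, not the Spring AMQP source: the `Exchange`, `Queue`, and `Binding` classes here are hypothetical stand-ins, a `Map` plays the role of the application context, and `beansOfType` only imitates what `getBeansOfType` does. The declaration order used (exchanges, then queues, then bindings) is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of "filter beans by type, then declare them".
// All types are hypothetical stand-ins, not the Spring AMQP classes.
final class AdminInitSketch {

    static final class Exchange {
        final String name;
        Exchange(String name) { this.name = name; }
    }

    static final class Queue {
        final String name;
        Queue(String name) { this.name = name; }
    }

    static final class Binding {
        final String queue;
        final String exchange;
        Binding(String queue, String exchange) {
            this.queue = queue;
            this.exchange = exchange;
        }
    }

    // Stand-in for a by-type lookup: keep only the beans that are instances of the requested type.
    static <T> List<T> beansOfType(Map<String, Object> container, Class<T> type) {
        List<T> result = new ArrayList<>();
        for (Object bean : container.values()) {
            if (type.isInstance(bean)) {
                result.add(type.cast(bean));
            }
        }
        return result;
    }

    // Collects the three kinds of beans, then records the declarations in order.
    static List<String> initialize(Map<String, Object> container) {
        List<Exchange> contextExchanges = beansOfType(container, Exchange.class);
        List<Queue> contextQueues = beansOfType(container, Queue.class);
        List<Binding> contextBindings = beansOfType(container, Binding.class);

        List<String> declared = new ArrayList<>();
        for (Exchange e : contextExchanges) {
            declared.add("exchange:" + e.name);
        }
        for (Queue q : contextQueues) {
            declared.add("queue:" + q.name);
        }
        for (Binding b : contextBindings) {
            declared.add("binding:" + b.queue + "->" + b.exchange);
        }
        return declared;
    }

    // A container holding unrelated beans too, registered in arbitrary order.
    static Map<String, Object> sampleContainer() {
        Map<String, Object> container = new LinkedHashMap<>();
        container.put("binding001", new Binding("queue001", "topic001"));
        container.put("someOtherBean", "not a RabbitMQ bean");
        container.put("queue001", new Queue("queue001"));
        container.put("exchange001", new Exchange("topic001"));
        return container;
    }

    public static void main(String[] args) {
        // prints [exchange:topic001, queue:queue001, binding:queue001->topic001]
        System.out.println(initialize(sampleContainer()));
    }
}
```

The point of the model is that the admin never needs to be told about individual exchanges or queues: registering them as beans is enough, because they are discovered by type.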
To have Spring AMQP do the declaring, follow the Spring AMQP pattern shown below, that is, declare the resources as @Bean methods:
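```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan({"com.cp.spring.*"})
public class RabbitMQConfig {

    // Equivalent to <bean id="connectionFactory"></bean> in XML configuration
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("user_cp");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/vhost_cp");
        return connectionFactory;
    }

    // Keep the parameter name identical to the bean method name above
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * Consumer-side configuration:
     * 1. Choose the exchange type.
     * 2. Bind the queue to the exchange.
     *
     * FanoutExchange:  delivers each message to every bound queue; routing keys are not used.
     * HeadersExchange: matches on key-value header attributes.
     * DirectExchange:  delivers to the queues bound with exactly the message's routing key.
     * TopicExchange:   matches the routing key against multi-word patterns.
     */
    @Bean
    public TopicExchange exchange001() {
        return new TopicExchange("topic001", true, false);
    }

    @Bean
    public Queue queue001() {
        return new Queue("queue001", true); // durable queue
    }

    @Bean
    public Binding binding001() {
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
    }

    @Bean
    public TopicExchange exchange002() {
        return new TopicExchange("topic002", true, false);
    }

    @Bean
    public Queue queue002() {
        return new Queue("queue002", true); // durable queue
    }

    // The source is truncated here: a further @Bean definition follows in the original.
}
```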
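Of the four exchange types listed in the comment above, HeadersExchange is the only one the examples do not exercise. As a rough illustration of "matching on key-value attributes", here is a small plain-Java sketch. It assumes the usual headers-exchange convention in which the binding argument `x-match` is `all` (the default: every binding header must match) or `any` (one match is enough), and in which arguments starting with `x-` are not themselves match criteria. `HeadersMatchSketch` and `matches` are hypothetical names, and this is not broker or Spring AMQP code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative model of headers-exchange matching; hypothetical, not broker code.
final class HeadersMatchSketch {

    // Compares a binding's arguments against a message's headers.
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        // Assumption: anything other than "any" is treated as "all".
        boolean matchAll = !"any".equals(bindingArgs.get("x-match"));
        int considered = 0;
        int matched = 0;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // control arguments such as x-match are skipped
            }
            considered++;
            if (messageHeaders.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), messageHeaders.get(arg.getKey()))) {
                matched++;
            }
        }
        return matchAll ? matched == considered : matched > 0;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("format", "pdf");
        binding.put("type", "report");

        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");

        System.out.println(matches(binding, headers)); // false: "type" is missing
        binding.put("x-match", "any");
        System.out.println(matches(binding, headers)); // true: "format" matches
    }
}
```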