当前位置:   article > 正文

RabbitMQ整合Spring AMQP实战!(全)_spring amqp rabbit 2.2.6

spring amqp rabbit 2.2.6

前言

1. AMQP 核心组件

  • RabbitAdmin
  • SpringAMQP声明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapter
  • MessageConverter

2. RabbitAdmin

RabbitAdmin类可以很好的才注意RabbitMQ,在Spring中直接进行诸如即可。

注意:

  • autoStartUp必须要设置为true,否则Spring容器不会加载RabbitAdmin类
  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明
  • 使用RabbitTemplate的execute方法执行对应的什么、修改、删除等一系列RabbitMQ基础功能操作
  • 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等

2.1 代码演示

2.1.1 引入Pom文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.cp</groupId>
  6. <artifactId>rabbitmq-spring</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>jar</packaging>
  9. <name>rabbitmq-spring</name>
  10. <description>rabbitmq-spring</description>
  11. <parent>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-parent</artifactId>
  14. <version>1.5.14.RELEASE</version>
  15. <relativePath/> <!-- lookup parent from repository -->
  16. </parent>
  17. <properties>
  18. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  19. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  20. <java.version>1.8</java.version>
  21. </properties>
  22. <dependencies>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-test</artifactId>
  30. <scope>test</scope>
  31. </dependency>
  32. <dependency>
  33. <groupId>com.rabbitmq</groupId>
  34. <artifactId>amqp-client</artifactId>
  35. <version>3.6.5</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-starter-amqp</artifactId>
  40. </dependency>
  41. </dependencies>
  42. <build>
  43. <plugins>
  44. <plugin>
  45. <groupId>org.springframework.boot</groupId>
  46. <artifactId>spring-boot-maven-plugin</artifactId>
  47. </plugin>
  48. </plugins>
  49. </build>
  50. </project>
  51. 复制代码

2.1.2 配置Bean

  1. @Configuration
  2. @ComponentScan({"com.cp.spring.*"})
  3. public class RabbitMQConfig {
  4. //相当于<Bean id="connectionFactory"></Bean>
  5. @Bean
  6. public ConnectionFactory connectionFactory(){
  7. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  8. connectionFactory.setAddresses("127.0.0.1:5672");
  9. connectionFactory.setUsername("user_cp");
  10. connectionFactory.setPassword("123456");
  11. connectionFactory.setVirtualHost("/vhost_cp");
  12. return connectionFactory;
  13. }
  14. //形参名称要与bean的方法名保持一致
  15. @Bean
  16. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  17. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  18. rabbitAdmin.setAutoStartup(true);
  19. return rabbitAdmin;
  20. }
  21. }
  22. 复制代码

2.1.3 测试类

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class ApplicationTests {
  4. @Test
  5. public void contextLoads() {
  6. }
  7. @Autowired
  8. private RabbitAdmin rabbitAdmin;
  9. @Test
  10. public void testAdmin() throws Exception {
  11. //直连监听
  12. rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
  13. rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
  14. rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
  15. rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
  16. rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
  17. rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
  18. //第一个参数:具体的队列 第二个参数:绑定的类型 第三个参数:交换机 第四个参数:路由key 第五个参数:arguments 参数
  19. rabbitAdmin.declareBinding(new Binding("test.direct.queue",
  20. Binding.DestinationType.QUEUE,
  21. "test.direct", "direct", new HashMap<>()));
  22. //BindingBuilder 链式编程
  23. rabbitAdmin.declareBinding(
  24. BindingBuilder
  25. .bind(new Queue("test.topic.queue", false)) //直接创建队列
  26. .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系
  27. .with("user.#")); //指定路由Key
  28. rabbitAdmin.declareBinding(
  29. BindingBuilder
  30. .bind(new Queue("test.fanout.queue", false))
  31. .to(new FanoutExchange("test.fanout", false, false)));
  32. //清空队列数据
  33. rabbitAdmin.purgeQueue("test.topic.queue", false);
  34. }
  35. }
  36. 复制代码

通过以上代码,可以自行测试一下结果。

RabbitAdmin源码

实现了InitializingBean接口,表明在Bean配置加载完后再加载RabbitAdmin配置。找到afterPropertiesSet()方法中最要的initialize()初始化方法。

  1. this.applicationContext.getBeansOfType(Collection.class, false, false).values()
  2. 复制代码

可以看到Exchange、Queue、Binding都是从Spring容器中获取三种类型,加载到上方定义的contextExchanges、contextQueues、contextBindings三种容器中。 后续的源码中,也可以看出通过筛选Spring容器中RabbitMQ的信息之后,再去建立RabbitMQ服务器的连接。主要通过Spring以@Bean的方式,将配置加载到Spring容器之后,再从容器中获取相关信息,再去建立连接。

3. SpringAMQP声明

  • 在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列

-使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式

3.1 代码演示

  1. @Configuration
  2. @ComponentScan({"com.cp.spring.*"})
  3. public class RabbitMQConfig {
  4. //相当于<Bean id="connectionFactory"></Bean>
  5. @Bean
  6. public ConnectionFactory connectionFactory(){
  7. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  8. connectionFactory.setAddresses("127.0.0.1:5672");
  9. connectionFactory.setUsername("user_cp");
  10. connectionFactory.setPassword("123456");
  11. connectionFactory.setVirtualHost("/vhost_cp");
  12. return connectionFactory;
  13. }
  14. //形参名称要与bean的方法名保持一致
  15. @Bean
  16. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  17. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  18. rabbitAdmin.setAutoStartup(true);
  19. return rabbitAdmin;
  20. }
  21. /**
  22. * 针对消费者配置
  23. * 1\. 设置交换机类型
  24. * 2\. 将队列绑定到交换机
  25. FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
  26. HeadersExchange :通过添加属性key-value匹配
  27. DirectExchange:按照routingkey分发到指定队列
  28. TopicExchange:多关键字匹配
  29. */
  30. @Bean
  31. public TopicExchange exchange001() {
  32. return new TopicExchange("topic001", true, false);
  33. }
  34. @Bean
  35. public Queue queue001() {
  36. return new Queue("queue001", true); //队列持久
  37. }
  38. @Bean
  39. public Binding binding001() {
  40. return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
  41. }
  42. @Bean
  43. public TopicExchange exchange002() {
  44. return new TopicExchange("topic002", true, false);
  45. }
  46. @Bean
  47. public Queue queue002() {
  48. return new Queue("queue002", true); //队列持久
  49. }
  50. @Bean
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/139179
推荐阅读
相关标签
  

闽ICP备14008679号