当前位置:   article > 正文

RabbitMQ之交换机

RabbitMQ之交换机

目录

前言

Exchange(交换机)的类型与应用

        - 交换机的属性

1. 直连交换机:Direct Exchange

2. 主题交换机:Topic Exchange

3. 扇形交换机:Fanout Exchange

4. 默认交换机(直连)


前言

        在讲交换机之前我们需要了解一些概念,在RabbitMQ工作流程有一项叫Exchange(交换机:消息的分发中心),它的作用是将生产者发送的消息转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。

原:RabbitMQ中生产者发送的信息不会直接投递到队列中,而是先将消息投递到交换机中,在由交换机路由到一个或多个队列中。

流程:生产者 --(路由键)---> 交换机 --(绑定键)---> 队列 --(pull,push)--->消费者

 

 这里就需要了解这两个东西:

  • 路由键(RoutingKey)每个消息都有一个称为路由键(routing key)的属性,它其实就是一个简单的字符串(或者可以说是一种规则的字符串)
  • 绑定键(BindingKey)就是指定将队列跟接收路由键的交换机进行绑定

        生产者将信息发送给哪个Exchange是由RoutingKey决定的,而Exchange与哪个队列绑定是由BindingKey决定的。

Exchange(交换机)的类型与应用

- 交换机的属性

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

  • Type:交换机名称
  • Durability:是否持久化。如果持久性,则RabbitMQ重启后,交换机还存在
  • Auto-delete:当所有与之绑定的消息队列都完成了对此交换机的使用后,是否删掉它
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认fasle
  • Arguments:扩展参数

这里就直接开始操作,配置在一篇就已做了讲解,非常简单!接下来的代码也之上在原基础上做了添加。

项目结构:

所需依赖:

  1. <!--amqp协议-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-web</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.projectlombok</groupId>
  12. <artifactId>lombok</artifactId>
  13. <optional>true</optional>
  14. </dependency>

1. 直连交换机:Direct Exchange

  • 这是最简单的一种交换机类型。
  • 当一个队列与交换机绑定时,需要指定一个路由键(RoutingKey),只有当消息的路由键与该队列绑定时指定的绑定键(BindingKey)完全匹配时,消息才会被路由到该队列。

如下图:

一、p发送消息时带了一个叫black的routing_key,交换机接受后会送到与black绑定的binding_key的队列中,也就是Q2。以此类推...

二、还有一种情况,当Q2也与交换机绑定了black,这时p发送的信息会同时推送到Q1和Q2两个队列中。

  • 生产者
  1. package com.ycxw.publisher.demos;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. //定义队列
  9. @Configuration
  10. @SuppressWarnings("all")
  11. public class RabbitConfig {
  12. /**
  13. * 定义队列 Q1
  14. * @return
  15. */
  16. @Bean
  17. public Queue directQ1() {
  18. return new Queue("direct-Q1");
  19. }
  20. /**
  21. * 定义队列 Q2
  22. * @return
  23. */
  24. @Bean
  25. public Queue directQ2() {
  26. return new Queue("direct-Q2");
  27. }
  28. /**
  29. * 自定义直连交换机
  30. * @return
  31. */
  32. @Bean
  33. public DirectExchange directExchange() {
  34. return new DirectExchange("direct-exchange", true, false);
  35. }
  36. /**
  37. * 将队列 Q1与交换机进行绑定,并设置路由键
  38. * @return
  39. */
  40. @Bean
  41. public Binding bindingQ1() {
  42. return BindingBuilder.bind(directQ1())
  43. .to(directExchange())
  44. .with("direct_orange");
  45. }
  46. /**
  47. * 将队列 Q2与交换机进行绑定,并设置路由键
  48. * @return
  49. */
  50. @Bean
  51. public Binding bindingQ2() {
  52. return BindingBuilder.bind(directQ2())
  53. .to(directExchange())
  54. .with("direct_black");
  55. }
  56. }
  1. package com.ycxw.publisher.demos;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import org.springframework.amqp.core.AmqpTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. /**
  9. * 模拟发送请求
  10. */
  11. @RestController
  12. public class Sender {
  13. @Autowired
  14. private AmqpTemplate rabbitTemplate;
  15. @RequestMapping("/send1")
  16. public String sendFirst() {
  17. /*向消息队列发送消息 converAndSend(交换机,路由键,发送的信息)*/
  18. rabbitTemplate.convertAndSend("direct-exchange", "direct_orange", "我是Q1");