当前位置:   article > 正文

RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解

RabbitMQ 交换机、绑定、队列、消息、虚拟主机详解

交换机属性

交换机属性:
  1. name: 交换机名称
  2. type: 交换机类型 direct,topic,fanout,headers
  3. durability: 是否需要持久化,true 为持久化
  4. auto delete: 当最后一个绑定到 exchange 上的队列被删除后,exchange 没有绑定的队列了,自动删除该 exchange
  5. internal: 当前 exchange 是否用于 rabbitMQ 内部使用,默认为 false
  6. arguments: 扩展参数,用于扩展 AMQP 协议自制定化使用
  7. 复制代码
1.direct exchange类型
  1. direct exchange: 所有发送到 direct exchange 的消息被转发到 routing key 中指定的queue
  2. 复制代码

注意:direct模式可以使用 rabbitMQ 自带的 exchange:default exchange,所以不需要将 exchange 进行任何绑定(binding)操作,消息传递时,routingkey 必须完全匹配才会被队列接收,否则该消息会被抛弃。 流转示意图如下

  1. 代码地址: https://github.com/hmilyos/rabbitmq-api-demo
  2. 复制代码

消费端代码:

  1. public class ConsumerDirectExchange {
  2. private static final Logger log = LoggerFactory.getLogger(ConsumerDirectExchange.class);
  3. // 声明
  4. public final static String EXCHANGE_NAME = "test_direct_exchange";
  5. public final static String EXCHANGE_TYPE = "direct";
  6. public final static String QUEUE_NAME = "test_direct_queue";
  7. public final static String ROUTING_KEY = "test.direct";
  8. public final static String ROUTING_KEY_ERROR = "test.direct.error";
  9. public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
  10. ConsumerCancelledException, InterruptedException {
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. connectionFactory.setHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  13. connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
  14. connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  15. connectionFactory.setAutomaticRecoveryEnabled(true);
  16. connectionFactory.setNetworkRecoveryInterval(3000);
  17. Connection connection = connectionFactory.newConnection();
  18. Channel channel = connection.createChannel();
  19. // 表示声明了一个交换机
  20. channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
  21. // 表示声明了一个队列
  22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  23. // 建立一个绑定关系:
  24. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
  25. // durable 是否持久化消息
  26. QueueingConsumer consumer = new QueueingConsumer(channel);
  27. // 参数:队列名称、是否自动ACK、Consumer
  28. channel.basicConsume(QUEUE_NAME, true, consumer);
  29. while (true) {
  30. // 获取消息,如果没有消息,这一步将会一直阻塞
  31. Delivery delivery = consumer.nextDelivery();
  32. String msg = new String(delivery.getBody());
  33. log.info("收到消息:{}", msg);
  34. }
  35. }
  36. }
  37. 复制代码

启动消费端

上管控台查看交换机和队列是否成功创建

点击进去查看绑定情况

生产端代码

  1. public class ProducerDirectExchange {
  2. private final static Logger log = LoggerFactory.getLogger(ProducerDirectExchange.class);
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  6. connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
  7. connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  8. Connection connection = connectionFactory.newConnection();
  9. Channel channel = connection.createChannel();
  10. String msg = "Hello World RabbitMQ Direct Exchange test.direct Message ... ";
  11. log.info("生产端发送了:{}", msg);
  12. channel.basicPublish(ConsumerDirectExchange.EXCHANGE_NAME, ConsumerDirectExchange.ROUTING_KEY, null, msg.getBytes());
  13. // channel.basicPublish(ConsumerDirectExchange.EXCHANGE_NAME, ConsumerDirectExchange.ROUTING_KEY_ERROR, null, msg.getBytes());
  14. }
  15. }
  16. 复制代码

然后把生产端run一下

再查看消费端的日志

该消费端只接收 routingkey 为 test.direct 的消息,证明 direct exchange 类型的,routingkey 必须完全匹配才会被队列接收,否则该消息会被抛弃。

2.topic exchange 类型

topic exchange: 所有发送到 topic exchange 的消息被转发到所有关心 routingkey 中 topic 的 queue 上 exchange 将 routingkey 和某 topic 进行模糊匹配,此时队列需要绑定一个 topic。 注意: topic 可以使用通配符进行模糊匹配 # 匹配一个或多个词,注意是词 * 只能匹配一个词 例如 “log.#” 能匹配到 “log.info.oa” “log.*” 只能匹配到 “log.erro” 这种格式 具体示例图如下图,usa.news 能被 usa.#,#.news 所消费,usa.weather 能被 usa.#,#.weather 所消费...

代码示例: 消费端:

  1. public class ConsumerTopicExchange {
  2. private final static Logger log = LoggerFactory.getLogger(ConsumerTopicExchange.class);
  3. // 声明
  4. public static final String EXCHANGE_NAME = "test_topic_exchange";
  5. public static final String EXCHANGE_TYPE = "topic";
  6. public static final String QUEUE_NAME = "test_topic_queue";
  7. public static final String ROUTING_KEY_one = "user.#";
  8. public static final String ROUTING_KEY = "user.*";
  9. public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
  10. ConsumerCancelledException, InterruptedException {
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
  13. connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
  14. connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  15. connectionFactory.setAutomaticRecoveryEnabled(true);
  16. connectionFactory.setNetworkRecoveryInterval(3000);
  17. Connection connection = connectionFactory.newConnection();
  18. Channel channel = connection.createChannel();
  19. // 1 声明交换机
  20. channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
  21. // 2 声明队列
  22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  23. // 3 建立交换机和队列的绑定关系:
  24. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
  25. // durable 是否持久化消息
  26. QueueingConsumer consumer = new QueueingConsumer(channel);
  27. // 参数:队列名称、是否自动ACK、Consumer
  28. channel.basicConsume(QUEUE_NAME, true, consumer);
  29. // 循环获取消息
  30. while (true) {
  31. // 获取消息,如果没有消息,这一步将会一直阻塞
  32. Delivery delivery = consumer.nextDelivery();
  33. String msg = new String(delivery.getBody());
  34. log.info("消费端收到消息:{}", msg);
  35. }
  36. }
  37. }
  38. 复制代码

启动消费端,上管控台查看创建、绑定是否成功

确认成功后,编写生产端代码

  1. public class ProducerTopicExchange {
  2. private final static Logger log = LoggerFactory.getLogger(ProducerTopicExchange.class);
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
  6. connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
  7. connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  8. // 2 创建Connection
  9. Connection connection = connectionFactory.newConnection();
  10. // 3 创建Channel
  11. Channel channel = connection.createChannel();
  12. // 4 声明
  13. String routingKey1 = "user.save";
  14. String routingKey2 = "user.update";
  15. String routingKey3 = "user.delete.abc";
  16. String msg1 = "Hello World RabbitMQ Topic Exchange Message ..." + routingKey1;
  17. String msg2 = "Hello World RabbitMQ Topic Exchange Message ..." + routingKey2;
  18. String msg3 = "Hello World RabbitMQ Topic Exchange Message ..." + routingKey3;
  19. log.info("生产端, {} :{}", routingKey1, msg1);
  20. channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey1, null, msg1.getBytes());
  21. log.info("生产端, {} :{}", routingKey2, msg2);
  22. channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey2, null, msg2.getBytes());
  23. log.info("生产端, {} :{}", routingKey3, msg3);
  24. channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey3, null, msg3.getBytes());
  25. channel.close();
  26. connection.close();
  27. }
  28. }
  29. 复制代码

启动生产端

消费端接收到的

routingKey3 ="user.delete.abc" 的未被接收,符合 user.* 的规则 这时候在消费端把 routingKey 修改一下, routingKey ="user.#",重启消费端,上管控台

发现之前 * 的并没有解绑,需要我们手动解绑一下,然后再启动生产端的代码

发现三条都能接收到了,符合 # 的规则。

3.fanout exchange 类型

fanout exchange: 不处理路由键,只需要简单的将队列绑定到交换机上,发送到该交换机的消息都会被转发到于该交换机绑定的所有队列上,fanout 交换机由于不需要进行routingkey 的对比 直接发送所以绑定的 queue,所以转发消息是最快的 示意图如下图所示

代码实现:

  1. public class ConsumerFanoutExchange {
  2. private static final Logger log = LoggerFactory.getLogger(ConsumerFanoutExchange.class);
  3. public static final String EXCHANGE_NAME = "test_fanout_exchange";
  4. public static final String EXCHANGE_TYPE = "fanout";
  5. public static final String QUEUE_NAME = "test_fanout_queue";
  6. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  7. ConnectionFactory connectionFactory = new ConnectionFactory();
  8. connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
  9. connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
  10. connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  11. connectionFactory.setAutomaticRecoveryEnabled(true);
  12. connectionFactory.setNetworkRecoveryInterval(3000);
  13. Connection connection = connectionFactory.newConnection();
  14. Channel channel = connection.createChannel();
  15. channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
  16. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  17. // 不设置路由键
  18. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
  19. QueueingConsumer consumer = new QueueingConsumer(channel);
  20. //参数:队列名称、是否自动ACK、Consumer
  21. channel.basicConsume(QUEUE_NAME, true, consumer);
  22. log.info("消费端启动。。。");
  23. //循环获取消息
  24. while (true) {
  25. //获取消息,如果没有消息,这一步将会一直阻塞
  26. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  27. String msg = new String(delivery.getBody());
  28. log.info("消费端收到消息:{}", msg);
  29. }
  30. }
  31. }
  32. 复制代码

生产端代码:

  1. public class ProducerFanoutExchange {
  2. private static final Logger log = LoggerFactory.getLogger(ProducerFanoutExchange.class);
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
  6. connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
  7. connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  8. // 2 创建Connection
  9. Connection connection = connectionFactory.newConnection();
  10. // 3 创建Channel
  11. Channel channel = connection.createChannel();
  12. // 5 发送
  13. for (int i = 0; i < 10; i++) {
  14. String msg = "Hello World RabbitMQ FANOUT Exchange Message ...";
  15. log.info("生产端,routingKey{}: {}", i, msg);
  16. channel.basicPublish(ConsumerFanoutExchange.EXCHANGE_NAME, "" + i, null, (msg + i).getBytes());
  17. }
  18. channel.close();
  19. connection.close();
  20. }
  21. }
  22. 复制代码

先启动消费端,再启动生产端

查看消费端的日志

routingkey0-9 的都能被就收,也就相当于该交换机上所有的队列都能接收来到该交换机的消息。 headers 类型的不常用,就不介绍了

5.binding

binding: 绑定 exchange 和 exchange/queue 之间的连接关心。binding 中可以包含 routingkey 或者参数

6. Queue

queue: 消息队列,实际存储消息数据,durability 表示是否持久化,durable 表示是,transient 表示否。auto delete: 如选择 yes,表示当最后一个监听被移除后,该 queue 会被自动删除。

7. Message

message: 服务器和应用程序之间传送的数据 本质上就是一段数据,由 properties 和 payload(body) 组成 常用属性: delivery mode,headersheaders(自定义属性),content_type,content_encoding,priority,correlation_id,reply_to,expiration,message_id,timestamp,type,user_id,app_id,cluster_id 代码实现: 消费端:

  1. public class Consumer {
  2. private static final Logger log = LoggerFactory.getLogger(Consumer.class);
  3. public static void main(String[] args) throws IOException, TimeoutException,
  4. ShutdownSignalException, ConsumerCancelledException, InterruptedException {
  5. ConnectionFactory connectionFactory = new ConnectionFactory();
  6. connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
  7. connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
  8. connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  9. //2 通过连接工厂创建连接
  10. Connection connection = connectionFactory.newConnection();
  11. //3 通过connection创建一个Channel
  12. Channel channel = connection.createChannel();
  13. //4 声明(创建)一个队列
  14. String queueName = "test001";
  15. channel.queueDeclare(queueName, true, false, false, null);
  16. //5 创建消费者
  17. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  18. //6 设置Channel
  19. channel.basicConsume(queueName, true, queueingConsumer);
  20. while (true) {
  21. //7 获取消息
  22. Delivery delivery = queueingConsumer.nextDelivery();
  23. String msg = new String(delivery.getBody());
  24. log.info("消费端: " + msg);
  25. Map<String, Object> headers = delivery.getProperties().getHeaders();
  26. log.info("headers get myHeaders1 value: " + headers.get("myHeaders1"));
  27. log.info("headers get myHeaders2value: " + headers.get("myHeaders2"));
  28. //Envelope envelope = delivery.getEnvelope();
  29. }
  30. }
  31. }
  32. 复制代码

生产端:

  1. public class Procuder {
  2. private static final Logger log = LoggerFactory.getLogger(Procuder.class);
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
  6. connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
  7. connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
  8. // 2 通过连接工厂创建连接
  9. Connection connection = connectionFactory.newConnection();
  10. // 3 通过connection创建一个Channel
  11. Channel channel = connection.createChannel();
  12. Map<String, Object> headers = new HashMap<>();
  13. headers.put("myHeaders1", "111");
  14. headers.put("myHeaders2", "222");
  15. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8")
  16. .expiration("10000").headers(headers).build();
  17. // 4 通过Channel发送数据
  18. for (int i = 0; i < 5; i++) {
  19. String msg = "Hello RabbitMQ!";
  20. // 1 exchange 2 routingKey
  21. log.info("生产端,test001: {}", msg);
  22. channel.basicPublish("", "test001", properties, msg.getBytes());
  23. }
  24. // 5 记得要关闭相关的连接
  25. channel.close();
  26. connection.close();
  27. }
  28. }
  29. 复制代码

先启动消费端,上管控台确认交换机和队列是否创建和绑定成功,再启动生产端,消费端接收到如下的信息

8. virtual host

virtual host 虚拟主机 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个 virtual host 里面可以有若干个 exchange 和 queue,但是里面不能有相同名称的 exchange 或 queue

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/639572
推荐阅读
相关标签
  

闽ICP备14008679号