当前位置:   article > 正文

RabbitMQ之交换机类型_mq交换机的作用

mq交换机的作用

一、交换机类型

RabbitMQ 中,交换机主要用来将生产者生产出来的消息,传送到对应的队列中,即交换机是一个消息传送的媒介,其英文被称为 exchange 。交换机在 RabbitMQ 中起着承上启下的作用。

交换机主要有四种类型:

  • direct: 直连
  • topic: 主题
  • fanout:广播
  • headers: 请求头

常用的只有前面三种:direct、topic、fanout

二、direct 交换机

所有发送到 direct 交换机的消息都被转发到RouteKey中指定的队列 queue。RouteKey必须完全匹配才会被队列接收。

RabbitMQ自带的Exchange:default Exchange就是一个 direct 类型的交换机。当生产端发送消息没有指定交换机的名称时,使用的就是该交换机。当队列名称和 routingKey 一样时,也可以不用将exchange和queue进行绑定(binding)操作。

如下代码所示:

  1. // 生产端:发送消息
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setHost("121.43.153.00");
  5. factory.setPort(5672);
  6. factory.setVirtualHost("/");
  7. Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel();
  9. String queueName = "direct_queue";
  10. // 这里发布消息时,第一个参数为交换机名称,为空,则使用默认交换机 default exchange
  11. // 这里的第二个参数本应该是 routingKey,这里直接使用了队列名称,则就不需要再执行 exchange和queue通过 routingKey 的绑定操作了
  12. channel.basicPublish("", queueName, null, "didiok send a direct message".getBytes());
  13. channel.close();
  14. connection.close();
  15. }
  16. // 消费端:消费消息
  17. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  18. ConnectionFactory factory = new ConnectionFactory();
  19. factory.setHost("121.43.153.00");
  20. factory.setPort(5672);
  21. factory.setVirtualHost("/");
  22. factory.setAutomaticRecoveryEnabled(true);
  23. factory.setNetworkRecoveryInterval(3000);
  24. Connection connection = factory.newConnection();
  25. Channel channel = connection.createChannel();
  26. String queueName = "direct_queue";
  27. channel.queueDeclare(queueName, false, false, false, null);
  28. QueueingConsumer consumer = new QueueingConsumer(channel);
  29. channel.basicConsume(queueName, true, consumer);
  30. while (true) {
  31. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  32. System.out.println(new String(delivery.getBody()));
  33. }
  34. }

三、topic 交换机

所有发送到 topic 交换机 的消息被转发到所有 RouteKey匹配到的Queue上。这里的 routingKey 可以使用通配符进行模糊匹配

  • 符号 # 匹配一个或多个词
  • 符号 * 匹配一个词

例如:“didiok.#” 能够匹配到 “didiok.hello”、“didiok.hello.world”、“didiok.hello.world.abc”等。而
“didiok.*”只会匹配到“didiok.hello”这类后面只带有一个词的。

代码如下:

  1. // 生产端:发送消息
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setHost("121.43.153.00");
  5. factory.setPort(5672);
  6. factory.setVirtualHost("/");
  7. factory.setAutomaticRecoveryEnabled(true);
  8. factory.setNetworkRecoveryInterval(3000);
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. String exchangeName = "topic_exchange";
  12. String routingKey1 = "user.aa.dev";
  13. String routingKey2 = "user.bb";
  14. String routingKey3 = "user.cc";
  15. AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
  16. .contentEncoding("UTF-8")
  17. .headers(new HashMap<>())
  18. .deliveryMode(2)
  19. .build();
  20. String message = "didiok send a topic_message~~~";
  21. channel.basicPublish(exchangeName, routingKey1, props, message.getBytes());
  22. channel.basicPublish(exchangeName, routingKey2, props, message.getBytes());
  23. channel.basicPublish(exchangeName, routingKey3, props, message.getBytes());
  24. }
  25. // 消费端一:消费消息
  26. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  27. ConnectionFactory factory = new ConnectionFactory();
  28. factory.setHost("121.43.153.00");
  29. factory.setPort(5672);
  30. factory.setVirtualHost("/");
  31. factory.setAutomaticRecoveryEnabled(true);
  32. factory.setNetworkRecoveryInterval(3000);
  33. Connection connection = factory.newConnection();
  34. Channel channel = connection.createChannel();
  35. /**** 以下对 交换机和队列的声明和绑定 操作,最好不要再代码中执行,建议在 rabbitMQ控制台中进行操作,因为代码可能重复执行,导致出现异常 **/
  36. String queueName = "topic_queue1";
  37. String exchangeName = "topic_exchange";
  38. String exchangeType = "topic";
  39. String routingKey = "user.#";
  40. channel.queueDeclare(queueName, false, false, false, null);
  41. channel.exchangeDeclare(exchangeName, exchangeType, false, false, null);
  42. channel.queueBind(queueName, exchangeName, routingKey);
  43. QueueingConsumer consumer = new QueueingConsumer(channel);
  44. channel.basicConsume(queueName, true, consumer);
  45. System.err.println("consumer1 starting...");
  46. while (true) {
  47. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  48. System.out.println("消息内容:" + new String(delivery.getBody()) + ",routingKey:" + delivery.getEnvelope().getRoutingKey());
  49. }
  50. }
  51. // 消费端二:消费消息
  52. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  53. ConnectionFactory factory = new ConnectionFactory();
  54. factory.setHost("121.43.153.00");
  55. factory.setPort(5672);
  56. factory.setVirtualHost("/");
  57. factory.setAutomaticRecoveryEnabled(true);
  58. factory.setNetworkRecoveryInterval(3000);
  59. Connection connection = factory.newConnection();
  60. Channel channel = connection.createChannel();
  61. /**** 以下对 交换机和队列的声明和绑定 操作,最好不要再代码中执行,建议在 rabbitMQ控制台中进行操作,因为代码可能重复执行,导致出现问题 **/
  62. String queueName = "topic_queue2 ";
  63. String exchangeName = "topic_exchange";
  64. String exchangeType = "topic";
  65. String routingKey = "user.*";
  66. channel.queueDeclare(queueName,false,false,false,null);
  67. channel.exchangeDeclare(exchangeName,exchangeType,false, false,null);
  68. channel.queueBind(queueName, exchangeName, routingKey);
  69. QueueingConsumer consumer = new QueueingConsumer(channel);
  70. channel.basicConsume(queueName,true, consumer);
  71. System.err.println("consumer2 starting...");
  72. while (true) {
  73. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  74. System.out.println("消息内容:"+new String(delivery.getBody())+", routingKey:"+delivery.getEnvelope().getRoutingKey());
  75. }
  76. }

消费端一可以收到生产端发的三条消息,消费端二只能收到两条消息。

四、fanout 交换机

fanout 交换机 类似于广播,不走 routingKey,所以可以不设置 routingKey,设置了也没用。只需要简单的将队列绑定在交换机上。发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。Fanout交换机转发消息是最快的,其次是 direct 交换机,topic 交换机最慢,因为需要根据匹配规则寻找队列(通配符找起来速度慢)。

示例代码如下:

  1. // 生产端:发送消息
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setHost("121.43.153.00");
  5. factory.setPort(5672);
  6. factory.setVirtualHost("/");
  7. factory.setAutomaticRecoveryEnabled(true);
  8. factory.setNetworkRecoveryInterval(3000);
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. String exchangeName = "fanout_exchange";
  12. String routingKey = "";
  13. AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
  14. .headers(new HashMap<>())
  15. .contentEncoding("UTF-8")
  16. .deliveryMode(2)
  17. .build();
  18. for(int i=0; i<10; i++){
  19. String msg = "发送消息的序号:"+i;
  20. channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
  21. }
  22. channel.close();
  23. connection.close();
  24. }
  25. // 消费端一:消费消息
  26. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  27. ConnectionFactory factory = new ConnectionFactory();
  28. factory.setHost("121.43.153.00");
  29. factory.setPort(5672);
  30. factory.setVirtualHost("/");
  31. factory.setAutomaticRecoveryEnabled(true);
  32. factory.setNetworkRecoveryInterval(3000);
  33. Connection connection = factory.newConnection();
  34. Channel channel = connection.createChannel();
  35. String queueName = "fanout_queue";
  36. String exchangeName = "fanout_exchange";
  37. String routingKey = "";
  38. channel.queueDeclare(queueName, false, false, false, null);
  39. channel.exchangeDeclare(exchangeName, "fanout", false,false,null);
  40. channel.queueBind(queueName, exchangeName, routingKey);
  41. QueueingConsumer consumer = new QueueingConsumer(channel);
  42. channel.basicConsume(queueName, true, consumer);
  43. System.err.println("consumer1 starting...");
  44. while (true) {
  45. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  46. System.out.println("受到消息:"+new String(delivery.getBody())+",routingKey:"+delivery.getEnvelope().getRoutingKey());
  47. }
  48. }
  49. // 消费端二:消费消息
  50. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  51. ConnectionFactory factory = new ConnectionFactory();
  52. factory.setHost("121.43.153.00");
  53. factory.setPort(5672);
  54. factory.setVirtualHost("/");
  55. factory.setAutomaticRecoveryEnabled(true);
  56. factory.setNetworkRecoveryInterval(3000);
  57. Connection connection = factory.newConnection();
  58. Channel channel = connection.createChannel();
  59. String queueName = "fanout_queue2";
  60. String exchangeName = "fanout_exchange";
  61. String routingKey = "";
  62. channel.queueDeclare(queueName, false, false, false, null);
  63. channel.exchangeDeclare(exchangeName, "fanout", false,false,null);
  64. channel.queueBind(queueName, exchangeName, routingKey);
  65. QueueingConsumer consumer = new QueueingConsumer(channel);
  66. channel.basicConsume(queueName, true, consumer);
  67. System.err.println("consumer2 starting...");
  68. while (true) {
  69. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  70. System.out.println("受到消息:"+new String(delivery.getBody())+",routingKey:"+delivery.getEnvelope().getRoutingKey());
  71. }
  72. }

因为交换机和队列进行了绑定,消费端一和消费端二都会收到生产端发送的消息。 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/402421?site
推荐阅读
相关标签
  

闽ICP备14008679号