当前位置:   article > 正文

RabbitMQ基础篇(七)-交换机(Exchanges)_rabbitmq exchanges

rabbitmq exchanges

概念

RabbitMQ中,生产者发送消息是不会直接将消息直接推送到消息推送到队列中的,实际上推送的过程是交由交换机推送到队列中(如果没有定义交换机会使用MQ默认的交换机推送),生产者只能推送消息到交换机中。其中,在之前的推送中,队列中的每条消息只能被消费者消费一次,通过交换机我们就可以实现把消息推到不同的队列,进而实现多个消费者消相同的消息。
注:交换机可以在提供者/消费者任意一端声明。
以下案例建议先启动消费者开启监听,后启动消息提供者,方便测试结果。

交换机类型

直接交换机(Direct Exchange)

通过routingKey和bindingKey将交换机和路由绑定,交换机通过对比Key值将消息推送到指定的队列中。

消息提供者
public class DirectProduct {
    private static final String EXCHANGE_NAME = "DIRECT_EXCHANGE";
    private static final String routingKey = "direct_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String message = "测试一条消息";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("[提供者] ==> 推送成功!");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
消费者
public class DirectCustomer {
    private static final String EXCHANGE_NAME = "DIRECT_EXCHANGE";
    private static final String routingKey = "direct_key";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 获取临时队列,队列消息消费完后自动删除
        String queueName = channel.queueDeclare().getQueue();
        // 绑定队列
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("[消费者] ==> " + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (tag) -> {
            System.out.println("消息接收失败");
        };
        // 绑定交换机和队列
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
测试

消费者启动(先启动,方便监听)

提供者启动

主题交换机(Topic Exchange)

将路由按模式匹配,可以通过设置 # 或 ***** 对队列中定义的routingKey模糊匹配,匹配成功后将消息转入队列
**#:**匹配一个或多个词,如:ta.#.te == ta.tb.tc.td.te
**:**匹配一个词,如:td..te == td.tf.te

消息提供者
public class TopicProducer {
    private static final String EXCHANGE_NAME = "EXCHANGE_NAME_TOPIC";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 输入
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.println("请输入发送消息:");
            String message = scanner.nextLine();
            if (Objects.equals(message, "exit")){
                break;
            }

            System.out.println("请输入发送路由:");
            String routerKey = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME, routerKey, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("[主题交换机<" + routerKey + ">]: " + message);

        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
消费者A
public class TopicCustomer_A {
    private static final String EXCHANGE_NAME = "EXCHANGE_NAME_TOPIC";
    public static final String CODER_LURE = "coder.#.lure";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明一个临时队列,队列名称随机,当消费者端口与队列的连接时,队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机和队列
        channel.queueBind(queueName, EXCHANGE_NAME, CODER_LURE);

        System.out.println("[主题交换机<消费者A>] ==<接收成功,routingKey:[" + CODER_LURE + "]>==> 监听开始!");
        DeliverCallback deliverCallback = (tag, message) -> {
            String messageContent = "[主题交换机<消费者A>] ==<接收成功,routingKey:[" + CODER_LURE + "]>==> " + new String(message.getBody());
            System.out.println(messageContent);
        };
        CancelCallback cancelCallback = (tag) -> {
            System.out.println("[主题交换机<消费者A>] ==<接收取消,routingKey:[" + CODER_LURE + "]>==> 失败!");
        };
        // 接收消息
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
消费者B
public class TopicCustomer_B {
    private static final String EXCHANGE_NAME = "EXCHANGE_NAME_TOPIC";
    public static final String CODER_LURE = "*.coder.lure";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明一个临时队列,队列名称随机,当消费者端口与队列的连接时,队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机和队列
        channel.queueBind(queueName, EXCHANGE_NAME, CODER_LURE);

        System.out.println("[主题交换机<消费者B>] ==<接收成功,routingKey:[" + CODER_LURE + "]>==> 监听开始!");
        DeliverCallback deliverCallback = (tag, message) -> {
            String messageContent = "[主题交换机<消费者B>] ==<接收成功,routingKey:[" + CODER_LURE + "]>==> " + new String(message.getBody());
            System.out.println(messageContent);
        };
        CancelCallback cancelCallback = (tag) -> {
            System.out.println("[主题交换机<消费者B>] ==<接收取消,routingKey:[" + CODER_LURE + "]>==> 失败!");
        };
        // 接收消息
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
测试:

启动提供者方法,控制台信息如下:

消费者A接收

消费者B接收

首部交换机(Headers Exchange)

首部交换机和扇出交换机都不需要路由键(routingKey),交换机时通过headers头部来将消息映射到队列中。其中Hash结构中要求携带一个键 ‘x-match’ ,这个键值可以是any或者all。
any:只要在发布消息是携带有一对键值对headers满足队列定义的多个参数arguments中其中一个,就能匹配上。(需要key:value完全匹配才行)
all:需要所有entry和绑定在队列上的所有entry完全匹配

消息提供者
/**
 * HeadersProducer
 *
 * @author codeの诱惑
 * @version 1.0.0
 * @Descript 首部交换机:
 * 首部交换机和扇出交换机都不需要路由键(routingKey),交换机时通过headers头部来将消息映射到队列中
 * 其中Hash结构中要求携带一个键 ‘x-match’ ,这个键值可以是any或者all
 * any:只要在发布消息是携带有一对键值对headers满足队列定义的多个参数arguments中其中一个,就能匹配上。(需要key:value完全匹配才行)
 * all:需要所有entry和绑定在队列上的所有entry完全匹配
 * Created on 2023/5/2 17:42
 */
public class HeadersProducer {
    private static final String EXCHANGE_NAME = "HEADERS_EXCHANGE";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = RabbitConnectionUtil.getConnection();
        Channel channel = conn.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("author", "coderの诱惑");
        headers.put("exchangeName", "Headers Exchange");
        headers.put("version", "v1.0");
        System.out.println("发送一条消息");
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(headers).build();
        String message = "hello world!";
        channel.basicPublish(EXCHANGE_NAME, "", properties, message.getBytes(StandardCharsets.UTF_8));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
消费者
/**
 * HeaderExchangeCustomer
 *
 * @author codeの诱惑
 * @version 1.0.0
 * @Descript 首部交换机
 * Created on 2023/5/3 0:11
 */
public class HeaderExchangeCustomer {
    private static final String EXCHANGE_NAME = "HEADERS_EXCHANGE";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = RabbitConnectionUtil.getConnection();
        Channel channel = conn.createChannel();
        Map<String, Object> arguments = new HashMap<String, Object>();
        // arguments.put("x-match", "all");
        // 测试不匹配
        arguments.put("all", "测试不同headers,不匹配");

        arguments.put("x-match", "any");
        arguments.put("author", "coderの诱惑");
        arguments.put("exchangeName", "Headers Exchange");
        arguments.put("version", "v1.0");

        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "", arguments);
        System.out.println("[首部交换机]:消费者等待消息中...");
        DeliverCallback deliverCallback = (tag, message) -> {
            byte[] body = message.getBody();
            String string = new String(body);
            System.out.println(string);
        };
        CancelCallback cancelCallback = (tag) -> {
            System.out.println("取消");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
测试

执行提供者后,消费者监听结果如下:

扇出交换机(Fanount Exchange)

以广播的形式向队列推送消息,即类比通过广播发布通知,所有人接收通知内容。

以下以日志输出为案例,通过推送日志内容,一个负责将日志打印控制台,一个通过负责将日志写入日志文件当中。

消息提供者
  1. public class ExProducer {
 private static final String EXCHANGE_NAME = "EXCHANGE_NAME_LOG";
 public static void main(String[] args) throws IOException, TimeoutException {
     Connection connection = RabbitConnectionUtil.getConnection();
     Channel channel = connection.createChannel();
     channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
     Scanner scanner = new Scanner(System.in);
     while (scanner.hasNext()) {
         String inputText = scanner.next();
         channel.basicPublish(EXCHANGE_NAME, "", null, inputText.getBytes(StandardCharsets.UTF_8));
         System.out.println("[生产者输出日志]: " + inputText);
     }
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
消费者(控制台输出)
public class ExConsumerForConsoleLog {
 private static final String EXCHANGE_NAME = "EXCHANGE_NAME_LOG";
 public static void main(String[] args) throws IOException, TimeoutException {
     Connection connection = RabbitConnectionUtil.getConnection();
     Channel channel = connection.createChannel();
     /**
      * @param exchange 交换机名称
      * @param type 交换机类型
      */
     channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
     // 声明一个临时队列,队列名称随机,当消费者端口与队列的连接时,队列自动删除
     String queueName = channel.queueDeclare().getQueue();
     // 绑定交换机和队列
     /**
       * @param queueName 队列名称
       * @param exchangeName 交换机名称
       * @param routingKey 路由key(由于扇出交换机是以广播形式推送消息到队列,routingKey可以为空
       *                   ,即可以忽略routingKey值
       *                   ,忽略后会向所有队列推送,如果指定了routingKey,那就和 ‘直接交换机’ 差不多了)
       */
     channel.queueBind(queueName, EXCHANGE_NAME, "");
     System.out.println("等待接收队列消息, 打印控制台...");

     DeliverCallback deliverCallback = (tag, message) -> {
         System.out.println("[控制台<消费者A>] ==<接收成功>==> " + new String(message.getBody()));
     };
     CancelCallback cancelCallback = (tag) -> {
         System.out.println("[控制台<消费者A>] ==<接收取消>==> 失败!");
     };
     // 接收消息
     channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
消费者(日志文件输出)
public class ExConsumerForFileLog {
 private static final String EXCHANGE_NAME = "EXCHANGE_NAME_LOG";
 public static void main(String[] args) throws IOException, TimeoutException {
     Connection connection = RabbitConnectionUtil.getConnection();
     Channel channel = connection.createChannel();
     /**
      * @param exchange 交换机名称
      * @param type 交换机类型
      */
     channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

     // 声明一个临时队列,队列名称随机,当消费者端口与队列的连接时,队列自动删除
     String queueName = channel.queueDeclare().getQueue();

     // 绑定交换机和队列
     /**
      * @param queueName 队列名称
      * @param exchangeName 交换机名称
      * @param routingKey 路由key(由于扇出交换机是以广播形式推送消息到队列,routingKey可以为空
      *                   ,即可以忽略routingKey值
      *                   ,忽略后会向所有队列推送,如果指定了routingKey,那就和 ‘直接交换机’ 差不多了)
      */
     channel.queueBind(queueName, EXCHANGE_NAME, "");
     System.out.println("等待接收队列消息, 写入日志文件...");

     DeliverCallback deliverCallback = (tag, message) -> {
         String messageContent = "[日志文件<消费者B>] ==<接收成功>==> " + new String(message.getBody());
         SimpleDateFormat sf = new SimpleDateFormat("yyyyMMddHHmmss");
         String date = sf.format(new Date());
         System.out.println(messageContent);
         writeFile(messageContent, "" + date + ".log");
     };
     CancelCallback cancelCallback = (tag) -> {
         System.out.println("[日志文件<消费者B>] ==<接收取消>==> 失败!");
     };
     // 接收消息
     channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
 }

 public static void writeFile(String message, String path) {
     try {
         String basePath = new String(System.getProperty("user.dir").getBytes(StandardCharsets.UTF_8)) + "\\RabbitMQ-simple\\src\\main\\java\\com\\mc\\exchange\\logs\\";
         String absolutePath = basePath + path;
         System.out.println("存储路径:" + absolutePath);

         File Folder = new File(basePath);
         File file = new File(absolutePath);
         if (!Folder.exists()) {
             Folder.mkdirs();
         }
         if (!file.exists()) {
             file.createNewFile();
         }
         OutputStream out = new FileOutputStream(file);
         byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
         out.write(bytes);
         out.close();
     } catch (IOException e) {
         e.printStackTrace();
     }
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
测试

消息提供者控制台案例

消费者输出控制台

消费者输出日志文件

默认交换机(Default Exchange)

默认交换机实际和直接交换机一样,只不过默认交换机并不用特意声明交换机名称和routingKey 。以下不作案例演示。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/431696
推荐阅读
相关标签
  

闽ICP备14008679号