赞
踩
在 RabbitMQ 中,交换机主要用来将生产者生产出来的消息,传送到对应的队列中,即交换机是一个消息传送的媒介,其英文被称为 exchange 。交换机在 RabbitMQ 中起着承上启下的作用。
交换机主要有四种类型:
常用的只有前面三种:direct、topic、fanout
所有发送到 direct 交换机的消息都被转发到RouteKey中指定的队列 queue。RouteKey必须完全匹配才会被队列接收。
RabbitMQ自带的Exchange:default Exchange就是一个 direct 类型的交换机。当生产端发送消息没有指定交换机的名称时,使用的就是该交换机。当队列名称和 routingKey 一样时,也可以不用将exchange和queue进行绑定(binding)操作。
如下代码所示:
- // 生产端:发送消息
- public static void main(String[] args) throws IOException, TimeoutException {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("121.43.153.00");
- factory.setPort(5672);
- factory.setVirtualHost("/");
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- String queueName = "direct_queue";
- // 这里发布消息时,第一个参数为交换机名称,为空,则使用默认交换机 default exchange
- // 这里的第二个参数本应该是 routingKey,这里直接使用了队列名称,则就不需要再执行 exchange和queue通过 routingKey 的绑定操作了
- channel.basicPublish("", queueName, null, "didiok send a direct message".getBytes());
- channel.close();
- connection.close();
- }
-
- // 消费端:消费消息
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("121.43.153.00");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setAutomaticRecoveryEnabled(true);
- factory.setNetworkRecoveryInterval(3000);
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- String queueName = "direct_queue";
- channel.queueDeclare(queueName, false, false, false, null);
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
-
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- System.out.println(new String(delivery.getBody()));
- }
- }
所有发送到 topic 交换机 的消息被转发到所有 RouteKey匹配到的Queue上。这里的 routingKey 可以使用通配符进行模糊匹配:
例如:“didiok.#” 能够匹配到 “didiok.hello”、“didiok.hello.world”、“didiok.hello.world.abc”等。而
“didiok.*”只会匹配到“didiok.hello”这类后面只带有一个词的。
代码如下:
- // 生产端:发送消息
- public static void main(String[] args) throws IOException, TimeoutException {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("121.43.153.00");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setAutomaticRecoveryEnabled(true);
- factory.setNetworkRecoveryInterval(3000);
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- String exchangeName = "topic_exchange";
- String routingKey1 = "user.aa.dev";
- String routingKey2 = "user.bb";
- String routingKey3 = "user.cc";
-
- AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
- .contentEncoding("UTF-8")
- .headers(new HashMap<>())
- .deliveryMode(2)
- .build();
-
- String message = "didiok send a topic_message~~~";
- channel.basicPublish(exchangeName, routingKey1, props, message.getBytes());
- channel.basicPublish(exchangeName, routingKey2, props, message.getBytes());
- channel.basicPublish(exchangeName, routingKey3, props, message.getBytes());
- }
-
- // 消费端一:消费消息
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("121.43.153.00");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setAutomaticRecoveryEnabled(true);
- factory.setNetworkRecoveryInterval(3000);
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- /**** 以下对 交换机和队列的声明和绑定 操作,最好不要再代码中执行,建议在 rabbitMQ控制台中进行操作,因为代码可能重复执行,导致出现异常 **/
- String queueName = "topic_queue1";
- String exchangeName = "topic_exchange";
- String exchangeType = "topic";
- String routingKey = "user.#";
- channel.queueDeclare(queueName, false, false, false, null);
- channel.exchangeDeclare(exchangeName, exchangeType, false, false, null);
- channel.queueBind(queueName, exchangeName, routingKey);
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
-
- System.err.println("consumer1 starting...");
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- System.out.println("消息内容:" + new String(delivery.getBody()) + ",routingKey:" + delivery.getEnvelope().getRoutingKey());
- }
- }
-
- // 消费端二:消费消息
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("121.43.153.00");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setAutomaticRecoveryEnabled(true);
- factory.setNetworkRecoveryInterval(3000);
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- /**** 以下对 交换机和队列的声明和绑定 操作,最好不要再代码中执行,建议在 rabbitMQ控制台中进行操作,因为代码可能重复执行,导致出现问题 **/
- String queueName = "topic_queue2 ";
- String exchangeName = "topic_exchange";
- String exchangeType = "topic";
- String routingKey = "user.*";
- channel.queueDeclare(queueName,false,false,false,null);
- channel.exchangeDeclare(exchangeName,exchangeType,false, false,null);
- channel.queueBind(queueName, exchangeName, routingKey);
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName,true, consumer);
- System.err.println("consumer2 starting...");
-
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- System.out.println("消息内容:"+new String(delivery.getBody())+", routingKey:"+delivery.getEnvelope().getRoutingKey());
- }
- }
消费端一可以收到生产端发的三条消息,消费端二只能收到两条消息。
fanout 交换机 类似于广播,不走 routingKey,所以可以不设置 routingKey,设置了也没用。只需要简单的将队列绑定在交换机上。发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。Fanout交换机转发消息是最快的,其次是 direct 交换机,topic 交换机最慢,因为需要根据匹配规则寻找队列(通配符找起来速度慢)。
示例代码如下:
- // 生产端:发送消息
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("121.43.153.00");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setAutomaticRecoveryEnabled(true);
- factory.setNetworkRecoveryInterval(3000);
-
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- String exchangeName = "fanout_exchange";
- String routingKey = "";
- AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
- .headers(new HashMap<>())
- .contentEncoding("UTF-8")
- .deliveryMode(2)
- .build();
-
- for(int i=0; i<10; i++){
- String msg = "发送消息的序号:"+i;
- channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
- }
- channel.close();
- connection.close();
- }
-
- // 消费端一:消费消息
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("121.43.153.00");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setAutomaticRecoveryEnabled(true);
- factory.setNetworkRecoveryInterval(3000);
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- String queueName = "fanout_queue";
- String exchangeName = "fanout_exchange";
- String routingKey = "";
-
- channel.queueDeclare(queueName, false, false, false, null);
- channel.exchangeDeclare(exchangeName, "fanout", false,false,null);
- channel.queueBind(queueName, exchangeName, routingKey);
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
-
- System.err.println("consumer1 starting...");
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- System.out.println("受到消息:"+new String(delivery.getBody())+",routingKey:"+delivery.getEnvelope().getRoutingKey());
- }
- }
-
- // 消费端二:消费消息
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("121.43.153.00");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setAutomaticRecoveryEnabled(true);
- factory.setNetworkRecoveryInterval(3000);
-
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- String queueName = "fanout_queue2";
- String exchangeName = "fanout_exchange";
- String routingKey = "";
-
- channel.queueDeclare(queueName, false, false, false, null);
- channel.exchangeDeclare(exchangeName, "fanout", false,false,null);
- channel.queueBind(queueName, exchangeName, routingKey);
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
-
- System.err.println("consumer2 starting...");
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- System.out.println("受到消息:"+new String(delivery.getBody())+",routingKey:"+delivery.getEnvelope().getRoutingKey());
- }
- }
因为交换机和队列进行了绑定,消费端一和消费端二都会收到生产端发送的消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。