当前位置:   article > 正文

消息中间件系列教程(11) -RabbitMQ -案例代码(通配符模式)_c# rabbitmq 通配符模式示例

c# rabbitmq 通配符模式示例

引言

代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo

前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。

主要分为:

  1. 点对点队列模式(简单)
  2. 工作队列模式(公平性)
  3. 发布订阅模式
  4. 路由模式Routing
  5. 通配符模式Topics

本文主要讲解通配符模式。

配符模式

在这里插入图片描述
功能:此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;

符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor 符号*:只能匹配一个词lazy.*
可以匹配lazy.irs或者lazy.cor

1. 邮件短信案例

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:

<dependencies>
	<dependency>
		<groupId>com.rabbitmq</groupId>
		<artifactId>amqp-client</artifactId>
		<version>3.6.5</version>
	</dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3.连接工具类

package com.ylw.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQConnecUtils {

    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 2.设置服务器地址
        factory.setHost("127.0.0.1");

        // 3.设置协议端口号
        factory.setPort(5672);

        // 4.设置vhost
        factory.setVirtualHost("OrderHost");

        // 5.设置用户名称
        factory.setUsername("OrderAdmin");

        // 6.设置用户密码
        factory.setPassword("123456");

        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
1.1 生产者
public class Producer {
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
       
        // 2.创建通道
        Channel channel = connection.createChannel();
       
        // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String routingKey = "log.info.error";
        String msg = "topic_exchange_msg" + routingKey;
       
        // 4.发送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        System.out.println("生产者发送msg:" + msg);
       
        // // 5.关闭通道、连接
        channel.close();
        connection.close();
        // 注意:如果消费没有绑定交换机和队列,则消息会丢失

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
1.2 消费者

邮件消费者:

public class ConsumerEmailDirect {
    private static final String QUEUE_NAME = "consumer_topic_email";
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

短信消费者:

public class ConsumerSMSDirect {
    private static final String QUEUE_NAME = "consumer_topic_sms";
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();

        // 2.创建通道
        Channel channel = connection.createChannel();

        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取生产者消息:" + msg);
            }
        };

        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

3. 测试

启动生产者,并关闭,让其在RabbitMQ里面注册交换机,在控制台可以看出注册成功(如果不启动,可以手动注册,如下图Add a new exchange):
在这里插入图片描述
启动邮件消费者和短信消费者,在控制台可以看出有两个队列:
在这里插入图片描述
再启动生产者,可以看到邮件消费者消费信息
在这里插入图片描述
而短信消费者没有消费信息:
在这里插入图片描述

因为生产者的RouteKey为log.info.error,邮件消费者的匹配的RouteKey为log.#(匹配log后面的所有),SMS消费者的匹配的RouteKey为log.*(匹配log后面的一个)。所以只有邮件消费者能消费消息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/994150
推荐阅读
相关标签
  

闽ICP备14008679号