当前位置:   article > 正文

新手小白入门Rabbitmq

新手小白入门Rabbitmq

一、简介

rabbitmq 称为 消息服务器,消息队列 Message Queue,消息中间件 Broker,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
常见消息队列:Rabbitmq,Rocketmq,Kafka,Activemq,Tubemq
在这里插入图片描述

二、rabbitmq基本概念

1 Exchange

接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。
在这里插入图片描述

2 Message Queue

消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

3 Binding Key

它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

4 Routing Key

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

三、应用场景

1 服务解耦

服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难。这是由于服务之间耦合度过于紧密。
再来考虑用RabbitMQ解耦的情况,A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可
在这里插入图片描述

2 流量削峰

假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对,而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力,但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了
这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力
在这里插入图片描述

3 异步调用

考虑定外卖支付成功的情况,支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长,这样就造成整条调用链路响应非常缓慢
在这里插入图片描述
而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右,寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作
在这里插入图片描述

四、搭建Rabbitmq服务器

linux环境搭建docker点这里>>>

已有docker环境的linux虚拟机克隆命名为rabbitmq

在线下载rabbitmq镜像: docker pull rabbitmq:management
查看rabbitmq镜像: docker images
启动rabbitmq容器

# 在线拉取的新版本 3.9 以上
mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf

# 添加两行配置:
default_user = admin
default_pass = admin

docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
rabbitmq:management
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

端口说明:
5672:客户端通信的端口
15672:管理控制台界面
浏览器输入:http://192.168.64.140:15672 用户名/密码:admin/admin
在这里插入图片描述

五、Rabbitmq工作模式

pom.xml文件加入依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

以下获取连接和通信的通道在各种模式下是共性代码,下面各种模式demo就不重复写啦

    //创建连接工厂,并设置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("1.13.9.40");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        //与rabbitmq服务器建立连接,rabbitmq服务器端使用的是nio,会复用tcp连接,并开辟多个信道与客户端通信,以减轻服务器端建立连接的开销
        Connection connection = factory.newConnection();
        //建立通信的通道
        Channel channel = connection.createChannel();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

说明:下面所有图中 P 表示为生产者、 X 表示交换机、红色表示队列,C1,C2 表示为消费者

1 简单模式(点对点模式)

应用场景:发送验证码,
一个生产者,一个消费者。生产者将消息放入队列,消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理就已经从队列中消失了,造成消息的丢失)
在这里插入图片描述
生产者产生消息

/**
         * 在mq服务器上创建helloworld队列,如果服务器端,队列已经存在,不会重复创建
         * 参数:
         * 1. queue队列名
         * 2. durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
         * 3. exclusive 是否是排他队列(独占),,true表示限制仅当前连接可用
         * 4. autoDelete 是否是自动删除队列,当最后一个消费者断开后,是否删除队列
         * 5. 其他的参数属性
         */
        channel.queueDeclare("helloworld", false, false, false, null);
        /**
         * 向helloworld队列发送消息,这里把消息向默认交换机发送.* 默认交换机隐含与所有队列绑定,routing key即为队列名称
         * 参数:
         * 	exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null
         * 	routingKey: 对于默认交换机,路由键就是目标队列名称
         * 	props: 其他参数,例如头信息
         * 	body: 消息内容byte[]数组
         */
        channel.basicPublish("", "helloworld", null, "hello rabbitmq!11111".getBytes());
        System.out.println("消息已发送");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

消费者消费消息

        //创建helloworld队列,声明队列,如果该队列已经创建过,则不会重复创建
        channel.queueDeclare("helloworld", false, false, false, null);
        //收到消息后用来处理消息的回调对象
        DeliverCallback deliverCallback = (s, message) -> {
            byte[] body = message.getBody();
            System.out.println("收到: " + new String(body));
        };
        //消费者取消时的回调对象
        CancelCallback cancelCallback = (consumerTag) -> {

        };
        /**
         * 从helloworld队列接收消息,传递给回调对象进行处理
         * 第二个参数 autoAck 自动确认 auto acknowledgement
         *    消息回执,如果消费者处理完消息,可以向服务器发送回执进行确认
         *     autoAck=true--mq服务器自动确认,不等待消费者发动回执
         *     autoAck=false--要求消费者手动发送回执进行确认
         */
        channel.basicConsume("helloworld", true, deliverCallback, cancelCallback);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2 work模式(竞争模式)

应用场景: 抢包红;大项目中的资源调度;秒杀系统的订单处理服务等
一个生产者,多个消费者,每个消费者获取到的消息唯一。消息产生者将消息放入队列后多个消费者同时监听同一个队列,消息被消费。多个消费者同时抢夺当前消息队列里的内容,谁先拿到就可以消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用

rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者
在这里插入图片描述

消息确认
  • 1

为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

合理地分发
  • 1

abbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者
在这里插入图片描述

消息持久化
  • 1

当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据
要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)
队列设置为可持久化, 可以在定义队列时指定参数durable为true
生产着产生消息

        /**
         * 在mq服务器上创建helloworld队列,/如果服务器端,队列已经存在,不会重复创建
         * 参数:
         * 1. 队列名
         * 2. 是否是非持久队列
         * 3. 是否是排他队列(独占)
         * 4. 是否是自动删除队列
         * 5. 其他的参数属性
         */
        channel.queueDeclare("task_queue", true, false, false, null);
        while (true){
            System.out.print("输入消息:");
            String s = new Scanner(System.in).nextLine();
            channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, s.getBytes());
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

消费者消费消息

 /**
         * 在mq服务器上创建helloworld队列,/如果服务器端,队列已经存在,不会重复创建
         * 参数:
         * 1. 队列名
         * 2. 是否是非持久队列
         * 3. 是否是排他队列(独占)
         * 4. 是否是自动删除队列
         * 5. 其他的参数属性
         */
        channel.queueDeclare("task_queue", true, false, false, null);
        //创建回调对象
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String s=new String(message.getBody());
            System.out.println("收到: "+s);
            //模拟处理耗时消息,遍历字符串,每找到一个“.”,暂停1秒
            for (int i = 0; i < s.length(); i++) {
                if(s.charAt(i)=='.'){
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            /**
             * 手动发送回执
             * 参数
             *  1.回执
             *  2.是否同时确认之前接受过的所有消息 false表示只确认一条
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("-----------------------------------------消息处理完成");
        };
        //每次收一条,处理完之前不收下一条
        channel.basicQos(1);

        //接收消息
        channel.basicConsume("task_queue", false, deliverCallback, (consumerTag) -> {});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

3 Publish/Subscribe发布/订阅模式(共享资源)

应用场景: 短信群发;发送广播,公告消息等
一个生产者发送的消息会被多个消费者获取

在这里插入图片描述

Exchanges 交换机
  • 1

RabbitMQ消息传递模型的核心思想是,生产者只能向交换机(Exchange)发送消息,生产者永远不会将任何消息直接发送到队列,也不知道消息是否会被传递到任何队列。

常见交换类型:direct、topic、header和fanout,发布订阅模式常用fanout交换机,它只是将接收到的所有消息广播给它所知道的所有队列

临时队列
  • 1

日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,而不是部分。并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。
第一步:无论何时我们的消费者连接到RabbitMq,我们都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列,或者让RabbitMq生成任意的队列名字。
第二步:一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。
  通过JAVA客户端的无参方法:queueDeclare()来创建一个非持久化、专有的、自动删除的、名字随机生成的队列。

绑定 Bindings
  • 1

在这里插入图片描述
我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定
生成者代码
将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。

        //创建名字为logs的交换机,交换机类型为fanout,这一步是必须的,因为禁止发布到不存在的交换。
        channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
        //向交换机发送消息
        while (true){
            System.out.print("输入消息: ");
            String s = new Scanner(System.in).nextLine();
            /**
             * 第一个参数,向指定的交换机发送消息
             * 第二个参数队列名,不指定队列,由消费者向交换机绑定队列,对于fanout交换机无效
             */
        channel.basicPublish("logs", "", null, s.getBytes());
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

消费者代码
如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息

        //1.创建随机队列
        String queue = UUID.randomUUID().toString();
        channel.queueDeclare(queue, false, true, true, null);
        //2.创建名字为 logs 的交换机, 它的类型是 fanout
        channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
        //3.队列和交换机进行绑定,对于fanout交换机,第三个参数routingKey无效,会被忽略,不允许null值
        channel.queueBind(queue, "logs", "");
        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("收到消息: " + new String(message.getBody()));
        };
        channel.basicConsume(queue, true, deliverCallback, (consumerTag) -> {
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4 Routing路由模式

发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
需求:我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
在这里插入图片描述

绑定 Bindings
  • 1

绑定是交换机和队列之间的关系。可理解为:队列对来自此交换的消息感兴趣。绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey

channel.queueBind(queue, "direct_logs", k);
  • 1

bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。

直连交换机 Direct exchange
  • 1

需求:将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。
前面我们使用的是fanout交换机,它只能进行简单的广播。

现在,我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。
在这里插入图片描述
其中我们可以看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green。

这样设置,使用路由键orange发布到交换器的消息将被路由到队列Q1。带有black或green路由键的消息将转到Q2。而所有其他消息都将被丢弃。

多重绑定 Multiple bindings
  • 1

在这里插入图片描述
使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。
生产着代码

        //创建交换机 参数1: 交换机名,参数2: 交换机类型
        channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
        //向交换机发送消息,携带关键词(路由键)
        while (true){
            System.out.print("输入消息: ");
            String message=new Scanner(System.in).nextLine();
            System.out.print("输入路由键: ");
            String key = new Scanner(System.in).nextLine();
            /**
             * 对“”默认交换机,路由键就是队列的队列名
             * 参数1: 交换机名
             * 参数2: routingKey, 路由键,
             * 参数3: 其他配置属性
             * 参数4: 发布的消息数据 
             */
            channel.basicPublish("direct_logs",key,null,message.getBytes());
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消费者代码

        //1.定义名字为 direct_logs 的交换机, 它的类型是 "direct"
        channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
        //2.创建随机队列
        String queue = UUID.randomUUID().toString();
        channel.queueDeclare(queue, false, true, true, null);
        //3.该队列绑定到 direct_logs 交换机绑定,设定绑定键
        System.out.print("输入绑定键,用空格隔开: ");
        String[] arry = new Scanner(System.in).nextLine().split("\\s+");//\s是空白字符  +表示1到多个
        for (String k : arry) {
            channel.queueBind(queue, "direct_logs", k);
        }
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String s = new String(message.getBody());
            String routingKey = message.getEnvelope().getRoutingKey();
            System.out.println(routingKey+"------------"+s);
        };
        //接收消息
        channel.basicConsume(queue, true, deliverCallback, (consumerTag) -> {
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

5 Topic主题模式

路由模式的一种,将路由键和某模式进行匹配,此时队列需要绑定在一个模式上。消息产生者产生消息,把消息交给交换机,交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息进行消费

主题交换机 Topic exchange
  • 1

发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词,。routingKey可以有任意多的单词,最多255个字节。
bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

 *可以通配单个单词。
 #可以通配零个或多个单词。
  • 1
  • 2

在这里插入图片描述
举例:将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。

另外,“lazy.orange.male.rabbit”,即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。
生产着代码

        //创建交换机名,并知道交换机类型为topic
        channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        //向交换机发送消息,携带关键词(路由键)
        while (true) {
            System.out.print("输入消息: ");
            String message = new Scanner(System.in).nextLine();
            System.out.print("输入路由键: ");
            String key = new Scanner(System.in).nextLine();
            /**
             * 对“”默认交换机,路由键就是队列的队列名
             * 参数1: 交换机名
             * 参数2: routingKey
             * 参数3: 其他配置属性
             * 参数4: 发布的消息数据
             */
            channel.basicPublish("topic_logs", key, null, message.getBytes());
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消费者

        //1.创建交换机
        channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        //2.创建随机队列
        String queue = UUID.randomUUID().toString();
        channel.queueDeclare(queue, false, true, true, null);
        //3.把该队列绑定到 topic_logs 交换机,允许使用多个 bindingKey
        System.out.print("输入绑定键,用空格隔开: ");
        String[] arry = new Scanner(System.in).nextLine().split("\\s+");//\s是空白字符  +表示1到多个
        for (String k : arry) {
            channel.queueBind(queue, "topic_logs", k);
        }
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String s = new String(message.getBody());
            String routingKey = message.getEnvelope().getRoutingKey();
            System.out.println(routingKey + "------------" + s);
        };
        //接收消息
        channel.basicConsume(queue, true, deliverCallback, (consumerTag) -> {
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

6 RPC模式

RPC 模式就是通过消息队列(Message Queue)来实现rpc的功能。就是客户端向服务端发送定义好的Queue消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。
在这里插入图片描述

图解基本概念:
  • 1

allback queue 回调队列,客户端向服务器发送请求,服务器端处理请求后将其处理结果保存在一个存储体(回调队列)中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。服务器便将处理完的请求结果丢进这个回调队列中。

Correlation id 关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性(我们可以把它理解为一个请求ID),这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

图解流程说明:
  • 1
  1. 对于RPC请求,客户端发送一条带有两个属性的消息:replyTo,设置为仅为请求创建的匿名独占队列,和correlationId,设置为每个请求的惟一id值。
  2. 请求被发送到rpc_queue队列。
  3. RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。
  4. 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据。
RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四个斐波那契数是: " + result);
  • 1
  • 2
  • 3

回调队列 Callback Queue
使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:

//定义回调队列,
//自动生成对列名,非持久,独占,自动删除
callbackQueueName = ch.queueDeclare().getQueue();

//用来设置回调队列的参数对象
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();
//发送调用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
消息属性 Message Properties
AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:
deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。
contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
replyTo:通常用于指定回调队列。
correlationId:将RPC响应与请求关联起来非常有用。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

客户端完整代码

import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCClient {
    Connection con;
    Channel ch;

    public RPCClient() throws Exception {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("1.13.9.40");
        f.setUsername("admin");
        f.setPassword("admin");
        con = f.newConnection();
        ch = con.createChannel();
    }

    public String call(String msg) throws Exception {
        //自动生成对列名,非持久,独占,自动删除
        String replyQueueName = ch.queueDeclare().getQueue();
        //生成关联id
        String corrId = UUID.randomUUID().toString();

        //设置两个参数:
        //1. 请求和响应的关联id
        //2. 传递响应数据的queue
        BasicProperties props = new BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        //向 rpc_queue 队列发送请求数据, 请求第n个斐波那契数
        ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));

        //用来保存结果的阻塞集合,取数据时,没有数据会暂停等待
        BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        //接收响应数据的回调对象
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                //如果响应消息的关联id,与请求的关联id相同,我们来处理这个响应数据
                if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
                    //把收到的响应数据,放入阻塞集合
                    response.offer(new String(message.getBody(), "UTF-8"));
                }
            }
        };

        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };

        //开始从队列接收响应数据
        ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
        //返回保存在集合中的响应数据
        return response.take();
    }

    public static void main(String[] args) throws Exception {
        RPCClient client = new RPCClient();
        while (true) {
            System.out.print("求第几个斐波那契数:");
            int n = new Scanner(System.in).nextInt();
            String r = client.call(""+n);
            System.out.println(r);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

服务端代码

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.*;

import java.io.IOException;

public class RPCServer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("1.13.9.40");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        Connection c = f.newConnection();
        Channel ch = c.createChannel();
        /*
         * 定义队列 rpc_queue, 将从它接收请求信息
         * 参数:
         * 1. queue, 对列名
         * 2. durable, 持久化
         * 3. exclusive, 排他
         * 4. autoDelete, 自动删除
         * 5. arguments, 其他参数属性
         */
        ch.queueDeclare("rpc_queue", false, false, false, null);
        ch.queuePurge("rpc_queue");//清除队列中的内容

        ch.basicQos(1);//一次只接收一条消息

        //收到请求消息后的回调对象
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                //处理收到的数据(要求第几个斐波那契数)
                String msg = new String(message.getBody(), "UTF-8");
                int n = Integer.parseInt(msg);
                //求出第n个斐波那契数
                int r = fbnq(n);
                String response = String.valueOf(r);

                //设置发回响应的id, 与请求id一致, 这样客户端可以把该响应与它的请求进行对应
                BasicProperties replyProps = new BasicProperties.Builder()
                        .correlationId(message.getProperties().getCorrelationId())
                        .build();
                /*
                 * 发送响应消息
                 * 1. 默认交换机
                 * 2. 由客户端指定的,用来传递响应消息的队列名
                 * 3. 参数(关联id)
                 * 4. 发回的响应消息
                 */
                ch.basicPublish("", message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                //发送确认消息
                ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };

        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };
        //消费者开始接收消息, 等待从 rpc_queue接收请求消息, 不自动确认
        ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
    }

    protected static int fbnq(int n) {
        if (n == 1 || n == 2) return 1;
        return fbnq(n - 1) + fbnq(n - 2);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

六 、virtual host

在RabbitMQ中叫做虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通

创建virtual host: /pd
  • 1

进入虚拟机管理界面
在这里插入图片描述
添加新的虚拟机’/pd’,名称必须以"/"开头
在这里插入图片描述
查看添加的结果
在这里插入图片描述
设置虚拟机的用户访问权限,点击 /pd 虚拟主机, 设置用户 admin 对它的访问权限
在这里插入图片描述
想了解springboot整合rabbitmq工作模式使用点进去

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/603519
推荐阅读
相关标签
  

闽ICP备14008679号