当前位置:   article > 正文

RabbitMQ实战

RabbitMQ实战

1、介绍

1.1、作用

  • 解耦:在项目启动之初来预测将来会碰到什么需求是极其困难的。消息中间件在处理过程

    中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独

    立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可

  • 冗余〈存储) 有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直

    到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把 个消息从消息中间件中删

    除之前,需要你的处理系统明确地指出该消息己经被处理完成,从而确保你的数据被安全地保

    存直到你使用完毕。

  • 扩展性: 因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容

    易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。

  • 削峰: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常

    见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费,使用消息中间件能够使关

    键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩惯

  • 可恢复性: 当系统一部分组件失效时,不会影响到整个系统 消息中间件降低了进程间的

    稿合度,所以即使 个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后

    进行处理

  • 顺序保证: 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程

    度上的顺序性。

  • 缓冲: 在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过 个缓

    冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速 该缓冲层有助于控制

    和优化数据流经过系统的速度。

  • 异步通信: 在很多时候应用不想也不需要立即处理消息 消息中间件提供了异步处理机制,

    允许应用把 些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理

1.2、消息组成

消息一般可以包含两个部分:消息体(payload)和标签(Label)

  • 消息体:消息体一般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串
  • 标签:用来表述这条消息,比如一个交换器的名称和一个路由键

消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道

1.3、队列

RabbitMQ中的消息都只能存储在队列中,这一点和Katka相反,Katka 将消息存储在topic (主题)这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊 CRound-Robin ,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做。

1.4、交换机

交换器将消息路由到一个或者多个队列中,如果路由不到,或许会返回给生产者,或许直接丢弃。

生产者将消息发送给交换器时, 需要一个RoutingKey。交换器和队列绑定时需要一个BindingKey。当BindingKey和RoutingKey相匹时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。

BindingKey并不是在所有的情况下都能生效,它依赖于交换器类型。如fanout 类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。

1.5、AMQP协议

RabbitMQ就是AMQP协议的 Erlang 的实现。 AMQP模型架构和RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。。当生产者发送消息时所携带的 RoutingKey 与绑定时 BindingKey 匹配时,消息即被存入相应的队列之中,消费者可以订阅相应的队列来获取消息。

AMQP 协议本身包括三层。

  • Module Layer: 位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用 Queue Declare 命令声明一个队列或者使用 Basic.Consume 订阅消费一个队列中的消息。
  • Session Layer: 位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
  • Transport Layer: 位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

2、组件

2.1、交换器

2.1.1、声明交换器

exchangeDeclare,交换器声明有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

public Exchange.DeclareOk exchangeDeclare(String exchange , String type , boolean durable , boolean autoDelete , boolean internal, Map<String, Object> arguments) throws IOException ; 
  • 1
  • 返回值Exchange.DeclareOk是用来标识成功声明了一个交换器

  • exchange:交换器的名称

  • type:交换器的类型,常见的如 fanout、direct、topic、headers

  • durable: 设置是否持久化,设置为 true 表示持久化, 反之是非持久 。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息

  • autoDelete:设置是否自动删除, true表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定过,绑定之后所有与这个交换器绑定的队列或者交换器都与此解绑。全部解绑后该交换器自动删除。

    • 解绑后,即使交换器是持久化的,也会自动删除

    • 当有未解绑的队列或交换器时,重启rabbitmq服务、或者与此交换器连接的客户端都断开时,RabbitMQ都不会删除该交换器

  • internal:设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。

  • argument 其他一些结构化参数。

2.1.2、删除交换器
public Exchange.DeleteOk exchangeDelete(String exchange, boo1ean ifUnused) throws IOException;
  • 1
  • exchange:表示交换器的名称
  • ifUnused: 设置是否在交换器没有被使用的情况下删除,如果 isUnused设置为 true,则只有在此交换器没有被使用的情况下才会被删除。如果设置 false,则无论如何这个交换器都要被删除。
2.1.2、检测交换器是否存在
public Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException; 
  • 1
  • 如果存在则正常返回DeclareOk对象
  • 如果不存在则抛出异常:404 channel exception ,同时 Channel 也会被关闭。

2.2、队列

2.2.1、声明队列
Queue.DeclareOk queueDeclare() throws IOException; 
  • 1
  • 默认创建一个由RabbitMQ命名的(类似amq.gen-LhQzlgv3GhDOv8PIDabOXA的名称,这种队列也称之为匿名队列〉、排他的、自动删除的、非持久化的队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, 
boolean autoDelete, Map<Str ng Object> arguments) throws IOException; 
  • 1
  • 2
  • queue:队列的名称

  • durable: 设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息

  • exclusive:设置是否排他。为true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除

    • 排他队列是基于连接( Connection) 可见的,同一个连接的不同信道 (Channel)是可以同时访问同一连接创建的排他队列
    • "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同(非排他队列可重复声明,但只创建一个)
    • 即使排他队列是持久化的,一旦声明该队列的连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景
  • autoDelete: 设置是否自动删除。为true则设置队列为自动删除(必须有至少一个消费者连接再断开后才能自动删除)

    • 自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。即使该队列是持久化的也会删除
    • 重启服务不会自动删除
  • argurnents: 设置队列的其他一些参数

2.2.2、清空队列消息
public Queue.PurgeOk queuePurge(String queue) throws IOException;
  • 1
  • queue:表示队列名称
2.2.2、删除队列
public Queue.De1eteOk queueDe1ete(String queue ,boo1ean ifUnused, boolean ifEmpty) 
throws IOExcept on;
  • 1
  • 2
  • queue:表示队列名称
  • ifUnused: 设置是否在队列没有被使用的情况下删除,如果 isUnused设置为 true,则只有在此队列没有被使用的情况下才会被删除。如果设置 false,则无论如何这个队列都要被删除
  • ifEmpty:设置为true表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除
2.1.2、检测队列是否存在
public Queue.Dec1areOk queueDec1arePassive(String queue) throws IOException;
  • 1
  • 如果存在则正常返回DeclareOk对象
  • 如果不存在则抛出异常:404 channel exception ,同时 Channel 也会被关闭

2.3、队列绑定交换器

2.3.1、绑定
public Queue.BindOk queueBind(String queue , String exchange , String routingKey, 
Map<String, Object> arguments) throws IOExcept on;
  • 1
  • 2
  • queue: 队列名称

  • exchange: 交换器的名称

  • routingKey: 用来绑定队列和交换器的路由键

  • argument: 定义绑定的一些参数

2.3.2、解绑
public Queue.UnbindOk queueUnbind (String queue , String exchange , String routingKey, 
Map<String , Object> arguments) throws IOException;
  • 1
  • 2

2.4、交换器绑定交换器

public Exchange.BindOk exchangeBind(String destination, String source , String 
routingKey, Map<String , Object> arguments) throws IOException ;
  • 1
  • 2
  • destination:目标路由器名称(接收转发)

  • source:资源路由器(转发消息)

  • routingKey:用来绑定交换器和交换器的路由键

  • argument: 定义绑定的一些参数

示例代码
// 声明资源交换器(类型可不同)
channel.exchangeDeclare( "source" , "direct" , false , true , null) ;
// 声明目标路由器(类型可不同)
channel.exchangeDeclare( "destination" , "fanout" , false , true , null);
// 交换器绑定交换器
channel.exchangeBind( "destination" , "source" , "exKey");
// 声明队列
channel.queueDeclare( "queue" , false , false , true , null);
// 队列绑定交换器
channel.queueBind("queue" , "destination"); 
// 发送消息
channel.basicPublish("source" , "exKey" , null , "exToExDemo".getBytes ());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2.5、何时创建队列

  • RabbitMQ的消息存储在队列中,交换器并不真正耗费服务器的性能,而队列会

  • 如果发送消息的交换器没有绑定任何队列,那么消息将会丢失

  • 交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失

  • 使用预先分配创建队列资源的静态方式还是代码中动态声明式的创建队列,需要从业务逻辑本身、公

    司运维体系和公司硬件资源等方面考虑

3、消息的发送和接收

3.1、发送消息

public void basicPublish(String exchange , String routingKey, boolean mandatory, 
boolean immediate , BasicProperties props, byte[] body) throws IOException;
  • 1
  • 2
  • exchange:交换器名称,如果设置为空字符串,则消息会被发送到RabbitMQ默认的交换器中
  • routingKey:用来绑定队列和交换器的路由键,交换器根据路由键将消息存储到相应的队列之中
  • props:消息的基本属性集,其包含14个属性成员,分别有 contentType、contentEncoding、headers(Map<String, Object>)、deliveryMode、priority、correlationld、replyTo、expiration messageld、timestamp、type、userld、appld、cluster
  • body:消息体 ay1oad ,真正需要发送的消息
  • mandatory:查看示例代码
  • immediate:查看示例代码
3.1.1、props参数示例代码
  • 只展示了props常用的一些属性
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
    .contentType("text/plain")
    // 持久化的消息
    .deliveryMode(2)
    // 消息的优先级
    .priority(1)
    .userId("hidden")
    // header信息
    .headers(new HashMap<>())
    // 消息过期时间
    .expiration("60000")
    .build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props, message.getBytes());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
3.1.2、mandatory参数示例代码
  • 设置为 true 时,当交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMq就会调用 Basic.Return 命令将消息返回给生产者

  • 设置为 false 时,出现上述情形,则消息直接被丢弃(发送消息时不设置该值,默认值为false)

// 发送消息时交换器根据路由键ROUTING_KEY_NO找不到对应的队列,mandatory = true
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NO, true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 监听被返回的消息
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("Basic.Return 返回的结果是: " + message);
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
3.1.3、immediate参数示例代码

发送消息时不设置该值,默认值为false

  • 设置为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时该消息会通过 Basic Return 返回至生产者
  • mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。 imrnediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了
  • RabbitMQ3.0版本开始去掉了对 imrnediate 参数的支持,RabbitMq官方解释是imrnediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL和DLX 的方法替代
3.1.4、备份交换器
  • 如果既不想设置 mandatory 参数,创建监听器复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息
  • 添加备份交换器的方式(如果两者同时使用,则前者的优先级更高,会覆盖掉 Policy 的设置):
    • 声明交换器(调用 channel.exchangeDeclare方法)的时候添加alternate-exchange 参数
    • 通过策略Policy的方式实现
  • 消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的,如果路由键不能匹配到备份交换器上的任何队列,消息会丢失
  • 如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效
Map<String, Object> arguments = new HashMap<>();
// 设置备份交换器
arguments.put("alternate-exchange", "_1_backup_exchange");
// 声明主交换器(路由模式)持久化的(消息服务重启后,该交换机不会消失)durable = true、非自动删除的、有备份交换器的
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, arguments);
// 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 通过路由绑定交换机和队列(此处的key就是BindingKey)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

// 声明备份交换器(广播模式)
channel.exchangeDeclare("_1_backup_exchange", "fanout", true, false, null);
channel.queueDeclare("_1_backup_queue", true, false, false, null);
channel.queueDeclare("_2_backup_queue", true, false, false, null);
channel.queueBind("_1_backup_queue", "_1_backup_exchange", "");
channel.queueBind("_2_backup_queue", "_1_backup_exchange", "");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
3.1.5、生产者确认消息

当生产者将消息发送出去之后,默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器

RabbitMQ提供了两种生产者消息确认方式:

  • 通过事务机制实现:

  • 通过发送方确认 publisher confirm 机制实现

事务机制

事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。所以采用事务机制实现会严重降低 RabbitMQ 的消息吞吐量

确认流程
  • 客户端发送 Tx.Select,将信道置为事务模式
  • Broker 回复 Tx.Select-Ok,确认己将信道置为事务模式
  • 在发送完消息之后,客户端发送 Tx.Commit 提交事务
  • Broker回复Tx.Commit-Ok,确认事务提交
  • 在事务提交前程序抛异常,客户端发送 Tx.Rollback事务回滚
  • Broker回复Tx.Rollback-Ok,确认事务回滚
示例代码
// 特别注意:该事务是每次提交一条消息,回滚也只是回滚一条
channel.txSelect();
for (int i = 0; i < 1000; i++) {
    try {
        String message = "生成的message - " + i;
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        if (i == 100) {
            int m = 1 / 0;
        }
        channel.txCommit();
    } catch (Exception e) {
        channel.txRollback();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
发送方确认机制
确认流程
  • 生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 会发送一个确认(Basic.Ack) 给生产者(包含消息的唯一 ID) ,这就使得生产者知晓消息已经正确到达了目的地了

  • 如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。 RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都己经得到了处理

  • 发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack (Basic.Nack) 命令,生产者应用程序同样可以在回调方法中处理该 nack (否定确认)命令

  • 生产者通过调用 channel.confirmSelect 方法(即 Confirm.Select 命令)将信道设置为 confirm模式,之后 RabbitMQ 会返回 Confirm Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消息既被 ack 又被 nack 情况,并且 RabbitMQ 并没有对消息被confirm 的快慢做任何保证

单条确认示例代码
// 设置信道为 confirm 模式
channel.confirmSelect();
for (int i = 0; i < 1000; i++) {
    String message = "生成的message - " + i;
    // 发送持久化的消息 - deliveryMode = 2(persistent)
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    // 带有超时时间的等待broke确认,发一条消息后等服务器确认,这是一种串行同步等待的方式
    if (!channel.waitForConfirms(1000)) {
        System.out.println( " send message failed" ) ;
        // 做消息重发逻辑
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
注意要点
  • 如果信道没有开启 publisher confirm 模式,调用任何 waitForConfirms 方法都会报出java.lang.IllegalStateException

  • 对于没有参数的 waitForConfirms 方法来说,其返回的条件是客户端收到了相应的Basic.Ack/.Nack 或者被中断。参数 timeout 表示超时时间,一旦等待 RabbitMQ 回应超时就会抛出 java.util.concurrent.TimeoutException 的异常

  • waitForConfirmsOrDie 方法在接收到 RabbitMQ 返回Basic.Nack 之后会抛出 java.io.IOException 。业务代码可以根据自身的特性灵活地运用这四种方法来保障消息的可靠发送

  • 对于持久化的消息来说,该方式和事务机制方式都需要等待消息确认落盘之后才会返回(调Linux内核的 fsync 方法) 。在同步等待的方式下, publisher confirm 机制发送一条消息需要通信交互的命令是 :Basic.Publish和Basic .Ack;,而事务机制是 :Basic.Publish、Tx.Commmit及Tx.Commit-Ok (或者Tx .Rollback及Tx .Rollback-Ok) 。事务机制比confirm机制多了一个命令帧报文的交互,所以 QPS 会略微下降,相差很小。

批量确认示例代码
  • 批量 confirm 方式的缺点在于遇到 RabbitMQ服务端返回Basic.Nack未确认时,需要重新发送当前批的所有消息,故而会导致性能降低
List<String> list = new ArrayList<>();
final int BATCH_COUNT = 1000;
channel.confirmSelect();
int MsgCount = 0;
while (true) {
    String message = "生成的message - " + MsgCount;
    // 发送持久化的消息 - deliveryMode = 2(persistent)
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    //将发送出去的消息存入缓存中
    list.add(message);
    if (++MsgCount >= BATCH_COUNT) {
        MsgCount = 0;
        try {
            if (channel.waitForConfirms()) {
                //确认成功,将缓存中的消息清空
                list.clear();
            }
            // 确认失败,将缓存中的消息重新发送
        } catch (InterruptedException e) {
            e.printStackTrace();
            // 确认失败,将缓存中的消息重新发送
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
异步确认示例代码
  • 强烈建议使用异步 confirm 的方式

  • multiple参数如何指定,或是由broke服务器自主选择

// 消息序列号列表
SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet());
// 开启confirm模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {

    // 消息确认处理
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        System.out.println("消息序列号 : " + deliveryTag + ", 是否批量确认 : " + multiple);
        if (multiple) {
            // 批量确认时,将未确认消息列表中的当前序列号之前的消息批量确认删除
            confirmSet.headSet(deliveryTag - 1).clear();
        } else {
            // 单条确认时,只删除当前序列号的消息
            confirmSet.remove(deliveryTag);
        }
    }

    // 消息未确认处理
    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        if (multiple) {
            confirmSet.headSet(deliveryTag - 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
        //添加处理消息重发的场景
    }
});

// 模拟循环发送消息
for (int i = 0; i < 1000; i++) {
    // 获取当前消息的序列号
    long nextPublishSeqNo = channel.getNextPublishSeqNo();
    String message = "生成的message - " + i;
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    // 将当前发送的消息序列号添加到列表中
    confirmSet.add(nextPublishSeqNo);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
3.1.6、消息分发
轮询机制
  • RabbitMQ 队列拥有多个消费者时 ,队列收到的消息将以轮询 (round-robin )的分发方式发送给消费者。每条消息只会发送给订阅队列的一个消费者。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
  • 轮询分发机制,如果有n个消费者,那么 RabbitMQ会将第m条消息分发给第m%n (取余的方式)个消费者,如何每个消费者处理消息的速率差异较大,势必会有空闲的消费者,就会造成整体应用吞吐量的下降
公平机制
  • channel.basicQos(int prefetchCount)方法允许限制信道上的消费者所能保持的最大未确认消息的数量

  • 在订阅消费队列之前,消费端程序调用了 channel.basicQos(5) ,之后订阅了某个队列进行消费RabbitMq会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限

  • Basic.Qos的使用对于拉模式的消费方式无效

// prefetchSize表示消费者所能接收未确认消息的总体大小的上限,单位为B(字节),设置为0,则表示没有上限
// prefetchCount表示限制信道上的消费者所能保持的最大未确认消息的数量,设置为0,则表示没有上限
// global为false信道上新的消费者需要遵从prefetchCount的限定值(默认值)
// global为true信道上所有的消费者都需要遵从prefetchCount的限定值
void basicQos(int prefetchSize , int prefetchCount , boo1ean global);
  • 1
  • 2
  • 3
  • 4
  • 5
3.1.7、消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。在不使用任何 RabbitMq高级特性 ,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。

下列情形会打破RabbitMQ 的消息顺序性(不限于)

  • 发送消息之后遇到异常进行了事务回滚,如果补偿发送这条消息得是在另一个线程实现的
  • 启用 publisher confirm 时,在发生超时、中断,或是收到 RabbitMQBasic.Nack命令时,补偿发送结果与事务机制一样会错序
  • 消费设置了不同的过期时间的死信队列中的消息
  • 消息设置了优先级
  • 被拒绝的重入队列中的消息

如果要保证消息的顺序性,需要业务方使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识(类似 Sequence ID) 来实现

3.1.8、消息传输保障

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级。RabbitMQ 支持其中的"最多一次 “和"最少一次”。

  • 最多一次:消息可能会丢失,但绝不会重复传输

  • 最少一次:消息绝不会丢失,但可能会重复传输。

  • 恰好一次:每条消息肯定会被传输一次且仅传输一次

"最多一次"的方式生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失

"最少一次"投递实现需要考虑以下这个几个方面的内容:

  1. 消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
  2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失
  4. 消费者在消费消息的同时需要将 autoAck 设置为 false ,然后通过手动确认的方式去确认己经正确消费的消息,以避免在消费端引起不必要的消息丢失

“恰好一次” RabbitMQ目前无法保障的。

  • 消费者在消费完一条消息之后向RabbitMQ 发送确认 Basic.Ack 命令,此时由于网络断开或者其他原因造成 RabbitMQ并没有收到这个确认命令,那么 RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费
  • 生产者在使用publisher confirm机制,发送完一条消息等待 RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样 RabbitMQ 中就有两条同样,在消费的时候,消费者就会重复消费

去重处理一般是在业务客户端实现,比如引入GUID (Globally Unique Identifier) 的概念。针对GUID ,如果从客户端的角度去重,那么要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小也难以界定。建议在实际生产环境中,业务方根据自身的业务特性进行去重,比如业务消息本身具备幂等性,或者借助 Redis 等其他产品进行去重处理

3.2、消费消息

RabbitMQ的消费模式分两种:推(Push) 模式和拉(Pull) 模式

推模式采用 BasicConsume进行消费,而拉模式则是调用 BasicGet 进行消费

3.1.1、推模式
  • 接收消息一般通过实现Consumer接口或者继承DefaultConsumer类来实现。当调用与Consumer 相关的 API 方法时,不同的订阅采用不同消费者标签 (consumerTag) 来区分彼

  • 在同一个 Channe中的不同消费者也需要通过唯一的消费者标签以作区分

  • BasicConsume将信道 (Channel)置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者,使用 BasicConsume方法可实现高吞吐量

  • 推送消息的个数还会受到basicQos的限制

    • basicQos必须配置消息非自动确认使用,即autoAck = false
    • basicQos(1)表示一次最多接收1个未被确认的消息(该消费者在接收到队列里的消息但没有返回确认结果之前,将不会有新的消息分发给它)
    // 1次能接收的未确认的最大消息数(接收的消息未确认数到达64时,不再被分发新消息)
    channel.basicQos(64);
    // 非自动确认
    boolean autoAck = false;
    // 消费者
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("message = " + new String(body));
            // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };
    // 消费者订阅消息,手动确认
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
basicConsume方法详解
public String basicConsume(String queue ,boolean autoAck ,String consumerTag, 
boolean noLocal ,boolean exclusive ,Map<String,Object> arguments ,Consumer callback) 
throws IOException
  • 1
  • 2
  • 3
  • queue:队列的名称

  • autoAck:设置是否自动确认。建议设成false ,即不自动确认:

  • consumerTag:消费者标签,用来区分多个消费者

  • noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者

  • exclusive:设置是否排他

  • arguments:设置消费者的其他参数

  • callback:设置消费者的回调函数。用来处理RabbitMq推送过来的消息,比如DefaultConsumer使用时需要客户端重写 (override) 其中的方法。

3.1.2、拉模式
  • 如果只想从队列获得单条消息而不是持续订阅,建议使用Basic.Get进行消费
  • 不能将BasicGet放在一个循环里代替BasicConsume,这样做会严重影响RabbitMQ的性能
public GetResponse basicGet(String queue , boolean autoAck) throws IOException;
  • 1
  • autoAck设置为false时,同样需要消费者确认

3.3、消息的确认和拒绝

3.3.1、消息确认
  • 当 autoAck = false时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)

  • 当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息

  • 队列中的消息分成了两个部分:

    • 一部分是等待投递给消费者的消息(ready)
    • 一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息(unacked)
  • RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的依据是消费该消息的消费者连接是否己经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久

3.3.2、消息拒绝
  • 在消费者接收到消息后,如需要明确拒绝当前的消息而不是确认,即可使用以下两种方式
一次拒绝一条
public void basicReject(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
  • 1
  • deliveryTag:消息的编号,是一个64位的长整型值,最大值是9223372036854775807
  • requeue:设置为 true ,RabbitMQ会重新将这条消息存入队列,以便发送给下一个订阅的消费者。requeue设置为 false ,则 RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者
批量拒绝
public void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
  • 1
  • deliveryTag、requeue的用法同上

  • multiple:设置为false,则表示拒绝编号为deliveryTag的这一条消息,此时basicNack和basicReject方法作用一样,都是拒绝一条消息

    multiple设置为true,则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息

3.4、消息重入队列

  • 请求RabbitMQ重新发送未被确认的消息进入队列
public Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
  • 1
  • requeue:控制消息是否分配给同一个消费者

    • 设置为 true ,则未被确认的消息会被重新加入到队列中,对于同一条消息,可能会被分配给与之前不同的消费者

    • 设置为false,同一条消息会被分配给与之前相同的消费者

3.5、设置消息的TTL

有两种方法可以设置消息的TTL(time to live),若两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准

  • 通过队列属性设置,队列中所有消息都有相同的过期时间
  • 通过消息本身进行单独设置,每条消息的TTL可以不同。
    • 消息在队列中的生存时一旦超过设置 TTL 值时,就会变成"死信" (Dead Message) ,消费者将无法再收到该消息
3.5.1、通过队列属性设置

通过队列属性设置消息TTL的方法是在声明队列时加入x-message-ttl参数来实现,单位是毫秒

  • 如果不设置 TTL,则表示此消息不会过期 ;
  • 如果将 TTL 设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分替代 RabbitMQ 3.0 版本之前的 immediate 参数,之所以部分代替,是因为 immediate 参数在投递失败时会用Basic Return 将消息返回
Map<String, Object> arguments = Maps.newHashMap();
// 队列中的消息5s后过期
arguments.put("x-message-ttl", 5000);
// 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的、消息可过期
channel.queueDeclare(QUEUE_NAME, true, false, false, map);
// 通过路由绑定交换机和队列(此处的key就是BindingKey)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
3.5.2、通过消息本身设置

通过消息本身设置设置消息TTL,单位是毫秒

String message = "生成的message - " + i;
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    .deliveryMode(2)
    // 消息的过期时间为5s
    .expiration(String.valueOf(5000)).build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
3.5.3、两种方式的比较
  • 设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去
  • 设置消息过期属性的方法,即使消息过期,也不会马上从队列中抹去(也可能马上抹去,取决队列中消息数量的多少),因为每条消息是否过期是在即将投递到消费者之前判定的

为什么这两种方法处理的方式不一样?

因为第一种方法里,队列中己过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。

而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

3.6、设置队列的TTL

通过声明队列时,添加x-expires参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。

设置队列里的TTL可以应用于类似RPC方式的回复队列,在RPC中,许多队列会被创建出来,但是却是未被使用的。

RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时 。RabbitMQ重启后,持久化的队列的过期时间会被重新计算。

用于表示过期时间的 x-expires 参数以毫秒为单位,并且服从和x-message-ttl的约束条件,不过不能设置为0比如该参数设置为1000 ,则表示该队列如果在1秒钟之内未使用则会被删除

Map<String, Object> arguments = new HashMap<>();
// 队列未使用5s后删除
arguments.put("x-expires", 5000);
// 队列中的消息10s后过期(消息过期时间大于队列删除时间,不会影响队列的删除时间,删除后消息丢失)
arguments.put("x-message-ttl", 10000);
// 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3.7、持久化

持久化可以提高 RabbitMQ 的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。

  • 交换器、队列的持久化是通过在声明交换器、队列时将 durable参数置为 true
    • 队列的持久化并不能保证消息持久化,需要将消息也设置为持久化的
  • 消息持久化是通过将消息的投递模式(BasicProperties中的 deliveryMode(枚举) 属性)设置为2即可实现消息的持久化
    • 当队列被删除或者消失时,即使消息是持久化的也会丢失。

4、队列

4.1、死信队列

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3wHLhfBE-1661678466107)(C:\Users\wl\Desktop\笔记\image\RabbitMQ.assets\image-20220428092233878.png)]

DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信 (dea message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定 DLX 的队列就称之为死信队列。

  • 消息变成死信一般是由于以下几种情况:

    • 消息被拒绝 (Basic.Reject/Basic.Nack) ,井且设置requeue参数为false(ture为重新放入原队列,false则直接丢弃)
    • 消息过期
    • 队列达到最大长度
  • DLX是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的DLX 上去,进而被路由到另一个队列,即死信队列。

  • 死信队列的特性与将消息的 TTL设置为0配合使用可以弥补imrnediate参数的功能

  • 死信队列绑定死信交换器的路由键,可以和原队列和原交换器的路由键设置为不同

// 声明死信交换器(不设置参数,默认非持久化、非自动删除的),路由模式
channel.exchangeDeclare("dlx_exchange", "direct");
// 声明死信队列
channel.queueDeclare("dlx_queue", false, false, false, null);
// 绑定死信队列到死信交换器(具有路由key)
channel.queueBind("dlx_queue", "dlx_exchange", "dlx-routing-key");

Map<String, Object> map = new HashMap<>();
// 设置死信交换器
map.put("x-dead-letter-exchange", "dlx_exchange");
// 设置允许路由到死信交换器消息的路由key
map.put("x-dead-letter-routing-key", "dlx-routing-key");
// 持久化的(消息服务重启后,该交换器不会消失)durable = true、非自动删除的
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
// 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的,设置死信交换器
channel.queueDeclare(QUEUE_NAME, true, false, false, map);
// 绑定普通队列到普通交换器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

4.2、延迟队列

"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

  • 延迟队列也是死信队列,不同点在于消费者订阅的是死信队列
  • 消息在队列中的过期时间就相当于需要延迟接收的时间

4.3、优先级队列

优先级队列,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。

  • 优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速度,且Broke 中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的
  • 代码中设置消息的优先级为5,默认最0,最高为队列设置的最大优先级10
Map<String, Object> map = new HashMap<>();
// 设置队列的最大优先级 = 10
map.put("x-max-priority", 10);

// 持久化的(消息服务重启后,该交换机不会消失)durable = true、非自动删除的
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
// 持久化的(消息服务重启后,该队列不会消失)durable = true、非排他、非自动删除的,有队列优先级
channel.queueDeclare(QUEUE_NAME, true, false, false, map);
// 通过路由绑定交换机和队列(此处的key就是BindingKey)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

String message = "生成的message";
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    .deliveryMode(2)
    // 设置消息的优先级为5
    .priority(5)
    .build();
// 发送持久化的消息 - deliveryMode = 2(persistent)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

4.4、RPC

RPC,是Remote Procedure Call 的简称,即远程过程调用

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UQqhkdob-1661678466108)(C:\Users\wl\Desktop\笔记\image\RabbitMQ.assets\image-20220428113631428.png)]

4.4.1、RPC处理流程
  1. 当客户端启动时,创建一个匿名的回调队列(名称由RabbitMQ自动创建)
  2. 客户端为RPC请求设置2个属性:replyTo用来告知 RPC 服务端回复请求时的目的队列,即回调队列,correlationld 用来标记一个请求,即请求标识
  3. 请求的消息携带回调队列,该请求标识参数被发送到rpc_queue队列中
  4. RPC服务端订阅rpc_queue队列,当请求到来时,服务端会处理并且把带有结果的消息发送到回调队列中(replyTo参数的值)
  5. 客户端订阅回调队列,当有消息时,校验请求标识correlationld 属性,如果与请求匹配,即为调用结果
4.4.2、示例代码
连接工厂
public class RabbitmqFactory {

    private static final String IP_ADDRESS = "172.16.57.5";
    private static final int PORT = 5672;//RabbitMQ 服务端默认端口号为 5672

    protected Connection connection;

    protected Channel channel;

    /**
     * @return com.rabbitmq.client.Connection
     * @Description 获取连接
     * @Param
     */
    public void initRabbitmqConnection() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(IP_ADDRESS);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        connectionFactory.setVirtualHost("rootDB");
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (Exception e) {
                System.out.println("获取连接前,关闭信道失败");
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (Exception e) {
                System.out.println("获取连接前,关闭连接失败");
                e.printStackTrace();
            }
        }
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        } catch (Exception e) {
            System.out.println("获取连接失败");
            e.printStackTrace();
        }
    }

    /**
     * 重连Rabbitmq
     */
    public void reConnect() {
        System.out.println("5s后重连MQ");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        initRabbitmqConnection();
    }
	
    // 关闭连接
    public void shutdownConnect() {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (Exception e) {
                System.out.println("关闭信道失败");
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (Exception e) {
                System.out.println("关闭连接失败");
                e.printStackTrace();
            }
        }
    }

    public Connection getConnection() {
        return connection;
    }

    public Channel getChannel() {
        return channel;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
客户端(服务消费)
public class RPCClient {

    // 封装的获取连接、信道及关闭资源的工厂
    private RabbitmqFactory rabbitmqFactory;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws IOException, TimeoutException {
        rabbitmqFactory = new RabbitmqFactory();
        rabbitmqFactory.initRabbitmqConnection();
        Channel channel = rabbitmqFactory.getChannel();

        // 声明回调队列,并订阅回调队列
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws IOException, InterruptedException {
        String response;
        // 设置请求标识id
        String corrId = UUID.randomUUID().toString();
        // 设置消息参数:回调队列、请求标识
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        rabbitmqFactory.getChannel().basicPublish("", requestQueueName, props, message.getBytes());
        // 监听
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }
        return response;
    }

    public static void main(String args[]) throws Exception {
        RPCClient fibRpc = new RPCClient();
        System.out.println(" [x) Requesting fib(30)");
        String response = fibRpc.call("30");
        System.out.println(" [.) Got '" + response + "'");
        fibRpc.rabbitmqFactory.shutdownConnect();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
服务端(服务提供)
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String args[]) throws Exception {
        RabbitmqFactory rabbitmqFactory = new RabbitmqFactory();
        rabbitmqFactory.initRabbitmqConnection();
        Channel channel = rabbitmqFactory.getChannel();

        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        System.out.println(" [x] Awaiting RPC requests ");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String response = "";
                try {
                    String message = new String(body, "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [ . ] fib( " + message + " ) ");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();
                    // 使用回调队列、请求标识
                    channel.basicPublish("", properties.getReplyTo(),
                            replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    }

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

5、用户管理

5.1、多租户及权限

AMQP协议中并没有指定权限在Vhost级别还是在服务器级别实现,由具体的应用自定义。在RabbitMQ中,权限控制则是以Vhost为单位的 。当创建一个用户时,用户通常会被指派给至少一个vhost ,并且只能访问被指派的vhost内的队列、交换器和绑定关系等。因此,RabbitMQ中的授予权限是指在vhost级别对用户而言的权限授予

可配置权限:指的是队列和交换器的创建及删除之类的操作

可写权限:指的是发布消息

可读权限:指与消息有关的操作,包括读取消息及清空整个队列等

5.2、用户管理

RabbitMQ中,用户是访问控制(Access Control)的基本单元,且单个用户可以跨越vhost进行授权。针对一至多个vhost ,用户可以被赋予不同级别的访问权限,并使用标准的用户名和密码来认证用户。

用户的角色分为5种类型。

  • none:无任何角色。新创建的用户的角色默认为none

  • management:可以访问Web管理页面

  • policymaker:包含 management 的所有权限,并且可以管理策略(Policy)和参数( Parameter )

  • monitoring:包含 management 的所有权限,并且可以看到所有连接、信道及节点相关的信息

  • administartor:包含 monitoring的所有权限,井且可以管理用户、虚拟主机、权限、策略、参数等。 administartor代表了最高的权限

response.getBytes(“UTF-8”));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
}

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n - 1) + fib(n - 2);
}
  • 1
  • 2
  • 3
  • 4
  • 5

}


## 5、用户管理

### 5.1、多租户及权限

AMQP协议中并没有指定权限在Vhost级别还是在服务器级别实现,由具体的应用自定义。在RabbitMQ中,权限控制则是以Vhost为单位的 。当创建一个用户时,用户通常会被指派给至少一个vhost ,并且只能访问被指派的vhost内的队列、交换器和绑定关系等。因此,RabbitMQ中的授予权限是指在vhost级别对用户而言的权限授予

可配置权限:指的是队列和交换器的创建及删除之类的操作

可写权限:指的是发布消息

可读权限:指与消息有关的操作,包括读取消息及清空整个队列等

### 5.2、用户管理

RabbitMQ中,用户是访问控制(Access Control)的基本单元,且单个用户可以跨越vhost进行授权。针对一至多个vhost ,用户可以被赋予不同级别的访问权限,并使用标准的用户名和密码来认证用户。

用户的角色分为5种类型。

*  none:无任何角色。新创建的用户的角色默认为none

* management:可以访问Web管理页面

* policymaker:包含 management 的所有权限,并且可以管理策略(Policy)和参数( Parameter )

* monitoring:包含 management 的所有权限,并且可以看到所有连接、信道及节点相关的信息

* administartor:包含 monitoring的所有权限,井且可以管理用户、虚拟主机、权限、策略、参数等。 administartor代表了最高的权限

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/459423
推荐阅读
相关标签
  

闽ICP备14008679号