当前位置:   article > 正文

RabbitMQ基于Java实现消息应答

RabbitMQ基于Java实现消息应答

RabbitMQ

概念

RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站, 一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据、|

四大核心概念

生产者

产生数据发送消息的程序时生产者

交换机

交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。

队列

队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。 许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。 请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

工作原理

Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker 一个交换机(Exchange)对应多个队列(Queue) 每一个生产者(Producer)与RabbitMQ Server建立连接,每一个连接(Connection)有多个信道(channel),在连接中通过信道发送信息给Broker。

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection ,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效力也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别channel,所以 channel 之间是完全隔离的。Channel 作为轻量级 Connection 极大减少了操作系统建立TCP connection的开销。

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct(point-to-point 点到点),topic(publish-subscribe)和 fanout(multicast)

Virrual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个RabbitMQ提供的服务时,可以划分出多个 virtual host(简称vhost),每个用户在自己的vhost创建 exchange/queue等

多租户:一个Broker中可以有多个用户,一个用户有自己的Exchange

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中 可以包含routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

Producer 发出消息 -> Connection(Channel) -> Broker -> Exchange -> 匹配查询表中的 routing key -> 分发到对应的queue中 -> Connection(Channel) -> 给对应的Consumer

为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

Java实现

1.导入依赖

2.生产者代码

3.消费者代码

工作队列

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。(异步处理

一个消息只能被处理一次,不可以被处理多次。

轮询分发消息

工作队列轮流接收消息

消息应答

概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理了一个长的任务并仅完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为消费者无法接收。

为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制,消息应答就是:消费者在收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者端出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制。当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用于在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

消息在手动应答时是不丢失的,失败将会放回队列中重新消费。

方法
  • Channel.basicAck(DeliverTag deliverTag, multiple)(用于肯定确认)

    RabbitMQ 已知到该消息并且成功的处理消息,可以将其丢弃了

  • Channel.basicNack(CancelTag cancelTag , multiple)(用于否定确认)

  • Channel.basicReject(CancelTag cancelTag )(用于否定确认)

    不处理该消息了,直接拒绝,将消息丢弃

Multipart

手动应答的好处是可以批量应答并且减少网络拥堵

multiple的true和false 代表不同意思

true代表批量应答 channel 上未应答的消息

比如说 channel 上有传送 tag 的消息 5,6,7,8,当前 tag 是 8,那么此时5-8这些还未应答的消息将会被确认到消息应答

true 会应答前面未确认的消息

false同上面相比

只会应答 tag=8 的消息,5,6,7这三个消息依然不会被确认收到消息应答

false 只会应答当前成功处理的消息

消息确认

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认成功发布后,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能以及足够了。

  1. // 单个确认
  2.    public static void publishMessageIndividual()throws Exception{
  3.        Channel channel = RabbitMQUtils.getChannel();
  4.        // 队列的声明
  5.        String queueName = UUID.randomUUID().toString();
  6.        channel.queueDeclare(queueName, false, false, false, null);
  7.        // 开启发布确认
  8.        channel.confirmSelect();
  9.        // 开启时间
  10.        long begin = System.currentTimeMillis();
  11.        // 批量发消息
  12.        for (int i = 0; i < MESSAGE_COUNT; i++) {
  13.            String message = i + "";
  14.            channel.basicPublish("", queueName, null, message.getBytes());
  15.            // 单个消息就马上进行发布确认
  16.            boolean flag = channel.waitForConfirms();
  17.            if (flag){
  18.                System.out.println("消息发送成功");
  19.           }else {
  20.                System.out.println("发布失败");
  21.           }
  22.       }
  23.        // 结束时间
  24.        long end = System.currentTimeMillis();
  25.        System.out.println("发布" + MESSAGE_COUNT + "条单独确认的消息,耗时:" + (end - begin) + "ms");
  26.   }

批量确认发布

单个确认发布方式非常慢,与其相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

  1. // 批量发布确认
  2.    public static void publicMessageBatch() throws Exception{
  3.        // 获取信道
  4.        Channel channel = RabbitMQUtils.getChannel();
  5.        // 创建队列
  6.        String queueName = UUID.randomUUID().toString();
  7.        channel.queueDeclare(queueName, false, false, false, null);
  8.        // 开启发布确认
  9.        channel.confirmSelect();
  10.        // 开始时间
  11.        long begin = System.currentTimeMillis();
  12.        // 批量确认消息的大小
  13.        int basicSize = 100;
  14.        // 未确认消息个数
  15.        // 发布消息
  16.        for (int i = 0; i < MESSAGE_COUNT; i++) {
  17.            String message = i + "";
  18.            channel.basicPublish("", queueName, null, message.getBytes());
  19.            if (i % 100 == 0){
  20.                channel.waitForConfirms();
  21.           }
  22.       }
  23.        // 最后确认发布
  24.        channel.waitForConfirms();
  25.        // 结束时间
  26.        long end = System.currentTimeMillis();
  27.        System.out.println("发布" + MESSAGE_COUNT + "条批量确认的消息,耗时:" + (end - begin) + "ms");
  28.   }
异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

如何处理异步未确认消息

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如所用 ConcurrentLinkedQueue 这个队列在从firm callbacks 与发布线程之间的消息传递。

  1. // 异步发布确认
  2.    public static void publishMessageAsync() throws Exception {
  3.        // 创建信道
  4.        Channel channel = RabbitMQUtils.getChannel();
  5.        // 创建队列
  6.        String queueName = UUID.randomUUID().toString();
  7.        channel.queueDeclare(queueName, false, false, false, null);
  8.        // 开启发布确认
  9.        channel.confirmSelect();
  10.        /**
  11.         * 线程安全有序的一个哈希表 适用于高并发的情况下
  12.         * 1. 轻松的将序号与消息进行管理那
  13.         * 2. 轻松批量删除条目 只需要序号
  14.         * 3. 支持高并发
  15.         */
  16.        ConcurrentSkipListMap<Long, Object> outstandingConfirms = new ConcurrentSkipListMap<>();
  17.        // 准备消息的监听器 监听哪些消息成功了 哪些消息失败了
  18.        // 消息确认成功 回调函数
  19.        /**
  20.         * 1、 deliveryTag 消息的标记 默认从1开始
  21.         * 2、 multiple 是否为批量确认
  22.         */
  23.        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
  24.            // 删除已经确认的消息 剩下的就是未确认的消息
  25.            if (multiple){
  26.                // 批量发消息 批量删除
  27.                ConcurrentNavigableMap<Long, Object> confirmed =
  28.                        outstandingConfirms.headMap(deliveryTag);
  29.                confirmed.clear();
  30.           }else {
  31.                outstandingConfirms.remove(deliveryTag);
  32.           }
  33.            System.out.println("确认的消息" + deliveryTag);
  34.       };
  35.        // 消息确认失败 回调函数
  36.        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
  37.            // 有哪些未确认的消息
  38.            String message = (String) outstandingConfirms.get(deliveryTag);
  39.            System.out.println("未确认的消息是" + message);
  40.            System.out.println("未确认的消息" + deliveryTag);
  41.       };
  42.        /**
  43.         * 1、监听哪些消息成功了
  44.         * 2、监听哪些消息失败了
  45.         */
  46.        channel.addConfirmListener(ackCallback, nackCallback);
  47.        // 开始时间
  48.        long begin = System.currentTimeMillis();
  49.        // 批量发送消息
  50.        for (int i = 0; i < MESSAGE_COUNT; i++) {
  51.            String message = "消息" + i;
  52.            // 在此处记录下所有要发送的消息
  53.            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
  54.            channel.basicPublish("", queueName, null, message.getBytes());
  55.       }
  56.        long end = System.currentTimeMillis();
  57.        System.out.println("发布" + MESSAGE_COUNT + "条异步发布确认的消息,耗时:" + (end - begin) + "ms");
  58.   }
总结

单独发布消息:

同步等待确认,简单,但吞吐量非常有限

批量发布消息:

批量同步等待确认,简单,合理的吞吐量,一旦出现问题很难知道具体是哪条消息出现了问题

异步处理:

最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现较复杂

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/393736
推荐阅读
相关标签
  

闽ICP备14008679号