当前位置:   article > 正文

RabbitMQ发布确认

RabbitMQ发布确认

1.单个确认

单个确认发布是一种同步确认发布方式,也就是发布一个消息后只有它被确认发布,后续的消息才能继续发布。
缺点:发布速度特别慢,因为若是没有确认发布的消息会阻塞所有后续消息的发布

package com.hong.rabbitmq5;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.util.UUID;

/**
 * @Description: 发布确认-单个确认发布
 * 单个确认发布是一种同步确认发布方式,也就是发布一个消息后只有它被确认发布,后续的消息才能继续发布。
 * 缺点:发布速度特别慢,因为若是没有确认发布的消息会阻塞所有后续消息的发布
 * @Author: hong
 * @Date: 2023-12-21 20:52
 * @Version: 1.0
 **/
public class Task5 {
    public static void main(String[] args) throws Exception{
        String queueName = UUID.randomUUID().toString();
        Channel channel = RabbitMQUtil.getChannel();
        //队列持久化   true持久化
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认  默认关闭
        channel.confirmSelect();
        long startTime = System.currentTimeMillis();
        for(int i = 0; i <= 1000; i++){
            String message = i + "";
            //消息持久化  MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            //发布就确认
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("第"+i+"个消息发布成功!");
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("发布1000个消息单个确认发布总耗时:"+(endTime - startTime)+"ms");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

在这里插入图片描述

2. 批量确认

批量确认发布也是一种同步确认发布方式,一批确认一次,相比单个确认发布极大地提升了吞吐量
缺点:一旦出故障难以确认到底是哪个消息出问题了

package com.hong.rabbitmq5;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.util.UUID;

/**
 * @Description: 批量确认发布
 * 批量确认发布也是一种同步确认发布方式,一批确认一次,相比单个确认发布极大地提升了吞吐量
 * 缺点:一旦出故障难以确认到底是哪个消息出问题了
 * @Author: hong
 * @Date: 2024-01-03 17:44
 * @Version: 1.0
 **/
public class BatchConfirmPublish {
    public static void main(String[] args) throws Exception {
        String queueName = UUID.randomUUID().toString();
        Channel channel = RabbitMQUtil.getChannel();
        //队列持久化   true持久化
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认  默认关闭
        channel.confirmSelect();
        //100个确认一次
        int batchSize = 100;
        long startTime = System.currentTimeMillis();
        for(int i = 1; i <= 1000; i++){
            String message = i + "";
            //消息持久化  MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

            if(i%batchSize == 0){
                //发布就确认
                channel.waitForConfirms();
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("发布1000个消息,批量确认发布总耗时:"+(endTime - startTime)+"ms");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

在这里插入图片描述

3. 异步确认

package com.hong.rabbitmq5;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * @Description: 异步确认发布
 * @Author: hong
 * @Date: 2024-01-06 20:41
 * @Version: 1.0
 **/
public class AsynConfirmPublish {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        //队列持久化   true持久化
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认  默认关闭
        channel.confirmSelect();

        ConcurrentSkipListMap<Long,String> concurrentSkipListMap =
                new ConcurrentSkipListMap<>();

        //消息确认回调的函数
        ConfirmCallback ackCallback = (deliveryTag, multiple) ->{
            if(multiple) {
                //2.删除掉已经确认的消息 剩下的就是未确认的消息
                ConcurrentNavigableMap<Long, String> confirmed =
                        concurrentSkipListMap.headMap(deliveryTag);
                confirmed.clear();
            }else {
                concurrentSkipListMap.remove(deliveryTag);
            }
            System.out.println("确认的消息:" + deliveryTag);
        };

        //消息确认失败回调函数
        ConfirmCallback nackCallback= (deliveryTag,multiple) ->{
            //3.打印一下未确认的消息都有哪些
            String message = concurrentSkipListMap.get(deliveryTag);
            System.out.println("未确认的消息tag:"  + deliveryTag + "-----------未确认的消息是:" + message);
        };

        /*
         * 消息的监听器   异步通知
         */
        channel.addConfirmListener(ackCallback,nackCallback);

        long begin = System.currentTimeMillis();

        //批量发送消息
        for (int i = 1; i <= 1000; i++) {
            String message= "消息" + i;
            //消息持久化  MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish("",queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            //1.此处记录下所有要发送的消息 消息的总和
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布1000个消息,异步确认消息总耗时:"+(end-begin)+"ms");

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/70249
推荐阅读
相关标签
  

闽ICP备14008679号