赞
踩
代码仓库:github:https://github.com/stopping5/RabbitMq-Operation-Record.git
本代码示例需要引入rabbitmq依赖
<!-- rabbitmq依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
Publisher confirms are a RabbitMQ extension to implement reliable publishing. When publisher confirms are enabled on a channel, messages the client publishes are confirmed asynchronously by the broker, meaning they have been taken care of on the server side.
发布订阅即使生产者发送消息到broker时,broker可以通过同步或者异步的方式通知消息生产者是否成功。
Channel channel = connection.createChannel();
channel.confirmSelect();
/** * 单独确认 * 消息发送耗时:14492ms * */ public static void simpleConfirmMessage() throws IOException, InterruptedException { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(RabbitMQConfig.ACK_EXCHANGE, BuiltinExchangeType.DIRECT); //声明队列关闭了自动确认 channel.queueDeclare(RabbitMQConfig.ACK_QUEUE,false,false,false,null); channel.queueBind(RabbitMQConfig.ACK_QUEUE,RabbitMQConfig.ACK_EXCHANGE,"ACK"); //开启消息确认 channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0; i < PUBLISH_COUNT; i++) { String message = i + ""; channel.basicPublish(RabbitMQConfig.ACK_EXCHANGE,"ACK",null,message.getBytes(StandardCharsets.UTF_8)); boolean flag = channel.waitForConfirms(); if (!flag){ System.out.println("消息发送失败"); } } long end = System.currentTimeMillis(); System.out.println("消息发送耗时:"+(end-begin)+"ms"); }
或者可以使用Channel#waitForConfirmsOrDie(long)消息如果在限定时间内没有答复则抛出异常
Channel#waitForConfirmsOrDie(long) method. The method returns as soon as the message has been confirmed. If the message is not confirmed within the timeout or if it is nack-ed (meaning the broker could not take care of it for some reason), the method will throw an exception.
使用方式和单条发布确认一样,只不是对消息发布确认频率修改成批量确认模式,若确认失败只能知道一批消息有消息发送失败,无法精确到某一条。
/** * 批量发布确认 * 批量确认消息发送耗时:268ms * */ public static void batchConfirmMessage() throws IOException, InterruptedException { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(RabbitMQConfig.ACK_EXCHANGE, BuiltinExchangeType.DIRECT); //声明队列关闭了自动确认 channel.queueDeclare(RabbitMQConfig.ACK_QUEUE,false,false,false,null); channel.queueBind(RabbitMQConfig.ACK_QUEUE,RabbitMQConfig.ACK_EXCHANGE,"ACK"); //开启消息确认 channel.confirmSelect(); long begin = System.currentTimeMillis(); int batchSize = 100; for (int i = 0; i < PUBLISH_COUNT; i++) { String message = i + ""; channel.basicPublish(RabbitMQConfig.ACK_EXCHANGE,"ACK",null,message.getBytes(StandardCharsets.UTF_8)); if (i%batchSize == 0){ boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("消息发送成功"); } } } long end = System.currentTimeMillis(); System.out.println("批量确认消息发送耗时:"+(end-begin)+"ms"); }
相对于同步确认而言,异步确认机制新增监听器体提供确认成功和确认失败的方法,复用方法即可异步的获取到消息发送到broker的结果。
/** * 批量发布确认 * 异步确认消息发送耗时:36ms * */ public static void asynConfirmMessage() throws IOException, InterruptedException { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(RabbitMQConfig.ACK_EXCHANGE, BuiltinExchangeType.DIRECT); //声明队列关闭了自动确认 channel.queueDeclare(RabbitMQConfig.ACK_QUEUE,false,false,false,null); channel.queueBind(RabbitMQConfig.ACK_QUEUE,RabbitMQConfig.ACK_EXCHANGE,"ACK"); //开启消息确认 channel.confirmSelect(); //设置异步监听回调方法 ConfirmCallback ackCallback = (deliveryTag,multipl)->{ System.out.println("消息发布确认成功:"+deliveryTag); }; ConfirmCallback unAckCallback = (deliveryTag,multipl)->{ System.out.println("消息发布失败:"+deliveryTag); }; channel.addConfirmListener(ackCallback,unAckCallback); long begin = System.currentTimeMillis(); for (int i = 0; i < PUBLISH_COUNT; i++) { String message = i + ""; channel.basicPublish(RabbitMQConfig.ACK_EXCHANGE,"ACK",null,message.getBytes(StandardCharsets.UTF_8)); } long end = System.currentTimeMillis(); System.out.println("异步确认消息发送耗时:"+(end-begin)+"ms"); }
测试结果可以看出异步确认效率是最高的,其次是批量确认,最后当然是一条一条的单条确认。
public static void main(String[] args) throws IOException, InterruptedException {
//simpleConfirmMessage();//消息发送耗时:14492ms
//batchConfirmMessage();//批量确认消息发送耗时:268ms
//asynConfirmMessage();//异步确认消息发送耗时:36ms
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。