赞
踩
RabbitMQ消息确认的本质也就是为了解决RabbitMQ消息丢失问题,因为哪怕我们做了RabbitMQ持久化,其实也并RabbitMQ持久化 不能保证解决我们的消息丢失问题
RabbitMQ的消息确认有两种
正常情况下,生产者会通过交换机发送消息至队列中,再由消费者来进行消费,但是其实RabbitMQ在接收到消息后,还需要一段时间消息才能存入磁盘,并且其实也不是每条消息都会存入磁盘,可能仅仅只保存到cache中,这时,如果RabbitMQ正巧发生崩溃,消息则就会丢失,所以为了避免该情况的发生,我们引入了生产者确认机制,rabbitmq对此提供了两种方式:
publisher confirm
)实现channel.txSelect()
: 将当前信道设置成事务模式channel.txCommit()
: 用于提交事务channel.txRollback()
: 用于回滚事务通过事务实现机制,只有消息成功被rabbitmq服务器接收,事务才能提交成功,否则便可在捕获异常之后进行回滚,然后进行消息重发,但是事务非常影响rabbitmq的性能。还有就是事务机制是阻塞的过程,只有等待服务器回应之后才会处理下一条消息
- /**
- * 创建生产者
- */
- public class Send {public static void main(String[] args) throws IOException, TimeoutException {
- //从MQ工具类获取连接信息
- Connection connection = MqConnectionUtils.getConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
-
- //准备发送的消息内容
- String msg = "你好";
-
- //准备交换机(已创建的交换机)
- String exchangeName = "direct-exchange";
-
- //准备路由
- String routekey = "email";
-
- try{
- //将信道设置为事务模式
- channel.txSelect();
- //发送消息给交换机
- /**
- * 参数1:交换机,不定义也会有默认的,因为我们的消息是通过交换机来进行投递给队列的,所以交换机不可能没有
- * 参数2:routekey
- * 参数3:消息的状态控制
- * 参数4:消息内容
- */
- //该模式因为是由交换机发给该交换机绑定的所有队列,所以可以不标明队列名称
- channel.basicPublish(exchangeName,routekey,null,msg.getBytes());
- //事务提交
- channel.txCommit();
- System.out.print("发送成功");
- } catch (Exception e){
- //如果消息发送给交换机的过程出现异常,则捕捉并进行回滚
- channel.txRollback();
- System.out.print("发送失败并回滚");
- }
-
- //关闭通道
- channel.close();
- connection.close();
- }
- }
confirm方式有三种模式:普通confirm模式、批量confirm模式、异步confirm模式
channel.confirmSelect()
: 将当前信道设置成了confirm模式普通confirm模式
每发送一条消息,就调用waitForConfirms()方法,等待服务端返回Ack或者nack消息
- /**
- * 创建生产者
- */
- public class Send {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- //从MQ工具类获取连接信息
- Connection connection = MqConnectionUtils.getConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
-
- //准备发送的消息内容
- String msg = "你好";
-
- //准备交换机(已创建的交换机)
- String exchangeName = "direct-exchange";
-
- //准备路由
- String routekey = "email";
-
- //将当前信道设置成confirm模式
- channel.confirmSelect();
- for(int i = 0;i<20;i++){
- //发送消息给交换机
- /**
- * 参数1:交换机,不定义也会有默认的,因为我们的消息是通过交换机来进行投递给队列的,所以交换机不可能没有
- * 参数2:routekey
- * 参数3:消息的状态控制
- * 参数4:消息内容
- */
- //该模式因为是由交换机发给该交换机绑定的所有队列,所以可以不标明队列名称
- channel.basicPublish(exchangeName,routekey,null,msg.getBytes());
- //信道为confirm模式后,即可通过waitForConfirms()接收服务端返回来的信息
- if(channel.waitForConfirms()){
- System.out.print("发送成功");
- }
- }
-
- final long start = System.currentTimeMillis();
- System.out.println("执行waitForConfirmsOrDie耗费时间"+(System.currentTimeMillis()-start)+"ms");
-
- //关闭通道
- channel.close();
- connection.close();
- }
- }
批量confirm模式
每发送一批消息,就调用waitForConfirmsOrDie()方法,该方法会等到最后一条消息得到ack或者得到nack才会结束,也就是说在waitForConfirmsOrDie处才会造成程序的阻塞
- /**
- * 创建生产者
- */
- public class Send {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- //从MQ工具类获取连接信息
- Connection connection = MqConnectionUtils.getConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
-
- //准备发送的消息内容
- String msg = "你好";
-
- //准备交换机(已创建的交换机)
- String exchangeName = "direct-exchange";
-
- //准备路由
- String routekey = "email";
- //将当前信道设置成confirm模式
- channel.confirmSelect();
- for(int i = 0;i<20;i++){
- //发送消息给交换机
- /**
- * 参数1:交换机,不定义也会有默认的,因为我们的消息是通过交换机来进行投递给队列的,所以交换机不可能没有
- * 参数2:routekey
- * 参数3:消息的状态控制
- * 参数4:消息内容
- */
- //该模式因为是由交换机发给该交换机绑定的所有队列,所以可以不标明队列名称
- channel.basicPublish(exchangeName,routekey,null,msg.getBytes());
- }
-
- final long start = System.currentTimeMillis();
- //消息批量发送完成后,通过waitForConfirmsOrDie()方法来接收服务端返回的信息
- channel.waitForConfirmsOrDie();
- System.out.println("执行waitForConfirmsOrDie耗费时间"+(System.currentTimeMillis()-start)+"ms");
-
- //关闭通道
- channel.close();
- connection.close();
- }
- }
异步confirm模式
通过channel,addConfirmListener()监听发送方确认模式,通过信道中的waitForConfirmsOrDie等待传回ack或者nack
- /**
- * 创建生产者
- */
- public class Send {
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- //从MQ工具类获取连接信息
- Connection connection = MqConnectionUtils.getConnection();
- //创建一个通道
- Channel channel = connection.createChannel();
-
- //准备发送的消息内容
- String msg = "你好";
-
- //准备交换机(已创建的交换机)
- String exchangeName = "fanout-exchanges";
- //准备路由
- String routekey = "";
- //将当前信道设置成confirm模式
- channel.confirmSelect();
- for(int i = 0;i<100;i++){
- msg= i + "chen";
- //发送消息给交换机
- /**
- * 参数1:交换机,不定义也会有默认的,因为我们的消息是通过交换机来进行投递给队列的,所以交换机不可能没有
- * 参数2:routekey
- * 参数3:消息的状态控制
- * 参数4:消息内容
- */
- //该模式因为是由交换机发给该交换机绑定的所有队列,所以可以不标明队列名称
- channel.basicPublish(exchangeName,routekey,null,msg.getBytes());
- }
- final long start = System.currentTimeMillis();
- //通过addConfirmListener来监听信道
- channel.addConfirmListener(new ConfirmListener() {
- //消息发送成功
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("以确认消息:"+ deliveryTag + " 多个消息:" + multiple);
- }
- //消息发送失败
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("no ack");
- }
- });
- System.out.println("执行waitForConfirmsOrDie耗费时间"+(System.currentTimeMillis()-start)+"ms");
-
- //关闭通道
- channel.close();
- connection.close();
- }
- }
消息接收确认机制,分为消息自动确认模式和消息手动确认模式,当消息确认后,我们队列中的消息将会移除
那这两种模式要如何选择呢?
注:自动确认模式,消费者不会判断消费者是否成功接收到消息,也就是当我们程序代码有问题,我们的消息都会被自动确认,消息被自动确认了,我们队列就会移除该消息,这就会造成我们的消息丢失
- /**
- * 消费者
- */
- public class Recv {
- //设定队列名称(已存在的队列)
- private static final String QUEUE_NAME = "queue1";
- public static void main(String[] args) throws IOException, TimeoutException {
- //从mq工具类获取连接信息
- Connection connection = MqConnectionUtils.getConnection();
- //获取一个通道
- Channel channel = connection.createChannel();
- //监听该队列,true代表自动确认
- channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
- System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
- }
- });
- }
- }
实现效果,消费者会将我们队列中的消息全部接收然后确认,并移除队列
- /**
- * 消费者
- */
- public class Recv {
- //设定队列名称(已存在的队列)
- private static final String QUEUE_NAME = "queue1";
- public static void main(String[] args) throws IOException, TimeoutException {
- //从mq工具类获取连接信息
- Connection connection = MqConnectionUtils.getConnection();
- //获取一个通道
- Channel channel = connection.createChannel();
- //监听该队列,false代表手动确认
- channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
- System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
- }
- });
- }
- }
手动确认模式下,当我们消费者成功接收到消息后,在队列中消息会进入Unacked项,也就是待确认模式
所以我们还需要加上下列代码,来实现消息者在成功接收到消息后,手动确认
#添加红色字段
- /**
- * 消费者
- */
- public class Recv {
- //设定队列名称(已存在的队列)
- private static final String QUEUE_NAME = "queue1";
- public static void main(String[] args) throws IOException, TimeoutException {
- //从mq工具类获取连接信息
- Connection connection = MqConnectionUtils.getConnection();
- //获取一个通道
- Channel channel = connection.createChannel();
- //监听该队列
- channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
- System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
-
- //获取消息的编号,我们需要根据消息的编号来确认消息
- long tag = envelope.getDeliveryTag();
- //获取当前内部类中的通道
- Channel c = this.getChannel();
- //手动确认消息,确认以后,则表示消息已经成功处理,消息就会从队列中移除,false代表只确认当前一个消息,true确认所有consumer获得的消息
- c.basicAck(tag,false);
- }
- });
- }
- }
此时,我们的消息才会成功被确认,并移除队列
基于rabbitmq实现延迟队列
一、rabbitmq延迟队列概念
延时队列 , 队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
二、延迟队列使用场景
1.顾客在食堂吃饭的订单,在十分钟之后支付。
2. 订单在十分钟之内未支付则自动取消
3. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
————————————————
原文链接:https://blog.csdn.net/baozaoderenlei/article/details/125976008
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。