当前位置:   article > 正文

RabbitMQ入门(三)消息应答与发布确认_rabbitmq消息应答

rabbitmq消息应答

前言:

消息应答与发布确认都是保证消息不丢失。而重复消费问题则是消息幂等性。(之后会说幂等性)

消息应答:

应答功能属于消费者,消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

消息应答有自动应答、手动应答。mq默认为自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。

  1. //在消费时候,设置 false;
  2. channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

 在消费者成功消费的回调方法 DeliverCallback 中应答:

  1. //声明接收消息
  2. DeliverCallback deliverCallback = (consumerTag, message) -> {
  3. System.out.println(new String(message.getBody()));
  4. /**
  5. * 手动确认应答
  6. * 1.消息的标记Tag
  7. * 2.是否批量应答 false表示不批量应答信道中的消息
  8. */
  9. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  10. };

除了手动应答外,还有:

  1. /**
  2. * 否定确认应答
  3. * 1.拒绝 deliveryTag 对应的消息
  4. * 2.是否 requeue:true 则重新入队列,false 则丢弃或者进入死信队列。
  5. * 该方法 reject 后,该消费者还是会消费到该条被 reject 的消息。
  6. */
  7. // channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
  8. /**
  9. * 用于否定确认,表示己拒绝处理该消息,可以将其丢弃了
  10. * 1.拒绝 deliveryTag 对应的消息
  11. * 2.是否 应用于多消息
  12. * Multiple 的解释:手动应答的好处是可以批量应答并且减少网络拥堵
  13. * true 代表批量应答 channel 上未应答的消息
  14. * 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 ,
  15. * 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
  16. * false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
  17. * 3.是否 requeue,与 basicReject 区别就是同时支持多个消息,
  18. * 可以 拒绝签收 该消费者先前接收未 ack 的所有消息。拒绝签收后的消息也会被自己消费到。
  19. */
  20. // channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);
  21. /**
  22. * 是否恢复消息到队列
  23. * 1.是否 requeue,true 则重新入队列,并且尽可能的将之前 recover 的消息投递给其他消费者消费,
  24. * 而不是自己再次消费。false 则消息会重新被投递给自己。
  25. * 消息自动重新入队:
  26. * 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),
  27. * 导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。
  28. * 如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。
  29. * 这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
  30. */
  31. // channel.basicRecover(false);

特别注意:消息在手动应答是不丢失的,它会放回队列中重新消费

发布确认:

发布功能属于生产者,生产消息到 RabbitMQ,RabbitMQ 需要告诉生产者已经收到消息。

发布确认逻辑

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。

 发布确认有:单个确认批量确认异步确认

发布确认命令:channel.waitForConfirms();

单个确认:发布一个消息之后只有它被确认发布,后续的消息才能继续发布

  1. package com.example.mqtest.mqtest;
  2. import com.rabbitmq.client.Channel;
  3. import java.io.IOException;
  4. import java.util.UUID;
  5. import java.util.concurrent.TimeoutException;
  6. public class ConfirmMessage {
  7. //单个发消息的个数
  8. public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写
  9. public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
  10. publishMessageIndividually();//发布1000个单独确认消息,耗时:599ms
  11. }
  12. //单个确认
  13. public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
  14. Channel channel = RabbitMQUtils.getChannel();
  15. //队列的声明
  16. String queueName = UUID.randomUUID().toString();
  17. channel.queueDeclare(queueName,false,true,false,null);
  18. //开启发布确认
  19. channel.confirmSelect();
  20. //开始时间
  21. long begin = System.currentTimeMillis();
  22. //批量发消息
  23. for (int i = 0; i < 1000; i++) {
  24. String message = i+"";
  25. channel.basicPublish("",queueName,null,message.getBytes());
  26. //单个消息就马上进行发布确认
  27. boolean flag = channel.waitForConfirms();
  28. if(flag){
  29. System.out.println("消息发送成功");
  30. }
  31. }
  32. //结束时间
  33. long end = System.currentTimeMillis();
  34. System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");
  35. }
  36. }

批量确认:先发布一批消息然后一起确认可以极大地提高吞吐量,

缺点:当发生故障导致发布出现问题时,不知道是哪个消息出问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

  1. public class ConfirmMessage2 {
  2. //批量发消息的个数
  3. public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写
  4. public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
  5. publishMessageBatch(); //发布1000个批量确认消息,耗时:111ms
  6. }
  7. //批量发布确认
  8. public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {
  9. Channel channel = RabbitMQUtils.getChannel();
  10. //队列的声明
  11. String queueName = UUID.randomUUID().toString();
  12. channel.queueDeclare(queueName, false, true, false, null);
  13. //开启发布确认
  14. channel.confirmSelect();
  15. //开始时间
  16. long begin = System.currentTimeMillis();
  17. //批量确认消息大小
  18. int batchSize =100;
  19. //批量发送消息,批量发布确认
  20. for (int i = 0; i < MESSAGE_COUNT; i++) {
  21. String message=i+"";
  22. channel.basicPublish("",queueName,null,message.getBytes());
  23. //判断达到100条消息的时候,批量确认一次
  24. if((i+1)%batchSize==0){
  25. //发布确认
  26. channel.waitForConfirms();
  27. }
  28. }
  29. //结束时间
  30. long end = System.currentTimeMillis();
  31. System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");
  32. }
  33. }

异步发布:利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

  1. public class ConfirmMessage3 {
  2. public static final int MESSAGE_COUNT = 1000; //Ctrl+Shift+U 变大写
  3. public static void main(String[] args) throws Exception {
  4. publishMessageAsync(); //发布1000个异步发布确认消息,耗时:43ms
  5. }
  6. //异步发布确认
  7. public static void publishMessageAsync() throws Exception{
  8. Channel channel = RabbitMQUtils.getChannel();
  9. //队列的声明
  10. String queueName = UUID.randomUUID().toString();
  11. channel.queueDeclare(queueName, false, true, false, null);
  12. //开启发布确认
  13. channel.confirmSelect();
  14. /**
  15. * 线程安全有序的一个哈希表,适用于高并发的情况下
  16. * 1.轻松的将序号与消息进行关联
  17. * 2.轻松批量删除条目 只要给到序号
  18. * 3.支持高并发(多线程)
  19. */
  20. ConcurrentSkipListMap<Long,String> outstandingConfirms=
  21. new ConcurrentSkipListMap<>();
  22. //消息确认回调的函数
  23. ConfirmCallback ackCallback = (deliveryTag,multiple) ->{
  24. if(multiple) {
  25. //2.删除掉已经确认的消息 剩下的就是未确认的消息
  26. ConcurrentNavigableMap<Long, String> confirmed =
  27. outstandingConfirms.headMap(deliveryTag);
  28. confirmed.clear();
  29. }else {
  30. outstandingConfirms.remove(deliveryTag);
  31. }
  32. System.out.println("确认的消息:" + deliveryTag);
  33. };
  34. /**
  35. * 1.消息的标记
  36. * 2.是否为批量确认
  37. */
  38. //消息确认失败回调函数
  39. ConfirmCallback nackCallback= (deliveryTag,multiple) ->{
  40. //3.打印一下未确认的消息都有哪些
  41. String message = outstandingConfirms.remove(deliveryTag);
  42. System.out.println("未确认的消息是:"+message+":::未确认的消息tag:"+deliveryTag);
  43. };
  44. //准备消息的监听器 监听那些消息成功了,哪些消息失败了
  45. /**
  46. * 1.监听哪些消息成功了
  47. * 2.监听哪些消息失败了
  48. */
  49. channel.addConfirmListener(ackCallback,nackCallback);//异步通知
  50. //开始时间
  51. long begin = System.currentTimeMillis();
  52. //批量发送消息
  53. for (int i = 0; i < MESSAGE_COUNT; i++) {
  54. String message=i+"消息";
  55. channel.basicPublish("",queueName,null,message.getBytes());
  56. //1.此处记录下所有要发送的消息 消息的总和
  57. outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
  58. }
  59. //结束时间
  60. long end = System.currentTimeMillis();
  61. System.out.println("发布"+MESSAGE_COUNT+"个异步发布确认消息,耗时:"+(end-begin)+"ms");
  62. }
  63. }

将发布的消息存入 Map 里,方便获取。headMap 方法用于将已确认的消息存入新的 Map 缓存区里,然后清除该新缓存区的内容。因为 headMap 方法是浅拷贝,所以清除了缓存区,相当于清除了内容的地址,也就清除了队列的确认的消息。

处理异步未确认消息最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

以上 3 种发布确认速度对比:

  • 单独发布消息

    同步等待确认,简单,但吞吐量非常有限。

  • 批量发布消息

    批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。

  • 异步处理

    最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/393770
推荐阅读
相关标签
  

闽ICP备14008679号