当前位置:   article > 正文

MQ 入门(四)—— 消息确认机制Ack_mq ack

mq ack

一、ACK机制简介
ACK (Acknowledgement),即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。

JMS API中约定了Client端可以使用四种ACK_MODE,在javax.jms.Session接口中:

 

1 AUTO_ACKNOWLEDGE = 1               自动确认
2 CLIENT_ACKNOWLEDGE = 2             客户端手动确认   
3 DUPS_OK_ACKNOWLEDGE = 3            自动批量确认
4 SESSION_TRANSACTED = 0             事务提交并确认
5 INDIVIDUAL_ACKNOWLEDGE = 4         单条消息确认(AcitveMQ补充了一个自定义的ACK_MODE)
 

Client端指定了ACK_MODE,但是在Client与broker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,不同的ACK_TYPE将传递着消息的状态,broker可以根据不同的ACK_TYPE对消息进行不同的操作。比如Consumer消费消息时出现异常,就需要向broker发送ACK指令,ACK_TYPE为"REDELIVERED_ACK_TYPE",那么broker就会重新发送此消息。在JMS API中并没有定义ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):

1 DELIVERED_ACK_TYPE = 0          消息"已接收",但尚未处理结束
2 STANDARD_ACK_TYPE = 2           “标准”类型,通常表示为消息"处理成功",broker端可以删除消息了
3 POSION_ACK_TYPE = 1             消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
4 REDELIVERED_ACK_TYPE = 3        消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
5 INDIVIDUAL_ACK_TYPE = 4         表示只确认"单条消息",无论在任何ACK_MODE下    
6 UNMATCHED_ACK_TYPE = 5          BROKER间转发消息时,接收端"拒绝"消息
 

到目前为止,我们已经清楚了大概的原理: Client端在不同的ACK_MODE时,将意味着在不同的时机发送ACK指令,每个ACK Command中会包含ACK_TYPE,那么broker端就可以根据ACK_TYPE来决定此消息的后续操作.。

二、ACK机制实现
1、在配置文件中配置应答方式。当在消息监听容器中配置如下属性:

  1. <!-- 消息确认机制 -->
  2.         <property name="sessionAcknowledgeMode" value="4"></property>

当消费者从队列中处理消息,如果出现异常,该消息还会保存在带队列中,直到消息正常处理完,才会被标记为已出队。

2、具体配置如下:

  1.     <!-- 消息监听容器 消息接收监听器用于异步接收消息 -->
  2.     <bean id="jmsContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  3.         <property name="connectionFactory" ref="connectionFactory" />  
  4.         <property name="destination" ref="destinationOne" />  
  5.         <property name="messageListener" ref="receiverOne" />
  6.         <!-- 消息确认机制 -->
  7.         <property name="sessionAcknowledgeMode" value="4"></property>
  8.     </bean>

使用bean的方式配置的消息监听容器,会与之前的Mq入门一、二有所不同,之前的实现方式为:

  1. <!-- 定义Queue监听器 -->
  2.      <jms:listener-container destination-type="queue" container-type="default" receive-timeout="5000" connection-factory="connectionFactory" acknowledge="auto">
  3.         ......
  4.     </jms:listener-container>

使用jms:listener-container配置的消息监听容器,该属性acknowledge 只支持 以下四种
    - auto         AUTO_ACKNOWLEDGE = 1               自动确认
    - client       CLIENT_ACKNOWLEDGE = 2             客户端手动确认 
    - dups-ok      DUPS_OK_ACKNOWLEDGE = 3            自动批量确认
    - transacted   SESSION_TRANSACTED = 0             事务提交并确认

例子与上一遍文章的一样 MQ 入门(三)—— 消息重发机制

但消费者会有所不一样:具体代码如下:

  1. @Component
  2. public class ReceiverOne implements SessionAwareMessageListener {
  3. //测试方法
  4. public void onMessage(Message message, Session session) throws JMSException {
  5. System.out.println("--------------------准备开始接受消息------------------------");
  6. TextMessage textMsg = (TextMessage) message;
  7. try {
  8. if ("确认机制".equals(textMsg.getText())) {
  9. System.out.println("----------------");
  10. throw new RuntimeException("故意抛出的异常");
  11. }
  12. System.out.println(textMsg.getText());
  13. System.out.println("--------------------接受消息结束------------------------");
  14. // 注意 message.acknowledge() 当该代码注释时,确认机制也能起作用
  15. message.acknowledge();
  16. } catch (Exception e) {
  17. //此不可省略 重发信息使用,当该代码注释掉之后,该队列消息不能移出队列
  18. //session.recover(); //在使用确认机制时,此代码需要注释。如果不注释,消息会被重发,最终移入死信队列
  19. System.out.println("异常");
  20. }
  21. }
  22. }

在实现确认机制时、消费者可以实现MessageListener接口。如下:

  1. @Component
  2. public class ReceiverOne implements MessageListener {
  3. //测试方法 --- 该方法为实现SessionAwareMessageListener 监听器接口
  4. public void onMessage(Message message, Session session) throws JMSException {
  5. System.out.println("--------------------准备开始接受消息------------------------");
  6. TextMessage textMsg = (TextMessage) message;
  7. try {
  8. if ("确认机制".equals(textMsg.getText())) {
  9. System.out.println("----------------");
  10. throw new RuntimeException("故意抛出的异常");
  11. }
  12. System.out.println(textMsg.getText());
  13. System.out.println("--------------------接受消息结束------------------------");
  14. // 注意 message.acknowledge() 当该代码注释时,确认机制也能起作用
  15. message.acknowledge();
  16. } catch (Exception e) {
  17. //此不可省略 重发信息使用,当该代码注释掉之后,该队列消息不能移出队列
  18. //session.recover(); //在使用确认机制时,此代码需要注释。如果不注释,消息会被重发,最终移入死信队列
  19. System.out.println("异常");
  20. }
  21. }
  22. @Override
  23. public void onMessage(Message message) {
  24. System.out.println("--------------------准备开始接受消息------------------------");
  25. TextMessage textMsg = (TextMessage) message;
  26. try {
  27. if ("确认机制".equals(textMsg.getText())) {
  28. System.out.println("----------------");
  29. throw new RuntimeException("故意抛出的异常");
  30. }
  31. System.out.println(textMsg.getText());
  32. System.out.println("--------------------接受消息结束------------------------");
  33. // 注意 message.acknowledge() 当该代码注释时,确认机制也能起作用
  34. //message.acknowledge();
  35. } catch (Exception e) {
  36. //此不可省略 重发信息使用,当该代码注释掉之后,该队列消息不能移出队列
  37. //session.recover(); //在使用确认机制时,此代码需要注释。如果不注释,消息会被重发,最终移入死信队列
  38. System.out.println("异常");
  39. }
  40. }
  41. }

在测试前,队列的入队,出队数量相等。

先测试正常情况:

此时入队出队情况如下:

测试异常情况,当发送消息为确认机制时,异常抛出:设想:出现异常后,消息不能被确认,此时入队与出队数量不等

入队与出队数量不同。(测试了两次,所以入队比出队数量多2)

参考文章:https://www.cnblogs.com/sjshare/p/8915877.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/878452?site
推荐阅读
相关标签
  

闽ICP备14008679号