赞
踩
一、ACK机制简介
ACK (Acknowledgement),即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。
JMS API中约定了Client端可以使用四种ACK_MODE,在javax.jms.Session接口中:
1 AUTO_ACKNOWLEDGE = 1 自动确认
2 CLIENT_ACKNOWLEDGE = 2 客户端手动确认
3 DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
4 SESSION_TRANSACTED = 0 事务提交并确认
5 INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认(AcitveMQ补充了一个自定义的ACK_MODE)
Client端指定了ACK_MODE,但是在Client与broker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,不同的ACK_TYPE将传递着消息的状态,broker可以根据不同的ACK_TYPE对消息进行不同的操作。比如Consumer消费消息时出现异常,就需要向broker发送ACK指令,ACK_TYPE为"REDELIVERED_ACK_TYPE",那么broker就会重新发送此消息。在JMS API中并没有定义ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):
1 DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
2 STANDARD_ACK_TYPE = 2 “标准”类型,通常表示为消息"处理成功",broker端可以删除消息了
3 POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
4 REDELIVERED_ACK_TYPE = 3 消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
5 INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何ACK_MODE下
6 UNMATCHED_ACK_TYPE = 5 BROKER间转发消息时,接收端"拒绝"消息
到目前为止,我们已经清楚了大概的原理: Client端在不同的ACK_MODE时,将意味着在不同的时机发送ACK指令,每个ACK Command中会包含ACK_TYPE,那么broker端就可以根据ACK_TYPE来决定此消息的后续操作.。
二、ACK机制实现
1、在配置文件中配置应答方式。当在消息监听容器中配置如下属性:
- <!-- 消息确认机制 -->
- <property name="sessionAcknowledgeMode" value="4"></property>
当消费者从队列中处理消息,如果出现异常,该消息还会保存在带队列中,直到消息正常处理完,才会被标记为已出队。
2、具体配置如下:
- <!-- 消息监听容器 消息接收监听器用于异步接收消息 -->
- <bean id="jmsContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="destination" ref="destinationOne" />
- <property name="messageListener" ref="receiverOne" />
- <!-- 消息确认机制 -->
- <property name="sessionAcknowledgeMode" value="4"></property>
- </bean>
使用bean的方式配置的消息监听容器,会与之前的Mq入门一、二有所不同,之前的实现方式为:
- <!-- 定义Queue监听器 -->
- <jms:listener-container destination-type="queue" container-type="default" receive-timeout="5000" connection-factory="connectionFactory" acknowledge="auto">
- ......
- </jms:listener-container>
使用jms:listener-container配置的消息监听容器,该属性acknowledge 只支持 以下四种
- auto AUTO_ACKNOWLEDGE = 1 自动确认
- client CLIENT_ACKNOWLEDGE = 2 客户端手动确认
- dups-ok DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
- transacted SESSION_TRANSACTED = 0 事务提交并确认
例子与上一遍文章的一样 MQ 入门(三)—— 消息重发机制
但消费者会有所不一样:具体代码如下:
- @Component
- public class ReceiverOne implements SessionAwareMessageListener {
- //测试方法
- public void onMessage(Message message, Session session) throws JMSException {
- System.out.println("--------------------准备开始接受消息------------------------");
- TextMessage textMsg = (TextMessage) message;
- try {
- if ("确认机制".equals(textMsg.getText())) {
- System.out.println("----------------");
- throw new RuntimeException("故意抛出的异常");
- }
- System.out.println(textMsg.getText());
- System.out.println("--------------------接受消息结束------------------------");
- // 注意 message.acknowledge() 当该代码注释时,确认机制也能起作用
- message.acknowledge();
- } catch (Exception e) {
- //此不可省略 重发信息使用,当该代码注释掉之后,该队列消息不能移出队列
- //session.recover(); //在使用确认机制时,此代码需要注释。如果不注释,消息会被重发,最终移入死信队列
- System.out.println("异常");
- }
- }
- }

在实现确认机制时、消费者可以实现MessageListener接口。如下:
- @Component
- public class ReceiverOne implements MessageListener {
- //测试方法 --- 该方法为实现SessionAwareMessageListener 监听器接口
- public void onMessage(Message message, Session session) throws JMSException {
- System.out.println("--------------------准备开始接受消息------------------------");
- TextMessage textMsg = (TextMessage) message;
- try {
- if ("确认机制".equals(textMsg.getText())) {
- System.out.println("----------------");
- throw new RuntimeException("故意抛出的异常");
- }
- System.out.println(textMsg.getText());
- System.out.println("--------------------接受消息结束------------------------");
- // 注意 message.acknowledge() 当该代码注释时,确认机制也能起作用
- message.acknowledge();
- } catch (Exception e) {
- //此不可省略 重发信息使用,当该代码注释掉之后,该队列消息不能移出队列
- //session.recover(); //在使用确认机制时,此代码需要注释。如果不注释,消息会被重发,最终移入死信队列
- System.out.println("异常");
- }
- }
-
- @Override
- public void onMessage(Message message) {
- System.out.println("--------------------准备开始接受消息------------------------");
- TextMessage textMsg = (TextMessage) message;
- try {
- if ("确认机制".equals(textMsg.getText())) {
- System.out.println("----------------");
- throw new RuntimeException("故意抛出的异常");
- }
- System.out.println(textMsg.getText());
- System.out.println("--------------------接受消息结束------------------------");
- // 注意 message.acknowledge() 当该代码注释时,确认机制也能起作用
- //message.acknowledge();
- } catch (Exception e) {
- //此不可省略 重发信息使用,当该代码注释掉之后,该队列消息不能移出队列
- //session.recover(); //在使用确认机制时,此代码需要注释。如果不注释,消息会被重发,最终移入死信队列
- System.out.println("异常");
- }
- }
- }

在测试前,队列的入队,出队数量相等。
先测试正常情况:
此时入队出队情况如下:
测试异常情况,当发送消息为确认机制时,异常抛出:设想:出现异常后,消息不能被确认,此时入队与出队数量不等
入队与出队数量不同。(测试了两次,所以入队比出队数量多2)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。