赞
踩
上图中可以看出,从生产者发送消息
到消费者接收到消息并正确处理
,这些里路线都可能会出现问题,那么为了保证这些消息最后能被正确处理,RabbitMQ 就提供了消息确认机制.
为了保证消息从 队列 到 消费者正确消费,那么就引入了消费者消息确认机制.
a)消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种(以下讲到的方法和参数来自于 RabbitMQ 原生的 SDK,非 Spring 提供).
Ps:可靠性高了,性能也就下降了,所以请综合考虑.
b)对于 MQ队列 中的消息,在 MQ管理平台上可以看到以下两种类别:
Ready:队列已经准备好消息,随时准备发送给消费者 的消息数量(只要消费者来要,就立刻发送).
Unacked:消息已经发送给消费者,但是消费者没有返回消息确认 的消息数量(消息确认包括 ack肯定确认
和 nack否定确认
)
消费者在收到消息之后,可以选择确认,也可以选择拒绝或者跳过,RabbitMQ因此提供了不同的确认应答方式,消费者客户端可通过调用 channel 的相关方法实现.
a)肯定确认:消费者已经接收到消息,并且成功处理消息,可以将其丢弃了.
Channel.basicAck(long deliveryTag, boolean multiple)
Ps:deliveryTag 确保了消息传递的可靠性和顺序性.
b)否定确认(单个):用来拒绝这个消息. 被拒绝的消息如何处理,具体要看 requeue 参数.
Channel.basicReject(long deliveryTag, boolean requeue)
c)否定确认(批量):Channel.basicReject 只能拒绝一条消息,如果要批量拒绝消息,就可以使用 Channel.basicNack.
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
multiple:参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息.
d)MQ 的管理平台上也提供了几种确认方式.
Spring-AMQP 对消息确认提供了三种策略:
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
这里根 RabbitMQ 原生 SDK 是有些不同的.
不管消费者是否成功处理了消息,RabbitMQ 都会自动确认消息,然后从 队列 中移除消息.
a)配置手动确认
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
listener:
simple:
acknowledge-mode: none
b)生产者接口
@RestController
@RequestMapping("/mq")
class MQApi(
val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("/ack")
fun ack(): String {
rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
return "ok"
}
}
c)消费者
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
//业务处理...
println("业务逻辑处理完成")
}
}
d)效果演示
触发接口之后,回到 MQ 管理平台,可以看到队列中消息已经被删除.
分为以下情况:
没有抛出异常
,则自动确认消息,然后从 队列 中移除消息.抛出异常
,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).a)配置文件
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
listener:
simple:
acknowledge-mode: auto
b)生产者接口
@RestController
@RequestMapping("/mq")
class MQApi(
val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("/ack")
fun ack(): String {
rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
return "ok"
}
}
c)消费者(正常处理消息)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
//业务处理...
println("业务逻辑处理完成")
}
}
效果如下:
d)消费者(异常处理消息)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
//业务处理...
val a = 1 / 0
println("业务逻辑处理完成")
}
}
效果如下:
消息未被确认,会不断重返队列,进行重试,因此 IDEA 中会循环报错输出.
分为以下情况:
a)配置文件
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
listener:
simple:
acknowledge-mode: manual
b)生产者接口
@RestController
@RequestMapping("/mq")
class MQApi(
val rabbitTemplate: RabbitTemplate
) {
@RequestMapping("/ack")
fun ack(): String {
rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
return "ok"
}
}
c)消费者(异常处理消息,requeue = true)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
val deliveryTag = message.messageProperties.deliveryTag
try {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
//业务处理...
val a = 1 / 0
println("业务逻辑处理完成")
channel.basicAck(deliveryTag, false)
} catch (e: Exception) {
channel.basicNack(deliveryTag, false, true) //requeue: true
}
}
}
由于消息处理异常,发送 nack,并且 requeue = true,因此消息会重返队列,不断重试.
d)消费者(异常处理消息,requeue = false)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
val deliveryTag = message.messageProperties.deliveryTag
try {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
//业务处理...
val a = 1 / 0
println("业务逻辑处理完成")
channel.basicAck(deliveryTag, false)
} catch (e: Exception) {
channel.basicNack(deliveryTag, false, false) //requeue: false
}
}
}
由于消息处理异常,发送 nack,并且 requeue = false,因此消息不会重返队列,消息被丢弃.
d)消费者(正常处理消息)
import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset
@Component
class AckListener {
@RabbitListener(queues = [MQConst.ACK_QUEUE])
fun handMessage(
message: Message,
channel: Channel,
) {
val deliveryTag = message.messageProperties.deliveryTag
try {
println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
//业务处理...
println("业务逻辑处理完成")
channel.basicAck(deliveryTag, false)
} catch (e: Exception) {
channel.basicNack(deliveryTag, false, false) //requeue: false
}
}
}
消息被正常处理,返回 ack.
如果这里捕获的不是 Exception 异常,那么消费者处理消息的时候,可能会引发一些不会被捕获的异常,就会导致没有返回 nack.
也就意味着,没有进行确认应答,那么 mq管理平台 上就会显示 Unacked 数值 +1.
Ps:具体还是需要根据业务场景而定
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。