赞
踩
RabbitMQ 的 AMQP 协议实现了事务机制,允许开发者保证消息的发送和接收时原子性的,也就是说,要么消息全都发送成功,要么全都发送失败(只与发送方有关).
AMQP 事务实现类似于传统数据库事务,允许在一个事务中发送多条消息,并在最后提交或回滚.
事务开始:
客户端发送 tx.select 方法,RabbitMQ 开启一个新的事务上下文.事务内的操作:
客户端发送消息 basic.publish,RabbitMQ 将这些消息暂存在内存中,并标记为未提交.事务提交:
客户端发送 tx.commit 方法,RabbitMQ 将所有暂存的消息写入队列,并且如果消息标记为持久化,那么就把消息保存到磁盘.事务回滚
:客户端发送 tx.rollback 方法,RabbitMQ 丢弃所有暂存的消息,不会写入队列.a)配置文件
spring:
application:
name: rabbitmq
rabbitmq:
host: env-base
port: 5672
username: root
password: 1111
b)配置常量
object MQConst {
// 事务
const val TRANS_QUEUE = "trans.queue"
}
c)定义队列
@Configuration
class MQConfig {
@Bean
fun transQueue() = Queue(MQConst.TRANS_QUEUE)
}
d)发送接口
两条消息之间故意触发异常.
@RestController
@RequestMapping("/mq")
class MQApi(
val rabbitTemplate: RabbitTemplate,
) {
@RequestMapping("/trans-close")
fun transClose(): String {
rabbitTemplate.convertAndSend("", MQConst.TRANS_QUEUE, "trans msg 1")
//触发异常
val a = 1 / 0
rabbitTemplate.convertAndSend("", MQConst.TRANS_QUEUE, "trans msg 1")
return "ok"
}
}
e)效果如下:
此时队列中只有一条消息,说明消息不具备事务特性.
a)配置两个 Bean
Template bean
.RabbitMQ事务管理器
.@Configuration
class GlobalConfig {
@Bean("transRabbitTemplate")
fun transRabbitTemplate(
connectionFactory: ConnectionFactory
): RabbitTemplate {
val mq = RabbitTemplate(connectionFactory)
mq.isChannelTransacted = true // 开启事务机制
return mq
}
@Bean
fun rabbitTransactionManager(
connectionFactory: ConnectionFactory
): RabbitTransactionManager {
return RabbitTransactionManager(connectionFactory)
}
}
b)如下步骤:
mq bean
.@Transaction
注解.@RestController
@RequestMapping("/mq")
class MQApi(
val transRabbitTemplate: RabbitTemplate,
) {
@Transactional
@RequestMapping("/trans-open")
fun transOpen(): String {
transRabbitTemplate.convertAndSend("", MQConst.TRANS_QUEUE, "trans msg 1")
//触发异常
val a = 1 / 0
transRabbitTemplate.convertAndSend("", MQConst.TRANS_QUEUE, "trans msg 1")
return "ok"
}
}
c)效果演示
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。