赞
踩
<!-- RabbitMQ起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.70.130 # 虚拟机的地址
port: 5672
username: admin
password: admin
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Configuration
public class RabbitmqConfig1 {
private final String EXCHANGE_NAME = "boot_exchange";
private final String QUEUE_NAME = "boot_queue";
private final String ROUTE_NAME = "boot_route";
//创建交换机
@Bean(EXCHANGE_NAME)
public Exchange getExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//创建队列
@Bean(QUEUE_NAME)
public Queue getQueue(){
return new Queue(QUEUE_NAME);
}
//交换机和队列绑定
@Bean
public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTE_NAME).noargs();
}
}
//编写发送消息测试类
@SpringBootTest
public class RabbitmqTest {
// 注入RabbitTemplate工具类
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage(){
/**
* 发送消息
* 参数1:交换机
* 参数2:路由key
* 参数3:要发送的消息
*/
rabbitTemplate.convertAndSend("boot_exchange","boot_route","你好我有一个毛衫");
System.out.println("发送消息成功");
}
}
<!-- RabbitMQ起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.70.130 # 虚拟机的地址
port: 5672
username: admin
password: admin
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Component
public class Consumer1 {
/**
* 监听队列
* @param message
* queues表示监听的队列的名称
*/
@RabbitListener(queues = "boot_queue")
public void listener(String message){
System.out.println("接受到消息 = " + message);
}
}
RabbitMQ消息投递的路径为:
生产者--->交换机--->队列--->消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?
交换机
。从交换机成功传递到队列
。消费者
是否成功处理消息。交换机
。只是添加了一句代码
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: / # 表示使用默认的virtual-host
#开启确认模式
publisher-confirm-type: correlated
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Configuration
public class RabbitmqConfig2Confirm {
public final String EXCHANGE_NAME = "confirm_exchange";
public final String QUEUE_NAME = "confirm_queue";
public final String ROUTING_NAME = "confirm_routing";
// 创建交换机
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
// 创建队列
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder
.durable(QUEUE_NAME)
.build();
}
// 创建交换机和队列绑定
@Bean
public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder
.bind(queue).
to(exchange)
.with(ROUTING_NAME)
.noargs();
}
}
@Test
void testConfirm() {
//回调确认
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置信息
* @param b 是否成功,true 是 ,false 否
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("发送成功");
}else{
System.out.println("发送失败,原因:"+s);
}
}
});
//发送消息
/**
* 发送消息
* 参数1:交换机
* 参数2:路由key
* 参数3:要发送的消息
*/
rabbitTemplate.convertAndSend("confirm_exchange","confirm_routing","send message...confirm");
}
由于rabbitmq的confirm确认模式是确认消息是否从生产者成功传递到交换机的,所以就没必要写消费者进行信息的消费了
从交换机成功传递到队列
。只是添加了一句
# rabbitmq???
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开启确认模式
publisher-confirm-type: correlated
#开始回退模式
publisher-returns: true
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Configuration
public class RabbitmqConfig3Return {
public final String EXCHANGE_NAME = "return_exchange";
public final String QUEUE_NAME = "return_queue";
public final String ROUTING_NAME = "return_routing";
// 创建交换机
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 创建队列
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
// 创建交换机和队列绑定
@Bean
public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(ROUTING_NAME)
.noargs();
}
}
@Test
void testReturnSendMessage(){
// 调用回退模式的回调方法,只有失败才会回调,成功不会回调哦
// 失败后将失败信息封装到参数中
rabbitTemplate.setReturnsCallback(returned ->{
Message message = returned.getMessage();
System.out.println("消息对象:"+message);
System.out.println("错误码:"+returned.getReplyCode());
System.out.println("错误信息:"+returned.getReplyText());
System.out.println("交换机:"+returned.getExchange());
System.out.println("路由键:"+returned.getRoutingKey());
});
// 发送消息
/**
* 发送消息
* 参数1:交换机
* 参数2:路由key
* 参数3:要发送的消息
*/
rabbitTemplate.convertAndSend("return_exchange","return_routing","send message...return");
}
由于rabbitmq的return回退模式是确认消息是否从交换机成功传递到队列的,还没有传递到消费者,所以就没必要写消费者进行信息的消费了
成功发送到队列
上则不会
调用 rabbitTemplate.setReturnsCallback方法,如果发送步成功则调用回调方法rabbitTemplate.setReturnsCallback,
向队列发送确认签收的消息
,只有确认签收的消息才会被移除队列
。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。
自动确认
和手动确认
。
自动签收,并将消息从队列中移除
。出现异常
,那么消息就会丢失
。此时需要设置手动签收,即在业务处理成功后再通知签收消息,如果出现异常,则拒签消息
,让消息依然保留
在队列当中。● 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
● 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”
不用修改
# rabbitmq???
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开启确认模式
publisher-confirm-type: correlated
#开始回退模式
publisher-returns: true
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Configuration
public class RabbitmqConfig4ACK {
public final String EXCHANGE_NAME = "ack_exchange";
public final String QUEUE_NAME = "ack_queue";
public final String ROUTING_NAME = "ack_routing";
// 创建交换机
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 创建队列
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
// 创建交换机和队列绑定
@Bean
public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(ROUTING_NAME)
.noargs();
}
}
@Test
void testAck(){
// 发送消息
rabbitTemplate.convertAndSend("ack_exchange","ack_routing","send message...ack");
}
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开动手动签收
listener:
simple:
acknowledge-mode: none # 默认就是自动确认
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
自动签收,并将消息从队列中移除
。当我们拿到消息的时候,业务出现异常了,所以无法正确处理消息,导致消息丢失了。@Component
public class AckConsumer {
// 自动签收
@RabbitListener(queues = "ack_queue")
public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(s);
// TODO,处理事务
// 故意出错
int i= 1/0;
}
}
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开动手动签收
listener:
simple:
acknowledge-mode: manual
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Component
public class AckConsumer {
// 手动签收
@RabbitListener(queues = "ack_queue")
public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 消息投递序号,消息每次投递该值都会+1
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// int i = 1/0; //模拟处理消息出现bug
System.out.println("成功接受到消息:"+message);
// 签收消息
/**
* 参数1:消息投递序号
* 参数2:是否一次可以签收多条消息
*/
channel.basicAck(deliveryTag,true);
}catch (Exception e){
System.out.println("消息消费失败!");
Thread.sleep(2000);
// 拒签消息
/**
* 参数1:消息投递序号
* 参数2:是否一次可以拒签多条消息
* 参数3:拒签后消息是否重回队列
*/
channel.basicNack(deliveryTag,true,true);
}
}
}
拒签消息
,让消息依然保留
在队列当中。方便下次请求能够请求到这次因为异常而没有接收到的消息。@Test
public void testLimitSendBatch() {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第"+i+"条消息");
}
}
最主要就是配置文件的修改:
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开动手动签收
listener:
simple:
acknowledge-mode: manual #none是默认的
prefetch: 5 # 每次消费者从队列拉取的消息数量(限制)
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Component
public class ConsumerLimit {
// 手动签收
@RabbitListener(queues = "limit_queue")
public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(s);
// 模拟业务处理
Thread.sleep(3000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收
channel.basicAck(deliveryTag,true);
}
}
最主要就是配置文件的修改:
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开动手动签收
listener:
simple:
acknowledge-mode: manual #none是默认的
prefetch: 1 # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Component
public class ConsumerUnfair {
// 消费者1
@RabbitListener(queues = "ack_queue")
public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消费者1"+s);
Thread.sleep(3000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收
channel.basicAck(deliveryTag,true);
}
// 消费者2
@RabbitListener(queues = "ack_queue")
public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消费者2"+s);
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 手动签收
channel.basicAck(deliveryTag,true);
}
// .......监听方法
}
配置消费端最多拉取1条消息消费
,这样谁处理的快谁拉取下一条消息,实现了不公平分发。消息到达存活时间后还没有被消费
,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。@Configuration
public class RabbitmqConfig7ttl {
public final String EXCHANGE_NAME = "ack_exchange";
public final String QUEUE_NAME = "ack_queue";
public final String ROUTING_NAME = "ack_routing";
// 创建交换机
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 创建队列
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder
.durable(QUEUE_NAME)
// 设置队列的超时的时间,单位是毫秒
.ttl(10000)
.build();
}
// 创建交换机和队列绑定
@Bean
public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(ROUTING_NAME)
.noargs();
}
}
设置单条消息存活时间
@Test
public void testTtlSendBatch() {
// 发送十条消息
for (int i = 0; i < 100; i++) {
if (i%5 == 0) {
//设置消息属性
MessageProperties messageProperties = new MessageProperties();
//设置存活时间
messageProperties.setExpiration("10000");
// 创建消息对象(可以配置消息的一些配置)
Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);
// 发送消息
rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", message);
}else {
rabbitTemplate.convertAndSend("ack_exchange", "ack_routing", "这是第" + i + "条消息");
}
}
}
设置了单条消息的存活时间,也设置了队列的存活时间
,以时间短
的为准。大型商家和小型商家
。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。@Configuration
public class RabbitmqConfig8Priority {
public final String EXCHANGE_NAME = "priority_exchange";
public final String QUEUE_NAME = "priority_queue";
public final String ROUTING_NAME = "priority_routing";
// 创建交换机
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 创建队列
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder
.durable(QUEUE_NAME)
// 设置队列的优先级,值越大优先级越高,一般不超过10
.maxPriority(10)
.build();
}
// 创建交换机和队列绑定
@Bean
public Binding exchangeBindQueue(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(ROUTING_NAME)
.noargs();
}
}
@Test
public void testPrioritySendBatch() {
// 发送十条消息
for (int i = 0; i < 100; i++) {
if (i%5 == 0) {
//设置消息属性
MessageProperties messageProperties = new MessageProperties();
// 设置优先级
messageProperties.setPriority(9);
// 创建消息对象(可以配置消息的一些配置)
Message message = new Message(("这是第" + i + "条消息").getBytes(StandardCharsets.UTF_8), messageProperties);
// 发送消息
rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", message);
}else {
rabbitTemplate.convertAndSend("priority_exchange", "priority_routing", "这是第" + i + "条消息");
}
}
}
从当前队列发送到另一个队列中
,当前队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。# rabbitmq???
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开启确认模式
publisher-confirm-type: correlated
#开始回退模式
publisher-returns: true
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Configuration
public class RabbitmqConfig9Dead {
// 死信
private final String DEAD_EXCHANGE = "dead_exchange";
private final String DEAD_QUEUE = "dead_queue";
private final String DEAD_ROUTING = "dead_routing";
// 死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)
.durable(true)
.build();
}
// 死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
// 死信交换机绑定死信队列
@Bean
public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(DEAD_ROUTING)
.noargs();
}
// 普通
private final String NORMAL_EXCHANGE = "normal_exchange";
private final String NORMAL_QUEUE = "normal_queue";
private final String NORMAL_ROUTING = "normal_routing";
// 普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)
.durable(true)
.build();
}
// 普通队列
@Bean(NORMAL_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字
.ttl(10000) // 消息存活10s
.maxLength(10) // 队列最大长度为10
.build();
}
// 普通交换机绑定普通队列
@Bean
public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(NORMAL_ROUTING)
.noargs();
}
}
@Test
public void testDlx(){
// 存活时间过期后变成死信
// rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
// 超过队列长度后变成死信
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
// }
// 消息拒签但不返回原队列后变成死信
rabbitTemplate.convertAndSend("normal_exchange","normal_routing","测试死信");
}
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开动手动签收
listener:
simple:
acknowledge-mode: manual
#????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Component
public class ConsumerDead {
@RabbitListener(queues = "normal_queue")
public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消费者1"+s);
Thread.sleep(500);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 拒绝签收
channel.basicNack(deliveryTag,true,false);
}
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开动手动签收
listener:
simple:
acknowledge-mode: manual
# ????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
@Configuration
public class RabbitMQConfig {
private final String DEAD_EXCHANGE = "order_expire_exchange";
private final String DEAD_QUEUE = "order_expire_queue";
private final String DEAD_ROUTING = "order_expire_routing";
private final String ORDER_EXCHANGE = "order_exchange";
private final String ORDER_QUEUE = "order_queue";
private final String ORDER_ROUTING = "order_routing";
// 死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)
.durable(true)
.build();
}
// 死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
// 死信交换机绑定死信队列
@Bean
public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange, @Qualifier(DEAD_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(DEAD_ROUTING)
.noargs();
}
// 普通交换机
@Bean(ORDER_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(ORDER_EXCHANGE)
.durable(true)
.build();
}
// 普通队列
@Bean(ORDER_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(ORDER_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字
.ttl(10000) // 消息存活10s(模拟30min超时)
.build();
}
// 普通交换机绑定普通队列
@Bean
public Binding bindNormalQueue(@Qualifier(ORDER_EXCHANGE) Exchange exchange,@Qualifier(ORDER_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with(ORDER_ROUTING)
.noargs();
}
}
@RestController
public class OrderController {
//注入MQ
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/addOrder")
public String addOrder(){
//生成订单号
String orderNumber = "2030061812251234";
//在service层完成订单逻辑
//将订单号发送到订单mq,30分钟过期进入死信队列,死信队列消费查询订单支付状态,做对应处理
rabbitTemplate.convertAndSend("order_exchange","order_routing",orderNumber);
return "下单成功! 您的订单号为 :"+orderNumber;
}
}
@Component
public class ListenerOrder {
//监听订单过期队列
@RabbitListener(queues = "order_expire_queue")
public void orderListener(String orderId){
System.out.println("orderId = " + orderId);
//根据订单id查询订单状态是否支付
/**
* 监听死信队列的类,回去30min超时订单号,根据订单号查询订单的支付状态
* 支付:走下一步流程
* 未支付:关闭订单,库存回滚
*/
}
}
spring:
rabbitmq:
host: 192.168.70.130
port: 5672
username: admin
password: admin
virtual-host: /
#开动自动签收
listener:
simple:
acknowledge-mode: none # 默认的
# ????
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。