赞
踩
RabbitMQ是一个开源的消息队列中间件,实现了高效、可靠的消息传递机制。它基于AMQP(Advanced Message Queuing Protocol)协议,提供了可靠的消息传递、灵活的路由、消息持久化、集群和高可用性等特性。队列的主要作用是消除高并发访问高峰,加快网站的响应速度。
不同进程传递消息时,两个进程的耦合程度太高,修改一个进程的代码,很有可能会联动另一个进程做出相应的修改,为了隔离这两个进程,可以在它们两个之间再剥离开一层,进程之间传递消息都必须通过它,这样两个进程之间就不会相互影响
有时处理某个业务时,我们无需马上得到它处理的结果,但是在一般的情况下我们的代码是同步进行的,我们必须得到上一个调用的结果才能继续往下执行,我们就可以将那些无需马上得到响应结果的业务放到消息队列中,进行异步处理,这样就不会影响到其他业务的执行。例如:在生成订单后,将它设置一个过期时间,到了时间它会修改订单状态为关闭,但是这个业务是无需我们马上得到结果的,因此就可以放到消息队列中异步处理
有时两个进程之间传递消息时,某个进程承受消息太多,一下子无法处理完,容易造成崩溃,这时就可以在两个进程之间加上消息中间件,让它们排队有序的执行,例如在双十一期间,某件商品在九点要开始抢购,在九点就必定会有大量的请求,在这种情况下,为了防止服务器崩溃,就可以采用消息队列的流量削峰,将消息放入MQ中保存起来,根据服务器的消费能力一点点的来进行处理
使用springboot整合RabbitMQ
rabbitmq:
host: 192.168.72.166
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated #开启交换机确认机制
publisher-returns: true #开启队列确认机制
listener:
simple:
acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
<!--rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--rabbitmq 协议-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
负责产生消息并发送到RabbitMQ的消息队列中。
消费者(Consumer):从RabbitMQ的消息队列中获取消息并进行处理。
存储消息的容器,生产者将消息发送到队列,消费者从队列中获取消息进行处理。
接收生产者发送的消息,并根据一定的规则将消息路由到一个或多个队列中。
用于将交换器和队列进行绑定,Binding 中可以包含 路由键 (routing key)指定消息的路由规则。
RabbitMQ提供了5种消息模型,但是第6种其实是RPC,并不是MQ,因此不做介绍
这五种分别是简单模式、工作队列模式、广播模式、路由模式、通配符模式,但是第三、四、五这三种都属于订阅模型,只不过交换机的类型不用、进行路由的方式不同
生产者–>队列–>消费者 没有routingkey和交换机
1.1声明队列
/**
* 简单模式
* @return
*/
@Bean
public Queue SimpleQueue(){
return new Queue("simple.queue");
}
1.2生产者发送消息
@GetMapping("/simple")
public Result simple(){
String message="我是最简单模式";
//简单模式下routingKwy等于队列名称
rabbitTemplate.convertAndSend("simple.queue",new String(message.getBytes(StandardCharsets.UTF_8)));
return Result.ok();
}
1.3消费者接收消息
@RabbitListener(queues = "simple.queue")
@SneakyThrows
public void simple(Message message, Channel channel){
System.out.println("接受的消息:"+new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
1.4结果展示
模式队列工作与入简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
2.1声明队列
/**
* 工作队列模式
*
* @return
*/
@Bean
public Queue workQueue() {
return new Queue("work.queue");
}
2.2生产者发送消息
@GetMapping("/work")
public Result workQueue(){
String message="我是工作队列模式";
//工作队列模式下routingKwy等于队列名称
rabbitTemplate.convertAndSend("work.queue",new String(message.getBytes(StandardCharsets.UTF_8)));
return Result.ok();
}
2.3消费者接收消息
//消费者1
@RabbitListener(queues = "work.queue")
@SneakyThrows
public void work1(Message message, Channel channel) {
System.out.println("work1接受的消息:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
//消费者2
@RabbitListener(queues = "work.queue")
@SneakyThrows
public void work2(Message message, Channel channel) {
System.out.println("work2接受的消息:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
2.4结果展示
注意:工作队列模式中两个消费者属于竞争关系,即如果生产者发送一个消息,它们之间只能有一个接收到消息,不能两个都接收到
在广播模式下,将消息交给所有绑定了指定交换机的队列,每个收听绑定过交换机的队列的消费者都能接收到消息
3.1.1生产者发送消息
/**
* 广播模式 注解模式
*/
@GetMapping("/fanout")
public Result fanoutQueue() {
String message = "我是广播模式";
//因为不需要routingKey,所以设置为空
rabbitTemplate.convertAndSend("fanout.exchange","", new String(message.getBytes(StandardCharsets.UTF_8)));
return Result.ok();
}
3.1.2.消费者消费消息 声明交换机和队列并且绑定
//QueueBinding这种方法可以在注解中创建交换机、队列还有路由并且绑定 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout.queue1"), exchange = @Exchange(value = "fanout.exchange") )) @SneakyThrows public void fanoutWork1(Message message, Channel channel) { System.out.println("work1接受的消息:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout.queue2"), exchange = @Exchange(value = "fanout.exchange") )) @SneakyThrows public void fanoutWork2(Message message, Channel channel) { System.out.println("work2接受的消息:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
3.1.3.结果展示
3.2.1生产者发送消息
/**
* 广播模式 配置类方式
*/
@GetMapping("/fanout1")
public Result fanoutQueue1() {
String message = "我是广播模式(配置类方式)";
//因为不需要routingKey,所以设置为空
rabbitTemplate.convertAndSend("fanout.exchange1", "", new String(message.getBytes(StandardCharsets.UTF_8)));
return Result.ok();
}
3.2.2声明队列和交换机以及绑定关系
/** * 广播模式 */ //交换机 @Bean public Exchange fanoutExchange(){ return new FanoutExchange("fanout.exchange1"); } //队列 3 @Bean public Queue fanoutQueue3(){ return new Queue("fanout.queue3"); } //队列 4 @Bean public Queue fanoutQueue4(){ return new Queue("fanout.queue4"); } //绑定队列3和交换机关系 @Bean public Binding fanoutBinding1(){ return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange()).with("").noargs(); } //绑定队列4和交换机关系 @Bean public Binding fanoutBinding2(){ return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange()).with("").noargs(); }
3.2.3消费者消费消息
@RabbitListener(
queues = "fanout.queue3"
)
@SneakyThrows
public void fanoutWork3(Message message, Channel channel) {
System.out.println("work3接受的消息:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues ="fanout.queue4")
@SneakyThrows
public void fanoutWork4(Message message, Channel channel) {
System.out.println("work4接受的消息:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
3.2.4结果展示
注意:
1.与工作队列模式不同,工作队列模式中,没有交换机,它们绑定的是同一个队列,假如有两个消费者,只有一个能够监听到消息,而在广播模式中,假如有两个消费者,这两个消费者所监听的不同队列都已经和交换机绑定,那么它们两个都能监听到消息
2.交换机只参与消息的转发,不会存储消息,如果没有任何队列和交换机绑定,或者没有符合路由规则的队列,消息将会丢失
路由模式中,队列与交换机进行绑定,必须指定routingKey,消息在向exchange发送消息时,也必须指定rouingKey,exchange不再把消息交给每一个队列,而是交给与routingKey完全符合的队列
4.1.1生产者发送消息
/**
* 路由模式 注解方式
*/
@GetMapping("/direct")
public Result directQueue1() {
String message = "我是路由模式(注解方式),以key1为routingKey";
String message1 = "我是路由模式(注解方式),以key2为routingKey";
rabbitTemplate.convertAndSend("direct.exchange", "key1", new String(message.getBytes(StandardCharsets.UTF_8)));
rabbitTemplate.convertAndSend("direct.exchange", "key2", new String(message1.getBytes(StandardCharsets.UTF_8)));
return Result.ok();
}
4.1.2.消费者消费消息 声明交换机和队列和routingKey并且绑定
/** * 路由模式 */ //QueueBinding这种方法可以在注解中创建交换机、队列还有路由并且绑定 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue1"), exchange = @Exchange(value = "direct.exchange"), key = {"key1"} )) @SneakyThrows public void directWork1(Message message, Channel channel) { System.out.println("work1接受的消息:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue2"), exchange = @Exchange(value = "direct.exchange"), key = {"key2"} )) @SneakyThrows public void directWork2(Message message, Channel channel) { System.out.println("work2接受的消息:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
4.13.结果展示
4.2.1生产者生产消息
/**
* 路由模式 配置类方式
*/
@GetMapping("/direct1")
public Result directQueue2() {
String message = "我是路由模式(配置类方式),以key3为routingKey";
String message1 = "我是路由模式(配置类方式),以key4为routingKey";
rabbitTemplate.convertAndSend("direct.exchange1", "key3", new String(message.getBytes(StandardCharsets.UTF_8)));
rabbitTemplate.convertAndSend("direct.exchange1", "key4", new String(message1.getBytes(StandardCharsets.UTF_8)));
return Result.ok();
}
4.2.2声明队列、交换机、routingKey以及绑定关系
/** *路由模式 */ //交换机 @Bean public Exchange directExchange() { return new DirectExchange("direct.exchange1"); } //队列 3 @Bean public Queue directQueue3() { return new Queue("direct.queue3"); } //队列 4 @Bean public Queue directQueue4() { return new Queue("direct.queue4"); } //绑定队列3、key3和交换机关系 @Bean public Binding fanoutBinding3() { return BindingBuilder.bind(directQueue3()).to(directExchange()).with("key3").noargs(); } //绑定队列4、key4和交换机关系 @Bean public Binding fanoutBinding4() { return BindingBuilder.bind(directQueue4()).to(directExchange()).with("key4").noargs(); }
4.2.3消费者消费消息
/** * 路由模式 配置类方式 */ @RabbitListener(queues = "direct.queue3") @SneakyThrows public void directWork3(Message message, Channel channel) { System.out.println("work3接受的消息:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "direct.queue4") @SneakyThrows public void directWork4(Message message, Channel channel) { System.out.println("work4接受的消息:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
4.2.4结果展示
通配符模式和路由模式相比,都可以根据routingKey把消息路由到不同的队列,只不过通配符模式可以在队列绑定routingKey的时候使用通配符
通配符规则:
#:匹配零个或多个词
星号:只能匹配1个词
举例:
a.# 可以匹配 a.b 和 a.b.c
a.星号 可以匹配 a.b
5.1.1生产者生产消息
/**
* 通配符模式 注解方式
*/
@GetMapping("/topic")
public Result topic1() {
String message = "我是通配符模式(注解类方式),以a.b.c为routingKey";
String message1 = "我是通配符模式(注解方式),以1.2.3为routingKey";
rabbitTemplate.convertAndSend("topic.exchange", "a.b.c", new String(message.getBytes(StandardCharsets.UTF_8)));
rabbitTemplate.convertAndSend("topic.exchange", "1.2.3", new String(message1.getBytes(StandardCharsets.UTF_8)));
return Result.ok();
}
5.1.2消费者消费消息 声明交换机和队列和routingKey并且绑定
/** * 通配符模式 注解方式 */ //QueueBinding这种方法可以在注解中创建交换机、队列还有路由并且绑定 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "topic.queue1"), exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC), key = "a.#" )) @SneakyThrows public void topicWork1(Message message, Channel channel) { System.out.println("work1接受的消息 routingKey为a.#:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "topic.queue2"), exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC), key = "a.*" )) @SneakyThrows public void topicWork2(Message message, Channel channel) { System.out.println("work2接受的消息 routingKey为a.*:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "topic.queue3"), exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC), key = "1.#" )) @SneakyThrows public void topicWork3(Message message, Channel channel) { System.out.println("work3接受的消息 routingKey为1.#:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "topic.queue4"), exchange = @Exchange(value = "topic.exchange",type = ExchangeTypes.TOPIC), key = "1.*" )) @SneakyThrows public void topicWork4(Message message, Channel channel) { System.out.println("work4接受的消息 routingKey为1.*:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
5.1.3结果展示
注意看,生产者发送的消息是a.b.c和1.2.3而work2是a.星 ,work4是1.星,而星号只能匹配一个字符,#号才能匹配0个或多个字符,因此只有work1和work3能接收到消息
5.2.1 生产者生产消息
/**
* 通配符模式 配置类
*/
@GetMapping("/topic1")
public Result topic2() {
String message = "我是通配符模式(配置类方式),以a.b.c为routingKey";
String message1 = "我是通配符模式(配置类方式),以1.2.3为routingKey";
rabbitTemplate.convertAndSend("topic.exchange1", "a.b.c", new String(message.getBytes(StandardCharsets.UTF_8)));
rabbitTemplate.convertAndSend("topic.exchange1", "1.2.3", new String(message1.getBytes(StandardCharsets.UTF_8)));
return Result.ok();
}
5.2.2声明队列、交换机、routingKey以及绑定关系
/** * 通配符模式 */ //交换机 @Bean public Exchange topicExchange() { return new TopicExchange("topic.exchange1"); } //队列 5 @Bean public Queue topicQueue5() { return new Queue("topic.queue5"); } //队列 6 @Bean public Queue topicQueue6() { return new Queue("topic.queue6"); } //队列 7 @Bean public Queue topicQueue7() { return new Queue("topic.queue7"); } //队列 8 @Bean public Queue topicQueue8() { return new Queue("topic.queue8"); } //绑定队列5、#.c和交换机关系 @Bean public Binding topicBinding5() { return BindingBuilder.bind(topicQueue5()).to(topicExchange()).with("#.c").noargs(); } //绑定队列6、*.c和交换机关系 @Bean public Binding topicBinding6() { return BindingBuilder.bind(topicQueue6()).to(topicExchange()).with("*.c").noargs(); } //绑定队列7、#.3和交换机关系 @Bean public Binding topicBinding7() { return BindingBuilder.bind(topicQueue7()).to(topicExchange()).with("#.3").noargs(); } //绑定队列8、*.3和交换机关系 @Bean public Binding topicBinding8() { return BindingBuilder.bind(topicQueue8()).to(topicExchange()).with("*.3").noargs(); }
5.2.3消费者消费消息
/** * 通配符模式 配置类方式 */ @RabbitListener(queues = "topic.queue5") @SneakyThrows public void topicWork5(Message message, Channel channel) { System.out.println("work5接受的消息 routingKey为#.c:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "topic.queue6") @SneakyThrows public void topicWork6(Message message, Channel channel) { System.out.println("work6接受的消息 routingKey为*.c:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "topic.queue7") @SneakyThrows public void topicWork7(Message message, Channel channel) { System.out.println("work7接受的消息 routingKey为#.3:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "topic.queue8") @SneakyThrows public void topicWork8(Message message, Channel channel) { System.out.println("work8接受的消息 routingKey为*.3:" + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
5.2.4结果展示
作为消息发送方,我们肯定希望可以杜绝消息丢失或者消息发送失败的情况,有以下方式可以确保消息的可靠性投递。
1.持久化消息:通过将消息标记为持久化,可以确保在RabbitMQ服务器重启或崩溃后,消息不会丢失。可以在发布消息时设置消息的delivery_mode属性为2来实现消息的持久化。
2.持久化队列:将队列标记为持久化,可以确保在RabbitMQ服务器重启或崩溃后,队列不会丢失。可以在声明队列时设置durable参数为true来实现队列的持久化。
3.事务机制:RabbitMQ支持事务机制,可以将一组操作作为一个原子操作进行提交或回滚。通过使用事务机制,可以确保消息在发送和确认之间不会丢失。但是,事务机制会降低系统的吞吐量,因此在性能要求较高的场景下,建议使用确认模式。
4.消费端确认模式:通过使用确认模式,可以确保消息在发送和接收之间的可靠传输。RabbitMQ提供了两种确认模式:确认模式(acknowledgement mode)和事务模式(transaction mode)。确认模式是默认的模式,它通过发送确认消息来确认消息的可靠投递。可以使用channel.basicAck()方法来手动确认消息的接收。另外,还可以设置channel.basicQos()方法来限制消费者一次接收的消息数量,从而提高系统的吞吐量。
5.发送端确认模式:有时消息发出去,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,主要由这两个接口完成:
ConfirmCallback 确认消息是否正确到达 Exchange 中
ReturnCallback 消息没有正确到达队列时触发回调,如果正确到达队列不执行
这里主要介绍4和5的方式
在整体架构中已经介绍
2.1 封装发送消息的方法
/** * 发送消息 */ public boolean sendMessage(String exchange,String routingKey,Object message){ //将要发送的消息封装成实体类、实体类继承CorrelationData String CorrelationDataId= UUID.randomUUID().toString().replaceAll("-",""); GmallCorrelationData gmallCorrelationData = new GmallCorrelationData(); gmallCorrelationData.setId(CorrelationDataId); gmallCorrelationData.setExchange(exchange); gmallCorrelationData.setRoutingKey(routingKey); gmallCorrelationData.setMessage(message); //以UUID为键存入redis中,以便在消息发送不成功时从reids中取出重新发送消息 redisTemplate.opsForValue().set(CorrelationDataId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES); //调用rabbit模板方法 rabbitTemplate.convertAndSend(exchange,routingKey,message,gmallCorrelationData); return true; } //实体类 @Data public class GmallCorrelationData extends CorrelationData { //交换机 private String exchange; //路由键 private String routingKey; //消息主体 private Object message; //重试次数 private int retryCount; }
2.2使用发送端确认的两个接口
@Component @Slf4j public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate redisTemplate; // 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行, // 并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); //指定confirm rabbitTemplate.setReturnCallback(this); //指定return } //在消息到达交换机时执行 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //判断交换机是否成功接收到消息 if (ack){ log.info("消息发送成功"+ JSON.toJSONString(correlationData)); }else{ log.info("消息发送失败"+JSON.toJSONString(cause)); //如果没有到达交换机,调用重试方法 this.retryMsg(correlationData); } } //在消息没有到达队列时执行 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 反序列化对象输出 System.out.println("消息主体: " + new String(message.getBody())); System.out.println("应答码: " + replyCode); System.out.println("描述:" + replyText); System.out.println("消息使用的交换器 exchange : " + exchange); System.out.println("消息使用的路由键 routing : " + routingKey); //获取到correlationDataId String correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); //从redis中取到gmallCorrelationData对象,调用重试方法 String strJSON = (String) redisTemplate.opsForValue().get(correlationDataId); GmallCorrelationData gmallCorrelationData = JSONObject.parseObject(strJSON, GmallCorrelationData.class); this.retryMsg(gmallCorrelationData); } //重试方法 private void retryMsg(CorrelationData correlationData) { GmallCorrelationData gmallCorrelationData=(GmallCorrelationData) correlationData; //获取重试次数 int retryCount = gmallCorrelationData.getRetryCount(); //如果重试次数已经超过三次就不再重试了 if (retryCount>=3){ log.error("重试次数已到,发送消息失败"+gmallCorrelationData); }else{ retryCount++; gmallCorrelationData.setRetryCount(retryCount); System.out.println("重试次数:\t"+(retryCount)); //更新缓存里的数据 因为重试次数改变了 redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSONObject.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES); //重新发送消息 rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData); } } }
2.3测试交换机的确认模式
//生产者 /** * 测试重试方法 */ @GetMapping("test") public Result test(){ rabbitService.sendMessage("exchangeRetry.exchange","exchangeRetry.key",new String("我来测试交换机重试方法".getBytes(StandardCharsets.UTF_8))); return Result.ok(); } //消费者 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "exchangeRetry.queue"), exchange=@Exchange(value = "exchangeRetry.exchange"), key = {"exchangeRetry.key"} )) @SneakyThrows public void exchangeRetry(Message message, Channel channel){ System.out.println(new String(message.getBody(), "UTF-8")); //这个问题来解决第四点 消费端确认模式,将它改为手动 则必须等消费端手动确认后,才会删除此条消息 //否则表示这条消息还没有被消费,例如异常或者超时的情况,代码还没有走到手动确认这一行,那就会被重新发送 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
结果:
此时交换机和routeingkey的名字都相互匹配,因此不会触发重试方法,那么接下来将消费者的交换机名字进行改动,
此时由于无法发送到交换机,会到达confirm方法,之后触发重试机制,重试三次以后将不再重试
2.3测试队列的 退回模式
将消费者的routingKey进行改动,此时队列无法匹配,因此会进入retryMsg方法
结果:
消息主体: 我来测试交换机重试方法 应答码: 312 描述:NO_ROUTE 消息使用的交换器 exchange : exchangeRetry.exchange 消息使用的路由键 routing : exchangeRetry.key 重试次数: 1 消息主体: 我来测试交换机重试方法 应答码: 312 描述:NO_ROUTE 消息使用的交换器 exchange : exchangeRetry.exchange 消息使用的路由键 routing : exchangeRetry.key 重试次数: 2 消息主体: 我来测试交换机重试方法 应答码: 312 描述:NO_ROUTE 消息使用的交换器 exchange : exchangeRetry.exchange 消息使用的路由键 routing : exchangeRetry.key 重试次数: 3 2023-07-14 23:58:14.087 INFO [service-mq,,,] 18348 --- [nectionFactory2] c.a.g.common.config.MQProducerAckConfig : 消息发送成功{"delay":false,"delayTime":10,"exchange":"exchangeRetry.exchange","future":{"cancelled":false,"done":true},"id":"4b28b0b4cf744b80bc369e19017e50d3","message":"我来测试交换机重试方法","retryCount":0,"returnedMessage":{"body":"5oiR5p2l5rWL6K+V5Lqk5o2i5py66YeN6K+V5pa55rOV","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"b3":"04e5a4508afb97ae-56b6284165025a16-1","spring_listener_return_correlation":"cbab7433-6cd3-4ae3-ad33-49e517ded98b","spring_returned_message_correlation":"4b28b0b4cf744b80bc369e19017e50d3"},"lastInBatch":false,"priority":0,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT","receivedExchange":"exchangeRetry.exchange","receivedRoutingKey":"exchangeRetry.key","redelivered":false}},"routingKey":"exchangeRetry.key"} 2023-07-14 23:58:14.089 INFO [service-mq,,,] 18348 --- [nectionFactory4] c.a.g.common.config.MQProducerAckConfig : 消息发送成功{"delay":false,"delayTime":10,"exchange":"exchangeRetry.exchange","future":{"cancelled":false,"done":true},"id":"4b28b0b4cf744b80bc369e19017e50d3","message":"我来测试交换机重试方法","retryCount":2,"returnedMessage":{"body":"5oiR5p2l5rWL6K+V5Lqk5o2i5py66YeN6K+V5pa55rOV","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"b3":"0ee0469c9b91e90a-0ee0469c9b91e90a-0","spring_listener_return_correlation":"cbab7433-6cd3-4ae3-ad33-49e517ded98b","spring_returned_message_correlation":"4b28b0b4cf744b80bc369e19017e50d3"},"lastInBatch":false,"priority":0,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT","receivedExchange":"exchangeRetry.exchange","receivedRoutingKey":"exchangeRetry.key","redelivered":false}},"routingKey":"exchangeRetry.key"} 2023-07-14 23:58:14.089 INFO [service-mq,,,] 18348 --- [nectionFactory3] c.a.g.common.config.MQProducerAckConfig : 消息发送成功{"delay":false,"delayTime":10,"exchange":"exchangeRetry.exchange","future":{"cancelled":false,"done":true},"id":"4b28b0b4cf744b80bc369e19017e50d3","message":"我来测试交换机重试方法","retryCount":1,"returnedMessage":{"body":"5oiR5p2l5rWL6K+V5Lqk5o2i5py66YeN6K+V5pa55rOV","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"b3":"0ecf4d9ea6f5a8b4-0ecf4d9ea6f5a8b4-1","spring_listener_return_correlation":"cbab7433-6cd3-4ae3-ad33-49e517ded98b","spring_returned_message_correlation":"4b28b0b4cf744b80bc369e19017e50d3"},"lastInBatch":false,"priority":0,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT","receivedExchange":"exchangeRetry.exchange","receivedRoutingKey":"exchangeRetry.key","redelivered":false}},"routingKey":"exchangeRetry.key"} 消息主体: 我来测试交换机重试方法 应答码: 312 描述:NO_ROUTE 消息使用的交换器 exchange : exchangeRetry.exchange 消息使用的路由键 routing : exchangeRetry.key 2023-07-14 23:58:14.093 ERROR [service-mq,,,] 18348 --- [nectionFactory3] c.a.g.common.config.MQProducerAckConfig : 重试次数已到,发送消息失败GmallCorrelationData(exchange=exchangeRetry.exchange, routingKey=exchangeRetry.key, message=我来测试交换机重试方法, retryCount=3, isDelay=false, delayTime=10) 2023-07-14 23:58:14.093 INFO [service-mq,,,] 18348 --- [nectionFactory4] c.a.g.common.config.MQProducerAckConfig : 消息发送成功{"delay":false,"delayTime":10,"exchange":"exchangeRetry.exchange","future":{"cancelled":false,"done":true},"id":"4b28b0b4cf744b80bc369e19017e50d3","message":"我来测试交换机重试方法","retryCount":3,"returnedMessage":{"body":"5oiR5p2l5rWL6K+V5Lqk5o2i5py66YeN6K+V5pa55rOV","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"b3":"840da5305e393ccb-840da5305e393ccb-0","spring_listener_return_correlation":"cbab7433-6cd3-4ae3-ad33-49e517ded98b","spring_returned_message_correlation":"4b28b0b4cf744b80bc369e19017e50d3"},"lastInBatch":false,"priority":0,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT","receivedExchange":"exchangeRetry.exchange","receivedRoutingKey":"exchangeRetry.key","redelivered":false}},"routingKey":"exchangeRetry.key"}
可以发现重试了三次后不再重试
应用场景:例如在电商中,订单生成后,如果两个小时之内未支付,就关闭这个订单。此时就可以用延迟队列来实现
RabbitMQ可以通过使用死信队列(Dead Letter Queue)来实现延迟消息的功能.我们需要注意两个问题
1.设置消息的TTL,即消息的存活时间,例如我们创建一个消息队列,在参数中设置x-message-ttl 为10000,那所在这个队列的消息将会在五秒后被消失
2.死信交换机,一个消息在死后会进入死信路由,一般来说满足以下条件会进入:
①:消息的TTl到了,消息过期了
②消息被消费者拒收
③队列长度满了,最先进入的消息会被扔掉
1.1测试死信队列
//配置死信队列 @Configuration public class DeadLetterMqConfig { // 声明一些变量 public static final String exchange_dead = "exchange.dead"; public static final String routing_dead_1 = "routing.dead.1"; public static final String routing_dead_2 = "routing.dead.2"; public static final String queue_dead_1 = "queue.dead.1"; public static final String queue_dead_2 = "queue.dead.2"; //定义交换机 @Bean public Exchange exchange(){ return new DirectExchange(exchange_dead,true,false); } //定义队列1 @Bean public Queue queue1(){ Map<String,Object> map=new HashMap<>(); map.put("x-dead-letter-exchange",exchange_dead); map.put("x-dead-letter-routing-key",routing_dead_2); map.put("x-message-ttl",10*1000); return new Queue(queue_dead_1,true,false,false,map); } //定义绑定关系 @Bean public Binding binding(){ return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1).noargs(); } //定义队列2 @Bean public Queue queue2(){ return new Queue(queue_dead_2,true,false,false); } //定义绑定关系 @Bean public Binding binding1(){ return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2).noargs(); } } //生产者发送消息 /** * 延迟消息发送基于死信队列 */ @GetMapping("sendDeadLettle") public Result sendDeadLettle() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); rabbitService.sendMessage(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "ok"); System.out.println(sdf.format(new Date()) + " Delay sent."); return Result.ok(); } //消费者消费消息 @RabbitListener( queues = DeadLetterMqConfig.queue_dead_2 ) public void getDeadLetter(String msg){ System.out.println("Receive:"+msg); SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("延迟队列接受消息的时间:"+sdf.format(new Date())); }
结果:
可以发现,消息在十秒后到达了死亡队列2
在linux中
2.1代码实现
//定义延迟插件的交换机 队列 routingKey 绑定关系 @Configuration public class DelayedMqConfig { public static final String exchange_delay = "exchange.delay"; public static final String routing_delay = "routing.delay"; public static final String queue_delay_1 = "queue.delay.1"; //定义延迟队列 @Bean public Queue delayQueue1(){ return new Queue(queue_delay_1,true); } //定义延迟交换机 @Bean public CustomExchange delayExchange(){ Map<String,Object> map=new HashMap<>(); map.put("x-delayed-type","direct"); return new CustomExchange(exchange_delay,"x-delayed-message",true,false,map); } //绑定关系 @Bean public Binding delayBinding(){ return BindingBuilder.bind(delayQueue1()).to(delayExchange()).with(routing_delay).noargs(); } } //生产者发送消息 /** * 延迟消息基于延迟插件 */ @GetMapping("sendelay1") public Result sendDelay1() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, "hello", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(10 * 1000); System.out.println(sdf.format(new Date()) + " Delay sent."); return message; } }); return Result.ok(); } //消费者消费消息 @SneakyThrows @RabbitListener(queues = DelayedMqConfig.queue_delay_1) public void get(String msg, Message message, Channel channel) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息到达时间: " + sdf.format(new Date()) + " 消息内容." + msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
结果:
可以看出,消息在十秒后,被消费者监听到
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。