赞
踩
源码:链接: https://pan.baidu.com/s/1Kzc18jsD19w_uLq-BFWxRw?pwd=1314 提取码: 1314
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
#rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
这里使用的是 rabbitmq 的延时消息插件所以在声明交换机的时候需将其 type 声明为 “x-delayed-message“
@Configuration
public class RabbitMQConfig {
// 声明 1 个交换机 1个路由key 1个队列
//延迟交换机
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
//延迟队列
public static final String DELAY_QUEUE_NAME = "delay.queue";
//路由key
public static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.routing.key";
//声明延迟队列
@Bean("delayQueue")
public Queue delayQueue(){
return new Queue(DELAY_QUEUE_NAME);
}
//声明延迟交换机
@Bean("delayExchange")
public CustomExchange delayExchange(){
Map<String,Object> args = new HashMap<>();
args.put("x-delayed-type","direct");
return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
}
//声明延迟队列的绑定关系
@Bean
public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange")CustomExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();
}
}
@Component
public class DelayMessageProducer {
@Resource
protected RabbitTemplate rabbitTemplate;
/**
*
* @param message 消息体
* @param delayTime 延时时间
*/
public void send(String message,long delayTime){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_QUEUE_ROUTING_KEY,message,msg -> {
// msg.getMessageProperties().setDelay(delayTime);
msg.getMessageProperties().getHeaders().put("x-delay",delayTime);
return msg;
});
}
}
@Component
@Slf4j
public class DelayMessageConsumer {
//监听 DELAY_QUEUE_NAME 队列
@RabbitListener(queues = DELAY_QUEUE_NAME)
public void receive(Message message) {
// 获取消息
String msg = new String(message.getBody());
// 记录日志
log.info("延迟队列收到消息:{}", msg);
// 具体的消费逻辑
}
1.创建消息时判断传入的延时时间是否大于延时时间最大值,大于则在消息体中传入个 false 参数,小于等于则传入 true 参数,示例代码:
@Component
@Slf4j
public class PublishMessage {
protected static final int MAX_DELAY_SEGMENT = Integer.MAX_VALUE;
@Resource
private DelayMessageProducer producer;
public boolean pullMessage(String id, Date startAt, Date endAt){
Date now = new Date();
boolean flat = false;
//判断执行时间是否在当前时间之后,调用接口传入执行时间,消息体
if (startAt != null && startAt.compareTo(now) > 0){
long delayTime = startAt.getTime() - now.getTime();
if (delayTime > MAX_DELAY_SEGMENT){
log.info("开始延时时间超出,进行分段发布");
producer.send(id + "," + startAt + ",false" , MAX_DELAY_SEGMENT);
}else {
producer.send(id + "," + startAt + ",true", delayTime);
}
flat = true;
}
}
2.消费者拿出数据时,判断该参数是 true 还是 false,如果是 true 则直接执行消费逻辑,如果为 false 则需判断执行时间减去当前时间是否还大于延时时间最大值,如果还大于延时最大时间,继续传参 false ,等待下次消费,示例代码:
@RabbitListener(queues = DELAY_QUEUE_NAME)
public void receive(Message message) {
// 获取消息
String msg = new String(message.getBody());
// 记录日志
log.info("延迟队列收到消息:{}", msg);
//获取消息id 和 执行时间 ,参数
String[] split = msg.split(",");
String id = split[0];
String time = split[1];
DateTime dateTime = DateUtil.parseCST(time);
Boolean flat = Boolean.valueOf(split[2]);
if (flat){
// 消费 notice 消息
consumeNotice(id,dateTime);
}else {
long delayTime = dateTime.getTime() - System.currentTimeMillis();
// 如果还大于延时最大时间,继续传参 false ,等待下次消费
if (delayTime > MAX_DELAY_SEGMENT){
producer.send(id + "," + delayTime + ",false" , MAX_DELAY_SEGMENT);
}else {
// log.info("消息:{}",id + "," + time + ",true");
producer.send(id + "," + time + ",true" , delayTime);
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。