赞
踩
业务中经常会遇到一些延迟执行的需求;通常想到的都是rabbitmq
或者rocketmq
的延迟消息;
但是系统中不一定集成了mq
,但为了控制分布式下的并发,一般redis
都是有集成的;
redis
的key
过期监听那个时间不准确,在集群环境下节点挂了也容易丢失;
那么用redisson
的延迟队列,正好可以用来解决轻量级的延时消息;
简单的来说就是消费者生产了一个消息任务,塞到ZSet
里(用当前时间戳+延迟时间作为分数),等时间到了,就会放到任务List中,然后消费者真正去执行任务都是从任务List
中获取任务;
redisson
中的消费者并不是一直轮询获取任务;而是有具体时间的延迟任务,时间到了去任务队列中获取任务;
注意点,在消费者监听处如果使用thread
相关操作因为redisson
的默认线程name
为redisson-netty
会抛异常,我的处理方式是把相关操作都放到自己的线程池中操作.
官方解释是在netty
线程中调用同步方法可能会导致超时;
issue
:https://github.com/redisson/redisson/issues/3549
异常见源码
org.redisson.command.CommandAsyncService.get(org.redisson.api.RFuture<V>)
版本
redisson
:redisson-spring-boot-starter-3.17.6.jar
redis
:6.2.7
生产者先将任务push
到delay_queue_timeout
等待队列中,延迟时间到了,消费者会把任务从timeout
队列挪到SANYOU
任务队列中(消费者实际获取任务的队列),然后消费者就能拿到最终要执行的任务了;
这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去get
一下队列,达到订阅队列的目的;
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);
这样做的目的:
消费者订阅队列,从delay_queue_timeout
等待延迟队列中将已经到达时间的任务挪到真正的任务List
队列中,然后再将delay_queue_timeout
队列中第一个(也就是第一个要执行的)的任务的时间拿到,用这个时间开启一个延迟任务,时间到了之后,会发布一个消息到时间通知channel
中;然后客户端监听到这个channel
中的消息后,会再次重复上述步骤,让delay_queue_timeout
中的任务,可以都放到真正的任务List
队列中;
这样有一个好处就是不用一直while
扫描等待,客户端的延迟任务时间和delay_queue_timeout
中的延迟时间是一样的,可以精准利用cpu
,理论上是没有延迟的,但是实际消息数量大量增加,消费者消费比较慢,还是会造成延迟任务消费延迟;
另外由于客户端都是用lua
脚本去redis
的同一个List
队列中获取任务,lua
脚本在redis
中都是原子任务,而且redis
真正的操作是单线程的,所以不会存在任务广播情况(并发获取时,一个任务不会被多个消费者同时拿到);
捞一张图片
package com.xxx.xxx.enums; /** * redis延迟队列配置,消费者初始化会从当前enum获取相关信息初始化订阅 */ public enum DelayQueueEnum { TARGET_GROUP_CONF_DELAY_PROCESS( //队列name "xxxx", //描述 "描述", //消费者bean name "test1") ; //延迟队列,redis key private String code; private String des; private String beanName; DelayQueueEnum(String code, String des, String beanName) { this.code = code; this.des = des; this.beanName = beanName; } public String getCode() { return code; } public String getDes() { return des; } public String getBeanName() { return beanName; } } //消费者接口,实现下这个方法,然后实现类维护到上面的DelayQueueEnum枚举类中即可 package com.xxx.xxx.xxx.listener; import com.xxx.xxx.xxx.xxx.DelayMessageDTO; public interface DelayQueueConsumerListener { void execute(DelayMessageDTO delayMessage); } import cn.hutool.extra.spring.SpringUtil; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingDeque; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Component public class RedissonDelayQueueConfig implements InitializingBean { @Resource private RedissonClient redissonClient; //延时队列map private final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16); @Override public void afterPropertiesSet() { // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer DelayQueueEnum[] queueEnums = DelayQueueEnum.values(); for (DelayQueueEnum queueEnum : queueEnums) { DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName()); if (delayQueueConsumer == null) { throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置..."); } // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后, // 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。 RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode()); //消费者初始化队列 RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue); //set到map中方便获取 delayQueueMap.put(queueEnum.getCode(), rDelayedQueue); // 订阅新元素的到来,调用的是takeAsync(),异步执行 rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute); } } public RedissonClient getRedissonClient() { return redissonClient; } public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() { return delayQueueMap; } } import cn.hutool.core.date.DateUtil; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingDeque; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import javax.annotation.Resource; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; @Slf4j @Component public class DelayQueueUtil { private static RedissonDelayQueueConfig redissonDelayQueueConfig; @Resource public void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) { DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig; } private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() { if(null == redissonDelayQueueConfig) return Collections.emptyMap(); return redissonDelayQueueConfig.getDelayQueueMap(); } private static RedissonClient getRedissonClient() { if(null == redissonDelayQueueConfig) return null; return redissonDelayQueueConfig.getRedissonClient(); } /** * 添加延迟消息 */ public static void addDelayMessage(DelayMessageDTO delayMessage) { log.info("delayMessage={}", delayMessage); Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在"); delayMessage.setCreateTime(DateUtil.now()); if(null == delayMessage.getTimeUnit()){ delayMessage.setTimeUnit(TimeUnit.SECONDS); } RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName()); //移除相同的消息 rDelayedQueue.remove(delayMessage); //添加消息 rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit()); } /** * 移除指定队列中的消息 */ public static void removeDelayMessage(DelayMessageDTO delayMessage) { log.info("取消:delayMessage={}", delayMessage); if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) { log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName()); return; } RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName()); rDelayedQueue.remove(delayMessage); removeDelayQueue(delayMessage); } /** * 从所有队列中删除消息 */ public static void removeDelayQueue(DelayMessageDTO value) { DelayQueueEnum[] queueEnums = DelayQueueEnum.values(); for (DelayQueueEnum queueEnum : queueEnums) { RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode()); RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque); delayedQueue.remove(value); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。