赞
踩
在很多软件系统功能中都会出现定时任务的业务场景,比如提前点单,比如定时发布动态,文章等而出现这样的的定时的任务为延迟队任务
任务的持久化一般都需要建立一个任务表和任务日志表,避免宕机导致任务失效,先新建立一个数据库,创建基本的任务表和任务日志表,其中的parameter设置为longbite,是防止消息使用json传递消耗较大,节省资源
参数是在任务中存储操作任务需要的关键信息,设置为比特位是更节省资源
模拟业务文章定时发布,前端传递了一个文章对象,其中包含了预期发布的时间
传递的文章dto中包含了对应的文章信息,我们需要做的就是根据审核内容进行保存在文章表,设置是否上架到用户端,如果携带的发布时间比现在要大,说明是延迟任务,那么此时就涉及到延迟任务的实现
主要是接收应用的http请求
@RestController @RequestMapping("/api/v1/news") public class NewsController { @Autowired NewsService wmNewsService; @PostMapping("/list") public ResponseResult findAll(@RequestBody WmNewsPageReqDto dto){ return wmNewsService.findAll(dto); } /** * 文章的提交 */ @PostMapping("/submit") public ResponseResult submit(@RequestBody WmNewsDto dto){ return wmNewsService.submitNews(dto); } }
用于处理文章提交的逻辑
@Service @Slf4j @Transactional public class WmNewsServiceImpl extends ServiceImpl<WmNewsMapper, WmNews> implements WmNewsService { /** * 自动扫描审核文章业务 这里不是重点不用关注 / @Autowired WmTaskService taskService;// /** * 发布修改文章或保存为草稿 * @param dto * @return */ @Override public ResponseResult submitNews(WmNewsDto dto) { //0.条件判断 if(dto == null || dto.getContent() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //1.保存或修改文章 WmNews wmNews = new WmNews(); //属性拷贝 属性名词和类型相同才能拷贝 BeanUtils.copyProperties(dto,wmNews); //封面图片 list---> string if(dto.getImages() != null && dto.getImages().size() > 0){ //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg String imageStr = StringUtils.join(dto.getImages(), ","); wmNews.setImages(imageStr); } //如果当前封面类型为自动 -1 if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){ wmNews.setType(null); } saveOrUpdateWmNews(wmNews); //2.判断是否为草稿 如果为草稿结束当前方法 if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){ return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } //3.不是草稿,保存文章内容图片与素材的关系 //获取到文章内容中的图片信息 List<String> materials = ectractUrlInfo(dto.getContent()); saveRelativeInfoForContent(materials,wmNews.getId()); //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片 saveRelativeInfoForCover(dto,wmNews,materials); /* * *上面都不用看,是对文章的处理逻辑, *当对文章完成处理后 */ //上面的一大堆都是对文章的处理逻辑 if (wmNews.getId()!=null){ taskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime()); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } } }
文章处理的逻辑并不重要,重要的是完成对延迟任务的实现
所以重点关注定时任务,原本的模块是管理端完成文章的处理–>客户端的文章上架(保存到数据库还是修改库不需要关注),现在变成了:管理端完成了–>定时任务的处理–>时间到了–>客户端文章的处理,所以我们需要关注的就是任务处理的部分,因为这里假设的场景是微服务的场景下,所以就需要新建一个定时任务模块,并且将上述的taskService改为feign模块的远程调用接口
<!--spring data redis & cache-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
2.实现思路
添加一个任务对象到数据库,然后进行判断,如果执行时间<=当前的时间说明需要放入list队列,等待被消费者消费,如果大于当前时间,但是小于设置预设时间(如果大于预设时间,说明任务举例处理还很遥远, 不对其进行处理避免多余的io操作),说明是将要消费任务,使用set数据类型进行存储,并且使用时间错作为score,这样做到排序的效果
@Component public class CacheService extends CachingConfigurerSupport { @Autowired private StringRedisTemplate stringRedisTemplate; public StringRedisTemplate getstringRedisTemplate() { return this.stringRedisTemplate; } /** -------------------key相关操作--------------------- */ /** * 删除key * * @param key */ public void delete(String key) { stringRedisTemplate.delete(key); } /** * 批量删除key * * @param keys */ public void delete(Collection<String> keys) { stringRedisTemplate.delete(keys); } /** * 序列化key * * @param key * @return */ public byte[] dump(String key) { return stringRedisTemplate.dump(key); } /** * 是否存在key * * @param key * @return */ public Boolean exists(String key) { return stringRedisTemplate.hasKey(key); } /** * 设置过期时间 * * @param key * @param timeout * @param unit * @return */ public Boolean expire(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.expire(key, timeout, unit); } /** * 设置过期时间 * * @param key * @param date * @return */ public Boolean expireAt(String key, Date date) { return stringRedisTemplate.expireAt(key, date); } /** * 查找匹配的key * * @param pattern * @return */ public Set<String> keys(String pattern) { return stringRedisTemplate.keys(pattern); } /** * 将当前数据库的 key 移动到给定的数据库 db 当中 * * @param key * @param dbIndex * @return */ public Boolean move(String key, int dbIndex) { return stringRedisTemplate.move(key, dbIndex); } /** * 移除 key 的过期时间,key 将持久保持 * * @param key * @return */ public Boolean persist(String key) { return stringRedisTemplate.persist(key); } /** * 返回 key 的剩余的过期时间 * * @param key * @param unit * @return */ public Long getExpire(String key, TimeUnit unit) { return stringRedisTemplate.getExpire(key, unit); } /** * 返回 key 的剩余的过期时间 * * @param key * @return */ public Long getExpire(String key) { return stringRedisTemplate.getExpire(key); } /** * 从当前数据库中随机返回一个 key * * @return */ public String randomKey() { return stringRedisTemplate.randomKey(); } /** * 修改 key 的名称 * * @param oldKey * @param newKey */ public void rename(String oldKey, String newKey) { stringRedisTemplate.rename(oldKey, newKey); } /** * 仅当 newkey 不存在时,将 oldKey 改名为 newkey * * @param oldKey * @param newKey * @return */ public Boolean renameIfAbsent(String oldKey, String newKey) { return stringRedisTemplate.renameIfAbsent(oldKey, newKey); } /** * 返回 key 所储存的值的类型 * * @param key * @return */ public DataType type(String key) { return stringRedisTemplate.type(key); } /** -------------------string相关操作--------------------- */ /** * 设置指定 key 的值 * @param key * @param value */ public void set(String key, String value) { stringRedisTemplate.opsForValue().set(key, value); } /** * 获取指定 key 的值 * @param key * @return */ public String get(String key) { return stringRedisTemplate.opsForValue().get(key); } /** * 返回 key 中字符串值的子字符 * @param key * @param start * @param end * @return */ public String getRange(String key, long start, long end) { return stringRedisTemplate.opsForValue().get(key, start, end); } /** * 将给定 key 的值设为 value ,并返回 key 的旧值(old value) * * @param key * @param value * @return */ public String getAndSet(String key, String value) { return stringRedisTemplate.opsForValue().getAndSet(key, value); } /** * 对 key 所储存的字符串值,获取指定偏移量上的位(bit) * * @param key * @param offset * @return */ public Boolean getBit(String key, long offset) { return stringRedisTemplate.opsForValue().getBit(key, offset); } /** * 批量获取 * * @param keys * @return */ public List<String> multiGet(Collection<String> keys) { return stringRedisTemplate.opsForValue().multiGet(keys); } /** * 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value * * @param key * @param * @param value * 值,true为1, false为0 * @return */ public boolean setBit(String key, long offset, boolean value) { return stringRedisTemplate.opsForValue().setBit(key, offset, value); } /** * 将值 value 关联到 key ,并将 key 的过期时间设为 timeout * * @param key * @param value * @param timeout * 过期时间 * @param unit * 时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES * 秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS */ public void setEx(String key, String value, long timeout, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, value, timeout, unit); } /** * 只有在 key 不存在时设置 key 的值 * * @param key * @param value * @return 之前已经存在返回false,不存在返回true */ public boolean setIfAbsent(String key, String value) { return stringRedisTemplate.opsForValue().setIfAbsent(key, value); } /** * 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始 * * @param key * @param value * @param offset * 从指定位置开始覆写 */ public void setRange(String key, String value, long offset) { stringRedisTemplate.opsForValue().set(key, value, offset); } /** * 获取字符串的长度 * * @param key * @return */ public Long size(String key) { return stringRedisTemplate.opsForValue().size(key); } /** * 批量添加 * * @param maps */ public void multiSet(Map<String, String> maps) { stringRedisTemplate.opsForValue().multiSet(maps); } /** * 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在 * * @param maps * @return 之前已经存在返回false,不存在返回true */ public boolean multiSetIfAbsent(Map<String, String> maps) { return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps); } /** * 增加(自增长), 负数则为自减 * * @param key * @param * @return */ public Long incrBy(String key, long increment) { return stringRedisTemplate.opsForValue().increment(key, increment); } /** * * @param key * @param * @return */ public Double incrByFloat(String key, double increment) { return stringRedisTemplate.opsForValue().increment(key, increment); } /** * 追加到末尾 * * @param key * @param value * @return */ public Integer append(String key, String value) { return stringRedisTemplate.opsForValue().append(key, value); } /** -------------------hash相关操作------------------------- */ /** * 获取存储在哈希表中指定字段的值 * * @param key * @param field * @return */ public Object hGet(String key, String field) { return stringRedisTemplate.opsForHash().get(key, field); } /** * 获取所有给定字段的值 * * @param key * @return */ public Map<Object, Object> hGetAll(String key) { return stringRedisTemplate.opsForHash().entries(key); } /** * 获取所有给定字段的值 * * @param key * @param fields * @return */ public List<Object> hMultiGet(String key, Collection<Object> fields) { return stringRedisTemplate.opsForHash().multiGet(key, fields); } public void hPut(String key, String hashKey, String value) { stringRedisTemplate.opsForHash().put(key, hashKey, value); } public void hPutAll(String key, Map<String, String> maps) { stringRedisTemplate.opsForHash().putAll(key, maps); } /** * 仅当hashKey不存在时才设置 * * @param key * @param hashKey * @param value * @return */ public Boolean hPutIfAbsent(String key, String hashKey, String value) { return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value); } /** * 加锁 * * @param name * @param expire * @return */ public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //参考redis命令:在redis中存入数据 数据明 uuid token 为锁的名字 // 因为这个数据的名字都是name+_locak 所以每次调用加锁方法setnx 只有一个成功调用的才能才能成功的保存的一个锁数据,其他的只能无法设置 //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; } /** * 删除一个或多个哈希表字段 * * @param key * @param fields * @return */ public Long hDelete(String key, Object... fields) { return stringRedisTemplate.opsForHash().delete(key, fields); } /** * 查看哈希表 key 中,指定的字段是否存在 * * @param key * @param field * @return */ public boolean hExists(String key, String field) { return stringRedisTemplate.opsForHash().hasKey(key, field); } /** * 为哈希表 key 中的指定字段的整数值加上增量 increment * * @param key * @param field * @param increment * @return */ public Long hIncrBy(String key, Object field, long increment) { return stringRedisTemplate.opsForHash().increment(key, field, increment); } /** * 为哈希表 key 中的指定字段的整数值加上增量 increment * * @param key * @param field * @param delta * @return */ public Double hIncrByFloat(String key, Object field, double delta) { return stringRedisTemplate.opsForHash().increment(key, field, delta); } /** * 获取所有哈希表中的字段 * * @param key * @return */ public Set<Object> hKeys(String key) { return stringRedisTemplate.opsForHash().keys(key); } /** * 获取哈希表中字段的数量 * * @param key * @return */ public Long hSize(String key) { return stringRedisTemplate.opsForHash().size(key); } /** * 获取哈希表中所有值 * * @param key * @return */ public List<Object> hValues(String key) { return stringRedisTemplate.opsForHash().values(key); } /** * 迭代哈希表中的键值对 * * @param key * @param options * @return */ public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) { return stringRedisTemplate.opsForHash().scan(key, options); } /** ------------------------list相关操作---------------------------- */ /** * 通过索引获取列表中的元素 * * @param key * @param index * @return */ public String lIndex(String key, long index) { return stringRedisTemplate.opsForList().index(key, index); } /** * 获取列表指定范围内的元素 * * @param key * @param start * 开始位置, 0是开始位置 * @param end * 结束位置, -1返回所有 * @return */ public List<String> lRange(String key, long start, long end) { return stringRedisTemplate.opsForList().range(key, start, end); } /** * 存储在list头部 * * @param key * @param value * @return */ public Long lLeftPush(String key, String value) { return stringRedisTemplate.opsForList().leftPush(key, value); } /** * * @param key * @param value * @return */ public Long lLeftPushAll(String key, String... value) { return stringRedisTemplate.opsForList().leftPushAll(key, value); } /** * * @param key * @param value * @return */ public Long lLeftPushAll(String key, Collection<String> value) { return stringRedisTemplate.opsForList().leftPushAll(key, value); } /** * 当list存在的时候才加入 * * @param key * @param value * @return */ public Long lLeftPushIfPresent(String key, String value) { return stringRedisTemplate.opsForList().leftPushIfPresent(key, value); } /** * 如果pivot存在,再pivot前面添加 * * @param key * @param pivot * @param value * @return */ public Long lLeftPush(String key, String pivot, String value) { return stringRedisTemplate.opsForList().leftPush(key, pivot, value); } /** * * @param key * @param value * @return */ public Long lRightPush(String key, String value) { return stringRedisTemplate.opsForList().rightPush(key, value); } /** * * @param key * @param value * @return */ public Long lRightPushAll(String key, String... value) { return stringRedisTemplate.opsForList().rightPushAll(key, value); } /** * * @param key * @param value * @return */ public Long lRightPushAll(String key, Collection<String> value) { return stringRedisTemplate.opsForList().rightPushAll(key, value); } /** * 为已存在的列表添加值 * * @param key * @param value * @return */ public Long lRightPushIfPresent(String key, String value) { return stringRedisTemplate.opsForList().rightPushIfPresent(key, value); } /** * 在pivot元素的右边添加值 * * @param key * @param pivot * @param value * @return */ public Long lRightPush(String key, String pivot, String value) { return stringRedisTemplate.opsForList().rightPush(key, pivot, value); } /** * 通过索引设置列表元素的值 * * @param key * @param index * 位置 * @param value */ public void lSet(String key, long index, String value) { stringRedisTemplate.opsForList().set(key, index, value); } /** * 移出并获取列表的第一个元素 * * @param key * @return 删除的元素 */ public String lLeftPop(String key) { return stringRedisTemplate.opsForList().leftPop(key); } /** * 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 * * @param key * @param timeout * 等待时间 * @param unit * 时间单位 * @return */ public String lBLeftPop(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().leftPop(key, timeout, unit); } /** * 移除并获取列表最后一个元素 * * @param key * @return 删除的元素 */ public String lRightPop(String key) { return stringRedisTemplate.opsForList().rightPop(key); } /** * 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 * * @param key * @param timeout * 等待时间 * @param unit * 时间单位 * @return */ public String lBRightPop(String key, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().rightPop(key, timeout, unit); } /** * 移除列表的最后一个元素,并将该元素添加到另一个列表并返回 * * @param sourceKey * @param destinationKey * @return */ public String lRightPopAndLeftPush(String sourceKey, String destinationKey) { return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey); } /** * 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 * * @param sourceKey * @param destinationKey * @param timeout * @param unit * @return */ public String lBRightPopAndLeftPush(String sourceKey, String destinationKey, long timeout, TimeUnit unit) { return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey, destinationKey, timeout, unit); } /** * 删除集合中值等于value得元素 * * @param key * @param index * index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素; * index<0, 从尾部开始删除第一个值等于value的元素; * @param value * @return */ public Long lRemove(String key, long index, String value) { return stringRedisTemplate.opsForList().remove(key, index, value); } /** * 裁剪list * * @param key * @param start * @param end */ public void lTrim(String key, long start, long end) { stringRedisTemplate.opsForList().trim(key, start, end); } /** * 获取列表长度 * * @param key * @return */ public Long lLen(String key) { return stringRedisTemplate.opsForList().size(key); } /** --------------------set相关操作-------------------------- */ /** * set添加元素 * * @param key * @param values * @return */ public Long sAdd(String key, String... values) { return stringRedisTemplate.opsForSet().add(key, values); } /** * set移除元素 * * @param key * @param values * @return */ public Long sRemove(String key, Object... values) { return stringRedisTemplate.opsForSet().remove(key, values); } /** * 移除并返回集合的一个随机元素 * * @param key * @return */ public String sPop(String key) { return stringRedisTemplate.opsForSet().pop(key); } /** * 将元素value从一个集合移到另一个集合 * * @param key * @param value * @param destKey * @return */ public Boolean sMove(String key, String value, String destKey) { return stringRedisTemplate.opsForSet().move(key, value, destKey); } /** * 获取集合的大小 * * @param key * @return */ public Long sSize(String key) { return stringRedisTemplate.opsForSet().size(key); } /** * 判断集合是否包含value * * @param key * @param value * @return */ public Boolean sIsMember(String key, Object value) { return stringRedisTemplate.opsForSet().isMember(key, value); } /** * 获取两个集合的交集 * * @param key * @param otherKey * @return */ public Set<String> sIntersect(String key, String otherKey) { return stringRedisTemplate.opsForSet().intersect(key, otherKey); } /** * 获取key集合与多个集合的交集 * * @param key * @param otherKeys * @return */ public Set<String> sIntersect(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().intersect(key, otherKeys); } /** * key集合与otherKey集合的交集存储到destKey集合中 * * @param key * @param otherKey * @param destKey * @return */ public Long sIntersectAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey, destKey); } /** * key集合与多个集合的交集存储到destKey集合中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sIntersectAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys, destKey); } /** * 获取两个集合的并集 * * @param key * @param otherKeys * @return */ public Set<String> sUnion(String key, String otherKeys) { return stringRedisTemplate.opsForSet().union(key, otherKeys); } /** * 获取key集合与多个集合的并集 * * @param key * @param otherKeys * @return */ public Set<String> sUnion(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().union(key, otherKeys); } /** * key集合与otherKey集合的并集存储到destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long sUnionAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey); } /** * key集合与多个集合的并集存储到destKey中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sUnionAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey); } /** * 获取两个集合的差集 * * @param key * @param otherKey * @return */ public Set<String> sDifference(String key, String otherKey) { return stringRedisTemplate.opsForSet().difference(key, otherKey); } /** * 获取key集合与多个集合的差集 * * @param key * @param otherKeys * @return */ public Set<String> sDifference(String key, Collection<String> otherKeys) { return stringRedisTemplate.opsForSet().difference(key, otherKeys); } /** * key集合与otherKey集合的差集存储到destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long sDifference(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey, destKey); } /** * key集合与多个集合的差集存储到destKey中 * * @param key * @param otherKeys * @param destKey * @return */ public Long sDifference(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys, destKey); } /** * 获取集合所有元素 * * @param key * @param * @param * @return */ public Set<String> setMembers(String key) { return stringRedisTemplate.opsForSet().members(key); } /** * 随机获取集合中的一个元素 * * @param key * @return */ public String sRandomMember(String key) { return stringRedisTemplate.opsForSet().randomMember(key); } /** * 随机获取集合中count个元素 * * @param key * @param count * @return */ public List<String> sRandomMembers(String key, long count) { return stringRedisTemplate.opsForSet().randomMembers(key, count); } /** * 随机获取集合中count个元素并且去除重复的 * * @param key * @param count * @return */ public Set<String> sDistinctRandomMembers(String key, long count) { return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count); } /** * * @param key * @param options * @return */ public Cursor<String> sScan(String key, ScanOptions options) { return stringRedisTemplate.opsForSet().scan(key, options); } /**------------------zSet相关操作--------------------------------*/ /** * 添加元素,有序集合是按照元素的score值由小到大排列 * * @param key * @param value * @param score * @return */ public Boolean zAdd(String key, String value, double score) { return stringRedisTemplate.opsForZSet().add(key, value, score); } /** * * @param key * @param values * @return */ public Long zAdd(String key, Set<TypedTuple<String>> values) { return stringRedisTemplate.opsForZSet().add(key, values); } /** * * @param key * @param values * @return */ public Long zRemove(String key, Object... values) { return stringRedisTemplate.opsForZSet().remove(key, values); } public Long zRemove(String key, Collection<String> values) { if(values!=null&&!values.isEmpty()){ Object[] objs = values.toArray(new Object[values.size()]); return stringRedisTemplate.opsForZSet().remove(key, objs); } return 0L; } /** * 增加元素的score值,并返回增加后的值 * * @param key * @param value * @param delta * @return */ public Double zIncrementScore(String key, String value, double delta) { return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta); } /** * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列 * * @param key * @param value * @return 0表示第一位 */ public Long zRank(String key, Object value) { return stringRedisTemplate.opsForZSet().rank(key, value); } /** * 返回元素在集合的排名,按元素的score值由大到小排列 * * @param key * @param value * @return */ public Long zReverseRank(String key, Object value) { return stringRedisTemplate.opsForZSet().reverseRank(key, value); } /** * 获取集合的元素, 从小到大排序 * * @param key * @param start * 开始位置 * @param end * 结束位置, -1查询所有 * @return */ public Set<String> zRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().range(key, start, end); } /** * 获取zset集合的所有元素, 从小到大排序 * */ public Set<String> zRangeAll(String key) { return zRange(key,0,-1); } /** * 获取集合元素, 并且把score值也获取 * * @param key * @param start * @param end * @return */ public Set<TypedTuple<String>> zRangeWithScores(String key, long start, long end) { return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end); } /** * 根据Score值查询集合元素 * * @param key * @param min * 最小值 * @param max * 最大值 * @return */ public Set<String> zRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max); } /** * 根据Score值查询集合元素, 从小到大排序 * * @param key * @param min * 最小值 * @param max * 最大值 * @return */ public Set<TypedTuple<String>> zRangeByScoreWithScores(String key, double min, double max) { return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max); } /** * * @param key * @param min * @param max * @param start * @param end * @return */ public Set<TypedTuple<String>> zRangeByScoreWithScores(String key, double min, double max, long start, long end) { return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max, start, end); } /** * 获取集合的元素, 从大到小排序 * * @param key * @param start * @param end * @return */ public Set<String> zReverseRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRange(key, start, end); } public Set<String> zReverseRangeByScore(String key, long min, long max) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max); } /** * 获取集合的元素, 从大到小排序, 并返回score值 * * @param key * @param start * @param end * @return */ public Set<TypedTuple<String>> zReverseRangeWithScores(String key, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start, end); } /** * 根据Score值查询集合元素, 从大到小排序 * * @param key * @param min * @param max * @return */ public Set<String> zReverseRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max); } /** * 根据Score值查询集合元素, 从大到小排序 * * @param key * @param min * @param max * @return */ public Set<TypedTuple<String>> zReverseRangeByScoreWithScores( String key, double min, double max) { return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, min, max); } /** * * @param key * @param min * @param max * @param start * @param end * @return */ public Set<String> zReverseRangeByScore(String key, double min, double max, long start, long end) { return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max, start, end); } /** * 根据score值获取集合元素数量 * * @param key * @param min * @param max * @return */ public Long zCount(String key, double min, double max) { return stringRedisTemplate.opsForZSet().count(key, min, max); } /** * 获取集合大小 * * @param key * @return */ public Long zSize(String key) { return stringRedisTemplate.opsForZSet().size(key); } /** * 获取集合大小 * * @param key * @return */ public Long zZCard(String key) { return stringRedisTemplate.opsForZSet().zCard(key); } /** * 获取集合中value元素的score值 * * @param key * @param value * @return */ public Double zScore(String key, Object value) { return stringRedisTemplate.opsForZSet().score(key, value); } /** * 移除指定索引位置的成员 * * @param key * @param start * @param end * @return */ public Long zRemoveRange(String key, long start, long end) { return stringRedisTemplate.opsForZSet().removeRange(key, start, end); } /** * 根据指定的score值的范围来移除成员 * * @param key * @param min * @param max * @return */ public Long zRemoveRangeByScore(String key, double min, double max) { return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max); } /** * 获取key和otherKey的并集并存储在destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long zUnionAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey); } /** * * @param key * @param otherKeys * @param destKey * @return */ public Long zUnionAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForZSet() .unionAndStore(key, otherKeys, destKey); } /** * 交集 * * @param key * @param otherKey * @param destKey * @return */ public Long zIntersectAndStore(String key, String otherKey, String destKey) { return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey, destKey); } /** * 交集 * * @param key * @param otherKeys * @param destKey * @return */ public Long zIntersectAndStore(String key, Collection<String> otherKeys, String destKey) { return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys, destKey); } /** * * @param key * @param options * @return */ public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) { return stringRedisTemplate.opsForZSet().scan(key, options); } /** * 扫描主键,建议使用 * @param patten * @return */ public Set<String> scan(String patten){ Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> { Set<String> result = new HashSet<>(); try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder() .match(patten).count(10000).build())) { while (cursor.hasNext()) { result.add(new String(cursor.next())); } } catch (IOException e) { e.printStackTrace(); } return result; }); return keys; } /** * 管道技术,提高性能 * @param type * @param values * @return */ public List<Object> lRightPushPipeline(String type,Collection<String> values){ List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() { public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisConnection stringRedisConn = (StringRedisConnection)connection; //集合转换数组 String[] strings = values.toArray(new String[values.size()]); //直接批量发送 stringRedisConn.rPush(type, strings); return null; } }); return results; } public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){ List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() { @Nullable @Override public Object doInRedis(RedisConnection redisConnection) throws DataAccessException { StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection; String[] strings = values.toArray(new String[values.size()]); stringRedisConnection.rPush(topic_key,strings); stringRedisConnection.zRem(future_key,strings); return null; } }); return objects; } }
这里的场景是微服务,任务模块也是,所以所有缓存的操作在这个模块做,在使用feign 对外抛出接口,并且数据库也是单独存在
配置文件
server: port: 10001 spring: application: name: schedule cloud: nacos: discovery: server-addr: 192.168.249.132:8848 username: nacos password: nacos config: server-addr: 192.168.249.132:8848 file-extension: yaml main: allow-bean-definition-overriding: true ### 上面是bootstrap.yaml的内容,主要配置nacos和服务名 ##nacos中的配置信 spring: # redis redis: host: 192.168.249.132 password: 222222 port: 6379 datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false username: root password: 111111 # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置 mybatis-plus: mapper-locations: classpath*:mapper/*.xml # 设置别名包扫描路径,通过该属性可以给包中的类注册别名 type-aliases-package: com.heima.leadnews.schedule.pojos
实现MybatisPlus快速完成低代码框架
因为日志log类设计到多个操作都会写入log,所以之前的log表有乐观锁的版本字段,启动类中添加Mp集合的乐观锁逻辑
@EnableScheduling @SpringBootApplication @MapperScan("com.heima.schedule.mapper") public class ScheduleApplication { public static void main(String[] args) { SpringApplication.run(ScheduleApplication.class,args); } /** * mybatis-plus乐观锁支持 * @return */ @Bean public MybatisPlusInterceptor optimisticLockerInterceptor(){ MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor()); return interceptor; } }
指定乐观锁版本字段
在taskservice中实现业务,既然是实现延迟队列,那么一定有添加,取消,消费的基本操作
public interface TaskService extends IService<Taskinfo> {
// 返回当任务id
public Long addTask(Task task);
//todo 无法从redis删除数据
public boolean cancelTask(long taskid);
public Task poll(int type,int priority);
}
实现
@Slf4j @Service public class TaskServiceImpl extends ServiceImpl<TaskinfoMapper, Taskinfo> implements TaskService { @Autowired TaskinfoLogsMapper taskinfoLogsMapper; @Autowired CacheService cacheService; /** * 添加任务到数据库中 并且日志也需要传报 * @param task * @return */ @Transactional @Override public Long addTask(Task task) { //1.添加任务到数据库中 // 传递过来的是一个dto long tinfoId= saveSql(task); if (tinfoId!=0) { task.setTaskId(tinfoId); saveCache(task); return tinfoId; } else{ throw new RuntimeException("保存任务到数据库失败"); } //避免redis和sql不一致 } /** * 删除任务并且保证日志的更新 * @param taskid 保存的时候日志id任务id设置的同一个值 所以取数据的时候也可以这样 * @return */ /** * 取消任务 * @param taskId * @return */ @Override public boolean cancelTask(long taskId) { boolean flag = false; //删除任务,更新日志 Task task = updateDb(taskId,ScheduleConstants.EXECUTED); //删除redis的数据 if(task != null){ removeTaskFromCache(task); flag = true; } return false; } /** * 消费任务 * @param type * @param priority * @return */ @Override public Task poll(int type,int priority) { Task task = null; try { String key = type+"-"+priority; String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if(StringUtils.isNotBlank(task_json)){ task = JSON.parseObject(task_json, Task.class); if (task.getTaskId()!=null){ //更新数据库信息 System.out.println(task.getTaskId()); updateDb(task.getTaskId(),ScheduleConstants.EXECUTED); }else { System.out.println("没有id信息"); } } }catch (Exception e){ e.printStackTrace(); log.error("poll task exception"); } return task; } /** * 删除redis中的任务数据 * @param task */ private void removeTaskFromCache(Task task) { String key = task.getTaskType()+"-"+task.getPriority(); String s = JSON.toJSONString(task.getTaskType()+"-"+task.getPriority()+task); //这里不做区分因为之前的数据一定是小于现在时间得 cacheService.lRemove(ScheduleConstants.TOPIC+key,0,s); cacheService.zRemove(ScheduleConstants.FUTURE+key, s); } /** * 删除任务,更新任务日志状态 * @param taskId * @param status * @return */ private Task updateDb(long taskId, int status) { Task task = null; try { //删除任务 removeById(taskId); TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskinfoLogsMapper.updateById(taskinfoLogs); task = new Task(); BeanUtils.copyProperties(taskinfoLogs,task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); }catch (Exception e){ log.error("task cancel exception taskid"+taskId); } return task; } /** * redis 操作必须要和数据库操作区分开保持数据一致 * 只把将要5分钟后执行的时间加入队列,如果从设置定时时间开始加, * 那么redis中将会保存很多还有一俩天才执行的任务,并且没每分钟做数据刷新的时候资源消耗变大 * @param task * @return */ public void saveCache(Task task){ // 获取5分钟后的时间 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE,5); long furtime = calendar.getTime().getTime(); //添加任务到redis if (task.getExecuteTime()<=System.currentTimeMillis()) { // 2.1 如果任务时间小于等于当前时间存入list 立即执行 String key=ScheduleConstants.TOPIC+ task.getTaskType()+"-"+task.getPriority(); cacheService.lLeftPush(key, JSON.toJSONString(task)); } //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中 else if(task.getExecuteTime()>System.currentTimeMillis()&&task.getExecuteTime()<=furtime){ String key=ScheduleConstants.FUTURE+ task.getTaskType()+"-"+task.getPriority(); cacheService.zAdd( key, JSON.toJSONString(task), task.getExecuteTime()); } } public long saveSql(Task task){ //1.添加任务到数据库中 // 传递过来的是一个dto Taskinfo taskinfo = taskTransinfo(task); long takeId=0; try{ save(taskinfo); //1.1 对日志进行保存 takeId=taskinfo.getTaskId(); TaskinfoLogs logs = InitaskLogs(taskinfo); taskinfoLogsMapper.insert(logs); return takeId; }catch (Exception e){ throw new IllegalStateException("保存任务到数据库失败"); } } //dto 和pojo 之间有些属性不能bean转换 public Taskinfo taskTransinfo(Task task) { Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task, taskinfo); //info对象中的执行时间是date 这里和long 进行转换 taskinfo.setExecuteTime(new Date(task.getExecuteTime())); return taskinfo; } /** * 日志进行初始化 * @param taskinfo * @return */ public TaskinfoLogs InitaskLogs(Taskinfo taskinfo) { TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo, taskinfoLogs); // 初始化乐观锁的版本号 taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); return taskinfoLogs; } /** * 数据同步刷新 * 每隔一分钟坚持数据是否快到执行时间 */ /** * 未来数据定时刷新 */ @Scheduled(cron = "0 */1 * * * ?") public void refresh(){ String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30); if(StringUtils.isNotBlank(token)){ log.info("未来数据定时刷新---定时任务"); //获取所有未来数据的集合key Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); for (String futureKey : futureKeys) {//future_100_50 //获取当前数据的key topic String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1]; //按照key和分值查询符合条件的数据 Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); //同步数据 if(!tasks.isEmpty()){ cacheService.refreshWithPipeline(futureKey,topicKey,tasks); log.info("成功的将"+futureKey+"刷新到了"+topicKey); } } } } /** * 将数据库中到到期的未来任务同步一遍 */ @Scheduled(cron = "0 */5 * * * ?") @PostConstruct//和启动类初始化同时执行 public void refreshData() { clearCache(); Calendar calendar=Calendar.getInstance(); calendar.add(Calendar.MINUTE,5); List<Taskinfo> taskinfoList = lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTimeInMillis()).list(); // 数据的任务添加到redis if(taskinfoList.size()>0&& taskinfoList!=null){ taskinfoList.forEach(i->{ Task task = new Task(); BeanUtils.copyProperties(i,task); task.setExecuteTime(i.getExecuteTime().getTime()); //保存到保证数据库和redis的一个同步 addTask(task); }); } log.info("数据库和redis 进行同步"); } public void clearCache(){ // 清楚缓存中的数据 Set<String> topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*"); Set<String> furtureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); cacheService.delete(topicKeys); cacheService.delete(furtureKeys); } }
逻辑 任务添加时候插入数据库,以及任务日志然后根据执行时间和系统设置预处理缓冲时间(这里指的是5分钟),取消任务时,删除任务数据,修改日志状态,其中redis删除数据,并且有定时刷新set队列,将时间满足的移动到list立即执行队列
对外抛出
@RequestMapping("/api/task") @RestController public class ScheduleClient implements IScheduleClient { @Autowired TaskService taskService; @RequestMapping("/add") @Override public ResponseResult addTask(@RequestBody Task task) { return ResponseResult.okResult(taskService.addTask(task)); } @GetMapping("/{taskId}") @Override public ResponseResult cancelTask(@PathVariable("taskId") long taskid) { return ResponseResult.okResult(taskService.cancelTask(taskid)); } @GetMapping("/{type}/{priority}") @Override public ResponseResult poll( @PathVariable("type") int type,@PathVariable("priority") int priority) { return ResponseResult.okResult(taskService.poll(type, priority)); } }
feign模块添加对应调用者
@FeignClient(value = "schedule") public interface IScheduleClient { /** * * @param task * @return 任务id */ @PostMapping("/api/v1/task/add") public ResponseResult addTask(@RequestBody Task task); /** * * @param taskid * @return 是否成功 */ @GetMapping("/api/v1/task/{taskId}") public ResponseResult cancelTask(@PathVariable("taskId") long taskid); @GetMapping("/api/v1/task/{type}/{priority}") public ResponseResult poll( @PathVariable("type") int type,@PathVariable("priority") int priority); }
发布文章的服务在完成文章的处理逻辑后,调用该模块的添加任务方法,根据执行时间放在哪一个队列,set/list,其中值得主义的是,数据表之间的参数字段是长比特类型,Mp映射也是,所以在其他模块需要调用该缓存模块方法时候,对传递的任务擦拭布参数进行字节序列化化调用者模块
@Override @Async public void addNewsToTask(Integer Newsid, Date published) { log.info("addNewsToTask Newsid:"+Newsid+" published:"+published); if (published == null||Newsid==null){ throw new IllegalArgumentException("传递参数不全"); } Task task = new Task(); task.setExecuteTime(published.getTime()); task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType()); task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); WmNews news = new WmNews(); news.setId(Newsid); task.setParameters(ProtostuffUtil.serialize(news)); schduleClient.addTask(task); log.info("addNewsToTask success"); }
在调用feign api之前需要把对应的参数准备,其中包括序列化,而这里的序列化为bite采用的是第三方库
序列化工具对比
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
</dependency>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。