赞
踩
我们选用Redis
-- Audit/log table: one row per task, kept after the task leaves `taskinfo`.
-- `version` is described by its column comment as an optimistic-lock version number;
-- `status` codes are listed in its column comment (0 = initial, 1 = EXECUTED, 2 = CANCELLED).
CREATE TABLE `taskinfo_logs` (
    `task_id`      bigint(20)  NOT NULL       COMMENT '任务id',
    `execute_time` datetime(3) NOT NULL       COMMENT '执行时间',
    `parameters`   longblob    NULL           COMMENT '参数',
    `priority`     int(11)     NOT NULL       COMMENT '优先级',
    `task_type`    int(11)     NOT NULL       COMMENT '任务类型',
    `version`      int(11)     NOT NULL       COMMENT '版本号,用乐观锁',
    `status`       int(11)     NULL DEFAULT 0 COMMENT '状态 0=初始化状态 1=EXECUTED 2=CANCELLED',
    PRIMARY KEY (`task_id`) USING BTREE
)
    ENGINE = InnoDB
    CHARACTER SET = utf8
    COLLATE = utf8_general_ci
    ROW_FORMAT = Dynamic;

-- Pending-task table: holds the tasks that are still waiting to run.
-- The composite index covers lookups by type, then priority, then execution time.
CREATE TABLE `taskinfo` (
    `task_id`      bigint(20)  NOT NULL COMMENT '任务id',
    `execute_time` datetime(3) NOT NULL COMMENT '执行时间',
    `parameters`   longblob    NULL     COMMENT '参数',
    `priority`     int(11)     NOT NULL COMMENT '优先级',
    `task_type`    int(11)     NOT NULL COMMENT '任务类型',
    PRIMARY KEY (`task_id`) USING BTREE,
    INDEX `index_taskinfo_time` (`task_type`, `priority`, `execute_time`) USING BTREE
)
    ENGINE = InnoDB
    CHARACTER SET = utf8
    COLLATE = utf8_general_ci
    ROW_FORMAT = Dynamic;
@Autowired private CacheService cacheService; @Override public long addTask(Task task) { //1.添加任务到数据库中 boolean success = addTaskToDb(task); if (success) { //2.添加任务到redis addTaskToCache(task); } return task.getTaskId(); } /** * 添加任务到数据库中 * * @param task * @return */ private boolean addTaskToDb(Task task) { boolean flag = false; try { //保存任务表 Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task, taskinfo); taskinfo.setExecuteTime(new Date(task.getExecuteTime())); taskinfoMapper.insert(taskinfo); //设置taskID task.setTaskId(taskinfo.getTaskId()); //保存任务日志数据 TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo, taskinfoLogs); taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); taskinfoLogsMapper.insert(taskinfoLogs); flag = true; } catch (Exception e) { e.printStackTrace(); } return flag; } /** * 把任务添加到redis中 * * @param task */ private void addTaskToCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); //获取5分钟之后的时间 毫秒值 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); long nextScheduleTime = calendar.getTimeInMillis(); //2.1 如果任务的执行时间小于等于当前时间,存入list if (task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task)); } else if (task.getExecuteTime() <= nextScheduleTime) { //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中 cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime()); } }
@Override public boolean cancelTask(long taskId) { //删除任务,更新任务日志 Task task = updateDb(taskId, ScheduleConstants.EXECUTED); //删除Redis数据 if (task != null) { removeTaskFromCache(task); } return false; } private void removeTaskFromCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); if (task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task)); } else { cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task)); } } private Task updateDb(long taskId, int status) { Task task = null; try { //删除任务 taskinfoMapper.deleteById(taskId); TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskinfoLogsMapper.updateById(taskinfoLogs); task = new Task(); BeanUtils.copyProperties(taskinfoLogs, task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); } catch (Exception e) { log.error("task cancel exception taskid={}", taskId); } return task; }
@Autowired private CacheService cacheService; /** * 定时刷新任务 */ @Scheduled(cron = "0 */1 * * * ?") public void refresh() { String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30); if (StringUtils.isNotBlank(token)) { log.info("未来数据定时刷新---定时任务"); //获取所有未来数据的集合key Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); for (String futureKey : futureKeys) {//future_100_50 //获取当前数据的key topic String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1]; //按照key和分值查询符合条件的数据 Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); //同步数据 if (!tasks.isEmpty()) { cacheService.refreshWithPipeline(futureKey, topicKey, tasks); log.info("成功的将" + futureKey + "刷新到了" + topicKey); } } } }
@Override public Task poll(int type, int priority) { Task task = null; try { String key = type + "_" + priority; //从Redis中拉去数据 String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if (StringUtils.isNotBlank(task_json)) { task = JSON.parseObject(task_json, Task.class); //修改数据库信息 updateDb(task.getTaskId(), ScheduleConstants.EXECUTED); } } catch (Exception e) { e.printStackTrace(); log.error("poll task exception"); } return task; } @Scheduled(cron = "0 */5 * * * ?") @PostConstruct public void reloadData() { clearCache(); log.info("数据库数据同步到缓存"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); //查看小于未来5分钟的所有任务 List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime())); if (allTasks != null && allTasks.size() > 0) { for (Taskinfo taskinfo : allTasks) { Task task = new Task(); BeanUtils.copyProperties(taskinfo, task); task.setExecuteTime(taskinfo.getExecuteTime().getTime()); addTaskToCache(task); } } } private void clearCache() { // 删除缓存中未来数据集合和当前消费者队列的所有key Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_ Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_ cacheService.delete(futurekeys); cacheService.delete(topickeys); }
// Application class of the schedule service as shown in this snippet; the annotation
// below is what activates the @Scheduled methods (refresh, reloadData).
@EnableScheduling // enable scheduled tasks
public class ScheduleApplication {
}
import com.heima.model.common.dtos.ResponseResult; import com.heima.model.common.schedule.dtos.Task; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @FeignClient("leadnews-schedule") public interface IScheduleClient { /** * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") public ResponseResult addTask(@RequestBody Task task); /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") public ResponseResult cancelTask(@PathVariable("taskId") long taskId); /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority); }
import com.heima.apis.schedule.IScheduleClient; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.common.schedule.dtos.Task; import com.heima.schedule.service.TaskService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController public class ScheduleClient implements IScheduleClient { @Autowired private TaskService taskService; /** * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") @Override public ResponseResult addTask(@RequestBody Task task) { return ResponseResult.okResult(taskService.addTask(task)); } /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") @Override public ResponseResult cancelTask(@PathVariable("taskId") long taskId) { return ResponseResult.okResult(taskService.cancelTask(taskId)); } /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") @Override public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) { return ResponseResult.okResult(taskService.poll(type,priority)); } }
@Resource
IScheduleClient scheduleClient;

/**
 * Asynchronously registers a delayed task for a news item with the schedule service.
 *
 * @param id          id of the news item; serialized inside a WmNews as the task parameters
 * @param publishTime publish time, used as the task's execute time (epoch millis)
 */
@Override
@Async
public void addNewsToTask(Integer id, Date publishTime) {
    log.info("添加任务到延迟服务中----begin");

    // Describe when and as what kind of task this should run.
    TaskTypeEnum taskKind = TaskTypeEnum.NEWS_SCAN_TIME;
    Task delayedTask = new Task();
    delayedTask.setExecuteTime(publishTime.getTime());
    delayedTask.setTaskType(taskKind.getTaskType());
    delayedTask.setPriority(taskKind.getPriority());

    // The payload is a WmNews carrying only the id, serialized with Protostuff.
    WmNews payload = new WmNews();
    payload.setId(id);
    delayedTask.setParameters(ProtostuffUtil.serialize(payload));

    scheduleClient.addTask(delayedTask);

    log.info("添加任务到延迟服务中----end");
}
在配置类中添加注解
// Annotations for the wemedia application class (the class body is not shown in this snippet).
@EnableAsync // enable asynchronous invocation, needed for @Async methods such as addNewsToTask
// Scan for Feign client interfaces under com.heima.apis (e.g. com.heima.apis.schedule.IScheduleClient).
@EnableFeignClients(basePackages = "com.heima.apis.*")
public class WemediaApplication {
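因为 addNewsToTask 是通过 @Async 异步调用的,所以启动类上要用 @EnableAsync 开启异步支持(不是开启事务);同时要用 @EnableFeignClients 指定扫描包,保证 Feign 接口 IScheduleClient 能被扫描到。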
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。