赞
踩
-- 定时任务调度表 CREATE TABLE `scheduled_task` ( `id` varchar(20) NOT NULL COMMENT 'id', `name` varchar(255) DEFAULT NULL COMMENT '定时任务名称', `task_type` varchar(100) DEFAULT NULL COMMENT '定时任务分类', `task_describe` varchar(255) DEFAULT NULL COMMENT '定时任务描述', `cron` varchar(50) NOT NULL COMMENT 'cron策略', `method` varchar(10) NOT NULL COMMENT '请求方法', `url` varchar(500) NOT NULL COMMENT '请求路径', `enable` varchar(2) NOT NULL DEFAULT '0' COMMENT '是否启用定时任务', `open_log` varchar(2) DEFAULT '0' COMMENT '是否开启日志', `create_user` varchar(40) DEFAULT NULL COMMENT '创建人', `create_time` varchar(40) DEFAULT NULL COMMENT '创建时间', `update_user` varchar(40) DEFAULT NULL COMMENT '更新人', `update_time` varchar(40) DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY `idx_url` (`url`) BLOCK_SIZE 16384 LOCAL ) COMMENT = '定时任务调度表'; -- 定时任务调度日志表 CREATE TABLE `scheduled_log` ( `id` varchar(20) NOT NULL COMMENT 'id', `task_id` varchar(20) DEFAULT NULL COMMENT '定时任务ID', `task_name` varchar(255) DEFAULT NULL COMMENT '定时任务名称', `execute_status` varchar(40) DEFAULT NULL COMMENT '执行状态', `content` longtext DEFAULT NULL COMMENT '内容', `execute_time` varchar(40) DEFAULT NULL COMMENT '执行时间', PRIMARY KEY (`id`) ) COMMENT = '定时任务调度日志表';
/**
* @ClassName JobConstant
* @Description 常量类,定时任务操作触发器的key
* @Author huxuehao
**/
public class JobConstant {
public static final String OPEN_SCHEDULE = "openSchedule";
public static final String CLOSE_SCHEDULE = "closeSchedule";
public static final String DELETE_SCHEDULE = "deleteSchedule";
}
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * @ClassName LogProperties * @Description 日志配置文件 * @Author huxuehao **/ @Data @Component @ConfigurationProperties(prefix = "task.log", ignoreUnknownFields = false) public class LogProperties { /* 启用操作成功日志 */ private boolean successOpen = true; /* 启用操作失败日志 */ private boolean failOpen = true; /* 日志保存天数 */ private int saveDays = 2; }
import lombok.Data;
import java.io.Serializable;
/**
* @ClassName BaseDto
* @Description 基础dto
* @Author huxuehao
**/
@Data
public class BaseDto implements Serializable {
private static final long serialVersionUID = -153746138274322843L;
private String createUser;
private String createTime;
private String updateUser;
private String updateTime;
}
import com.baomidou.mybatisplus.annotation.TableName; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * @ClassName ScheduleLogDto * @Description TODO * @Author huxuehao **/ @Data @AllArgsConstructor @NoArgsConstructor @TableName("scheduled_log") public class ScheduleLogDto implements Serializable { private static final long serialVersionUID = -153746138274322843L; private String id; private String taskId; private String taskName; private String executeStatus; private String content; private String executeTime; }
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import org.springframework.scheduling.support.CronTrigger; /** * @ClassName ScheduleTask * @Description 定时任务信息表 * @Author huxuehao **/ @Data @TableName("scheduled_task") public class ScheduleTaskDto extends BaseDto { private String id; private String name; private String taskType; private String taskDescribe; private String cron; private String method; private String url; private String enable; private String openLog; // 提供转换为CronTrigger的工具方法 public CronTrigger toCronTrigger() { return new CronTrigger(this.cron); } }
import lombok.Data; /** * @ClassName ScheduleTaskPo * @Description TODO * @Author huxuehao **/ @Data public class ScheduleTaskPo extends BaseDto { private String id; private String name; private String taskType; private String taskDescribe; private String cron; private String method; private String url; private String enable; private String openLog; private String total; }
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.studioustiger.job.entity.ScheduleLogDto; import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; import java.util.List; /** * @InterfaceName ScheduleLogMapper * @Description TODO * @Author huxuehao **/ @Repository public interface ScheduleLogMapper extends BaseMapper<ScheduleLogDto> { /* 分页 */ List<ScheduleLogDto> getPage(); /* 添加 */ int add(@Param("log") ScheduleLogDto log); /* 清除日志 */ int clearLog(@Param("endTime") String endTime); /* 根据定时任务获取最新的错误日志*/ ScheduleLogDto latestLogByTask(@Param("taskId") String taskId); /* 上一条日志 */ ScheduleLogDto lastLog(String taskId, String executeTime); /* 下一条日志 */ ScheduleLogDto nextLog(String taskId, String executeTime); }
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.studioustiger.job.mapper.ScheduleLogMapper"> <resultMap id="scheduleLogMap" type="com.studioustiger.job.entity.ScheduleLogDto"> <result column="id" property="id"/> <result column="task_id" property="taskId"/> <result column="task_name" property="taskName"/> <result column="execute_status" property="executeStatus"/> <result column="content" property="content"/> <result column="execute_time" property="executeTime"/> </resultMap> <select id="getPage" parameterType="string" resultMap="scheduleLogMap"> select `id`, `task_id`, `task_name`, `execute_status`, `content`, `execute_time` from `scheduled_log` order by `name` desc, `execute_time` desc, id </select> <insert id="add"> insert into `scheduled_log`(`id`,`task_id`,`task_name`,`execute_status`,`content`,`execute_time`) values ( #{log.id}, #{log.taskId}, #{log.taskName}, #{log.executeStatus}, #{log.content}, #{log.executeTime} ) </insert> <delete id="clearLog"> delete from `scheduled_log` where `execute_time` <= #{endTime} </delete> <select id="latestLogByTask" resultMap="scheduleLogMap"> select `id`, `task_id`, `task_name`, `execute_status`, `content`, `execute_time` from `scheduled_log` where `task_id` = #{taskId} and execute_status = 'fail' order by `execute_time` desc, `id` desc limit 1 </select> <select id="lastLog" resultMap="scheduleLogMap"> select `id`, `task_id`, `task_name`, `execute_status`, `content`, `execute_time` from `scheduled_log` where `task_id` = #{taskId} and `execute_time` > #{executeTime} and `execute_status` = 'fail' order by `execute_time` asc, `id` asc limit 1 </select> <select id="nextLog" resultMap="scheduleLogMap"> select `id`, `task_id`, `task_name`, `execute_status`, `content`, `execute_time` from `scheduled_log` where `task_id` = #{taskId} and `execute_time` < #{executeTime} and `execute_status` = 'fail' order by `execute_time` desc, `id` desc limit 1 </select> </mapper>
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.studioustiger.job.entity.ScheduleTaskDto; import com.studioustiger.job.entity.ScheduleTaskPo; import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; import java.util.List; /** * @ClassName ScheduleTaskMapper * @Description TODO * @Author huxuehao **/ @Repository public interface ScheduleTaskMapper extends BaseMapper<ScheduleTaskDto> { /* 添加定时任务信息 */ int add(@Param("task") ScheduleTaskDto task); /* 更新定时任务信息 */ int update(@Param("task") ScheduleTaskDto task); /* 获取总数 */ int getTotals(@Param("taskName") String taskName, @Param("taskType") String taskType, @Param("taskStatus") String taskStatus); List<ScheduleTaskPo> getPage(@Param("current") Integer current, @Param("size") Integer size, @Param("taskName") String taskName, @Param("taskType") String taskType, @Param("taskStatus") String taskStatus); /* 批量开启定时任务信息 */ int enableByIds(@Param("tasks") List<ScheduleTaskDto> tasks); /* 批量关闭定时任务信息 */ int disableByIds(@Param("tasks") List<ScheduleTaskDto> tasks); /* 批量删除定时任务信息 */ int deleteByIds(@Param("ids") List<String> ids); /* 根据定时任务id获取最新的定时任务信息*/ ScheduleTaskPo refreshResult(@Param("taskId")String taskId); }
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.studioustiger.job.mapper.ScheduleTaskMapper"> <resultMap id="scheduleTaskMap" type="com.studioustiger.job.entity.ScheduleTaskPo"> <result column="id" property="id"/> <result column="name" property="name"/> <result column="task_type" property="taskType"/> <result column="task_describe" property="taskDescribe"/> <result column="url" property="url"/> <result column="cron" property="cron"/> <result column="method" property="method"/> <result column="enable" property="enable"/> <result column="open_log" property="openLog"/> <result column="create_user" property="createUser"/> <result column="create_time" property="createTime"/> <result column="update_user" property="updateUser"/> <result column="update_time" property="updateTime"/> <result column="total" property="total"/> </resultMap> <select id="getPage" parameterType="string" resultMap="scheduleTaskMap"> select a.`id`, a.`name`, b.`dict_label` as task_type, a.`task_describe`, a.`url`, a.`cron`, a.`method`, a.`enable`, a.`open_log`, a.`create_user`, a.`create_time`, a.`update_user`, a.`update_time`, concat(ifnull(c.fail_count,0),'/',ifnull(d.success_count,0)) as total from `scheduled_task` as a left join `sys_dict` as b on a.`task_type` = b.`dict_value` and b.`code` = 'schedule_task' and is_sealed = 0 and b.`del_flag` = 0 left join (select task_id, count(id) as fail_count from `scheduled_log` where `execute_status` = 'fail' group by task_id) c on c.task_id = a.id left join (select task_id, count(id) as success_count from `scheduled_log` where `execute_status` = 'success' group by task_id) d on d.task_id = a.id <where> <if test="taskName != null and taskName != ''"> and a.`name` like concat('%',#{taskName},'%') </if> <if test="taskType != null and taskType != ''"> and a.`task_type` like concat('%',#{taskType},'%') </if> <if test="taskStatus != null and taskStatus != ''"> and a.`enable` like concat('%',#{taskStatus},'%') </if> </where> order by a.`task_type` desc, a.`name` desc, `create_time` desc, id <if test="current != null"> limit #{current} , #{size} </if> </select> <select id="getTotals" parameterType="string" resultType="int"> select count(*) from `scheduled_task` <where> <if test="taskName != null and taskName != ''"> and `name` like concat('%',#{taskName},'%') </if> <if test="taskType != null and taskType != ''"> and `task_type` like concat('%',#{taskType},'%') </if> <if test="taskStatus != null and taskStatus != ''"> and `enable` like concat('%',#{taskStatus},'%') </if> </where> </select> <insert id="add"> insert into `scheduled_task`( `id`, `name`, `task_type`, `task_describe`, `url`, `cron`, `method`, `enable`, `open_log`, `create_user`, `create_time` ) values ( #{task.id}, #{task.name}, #{task.taskType}, #{task.taskDescribe}, #{task.url}, #{task.cron}, #{task.method}, #{task.enable}, #{task.openLog}, #{task.createUser}, #{task.createTime} ) </insert> <update id="update"> update `scheduled_task` set `name` = #{task.name}, `task_type` = #{task.taskType}, `task_describe` = #{task.taskDescribe}, `url` = #{task.url}, `cron` = #{task.cron}, `method` = #{task.method}, `enable` = #{task.enable}, `open_log` = #{task.openLog}, `update_user` = #{task.updateUser}, `update_time` = #{task.updateTime} where `id` = #{task.id} </update> <update id="enableByIds"> update `scheduled_task` set `enable` = '1', `update_user` = #{tasks[0].updateUser}, `update_time` = #{tasks[0].updateTime} where `id` in <foreach collection="tasks" item="task" open="(" separator="," close=")"> #{task.id} </foreach> </update> <update id="disableByIds"> update `scheduled_task` set `enable` = '0', `update_user` = #{tasks[0].updateUser}, `update_time` = #{tasks[0].updateTime} where `id` in <foreach collection="tasks" item="task" open="(" separator="," close=")"> #{task.id} </foreach> </update> <delete id="deleteByIds" parameterType="list"> delete from `scheduled_task` where `id` in <foreach collection="ids" item="val" open="(" separator="," close=")"> #{val} </foreach> </delete> <select id="refreshResult" resultMap="scheduleTaskMap"> select a.`id`, a.`name`, b.`dict_label` as task_type, a.`task_describe`, a.`url`, a.`cron`, a.`method`, a.`enable`, a.`open_log`, a.`create_user`, a.`create_time`, a.`update_user`, a.`update_time`, concat(ifnull(c.fail_count,0),'/',ifnull(d.success_count,0)) as total from `scheduled_task` as a left join `sys_dict` as b on a.`task_type` = b.`dict_value` and b.`code` = 'schedule_task' and is_sealed = 0 and b.`del_flag` = 0 left join (select task_id, count(id) as fail_count from `scheduled_log` where `execute_status` = 'fail' group by task_id) c on c.task_id = a.id left join (select task_id, count(id) as success_count from `scheduled_log` where `execute_status` = 'success' group by task_id) d on d.task_id = a.id where a.id = #{taskId} </select> </mapper>
import com.baomidou.mybatisplus.extension.service.IService; import com.studioustiger.job.entity.ScheduleTaskDto; import com.studioustiger.job.entity.ScheduleTaskPo; import java.util.List; /** * @InterfaceName ScheduleTaskService * @Description TODO * @Author huxuehao **/ public interface ScheduleTaskService extends IService<ScheduleTaskDto> { /* 新增 */ int add(ScheduleTaskDto scheduleTask); /* 更新 */ int update(ScheduleTaskDto scheduleTask); /* 总数 */ int getTotals(String taskName, String taskType, String taskStatus); /* 分页 */ List<ScheduleTaskPo> getPage(Integer current, Integer size, String taskName, String taskType, String taskStatus); /* 启用 */ int enableByIds(List<String> ids); /* 禁用 */ int disableByIds(List<String> ids); /* 删除 */ int deleteByIds(List<String> ids); ScheduleTaskPo refreshResult(String tasId); }
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.studioustiger.core.tool.utils.DateUtil; import com.studioustiger.dataapi.function.MeFunctions; import com.studioustiger.job.entity.ScheduleTaskDto; import com.studioustiger.job.entity.ScheduleTaskPo; import com.studioustiger.job.mapper.ScheduleTaskMapper; import com.studioustiger.job.service.ScheduleTaskService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Date; import java.util.List; /** * @ClassName ScheduleTaskServiceImpl * @Description TODO * @Author huxuehao **/ @Service public class ScheduleTaskServiceImpl extends ServiceImpl<ScheduleTaskMapper, ScheduleTaskDto> implements ScheduleTaskService { @Autowired ScheduleTaskMapper scheduleTaskMapper; @Autowired MeFunctions me; /* 新增*/ public int add(ScheduleTaskDto scheduleTask){ String id = String.valueOf(me.nextId()); String createUser = String.valueOf(me.id()); String createTime = DateUtil.formatDateTime(new Date()); scheduleTask.setId(id); scheduleTask.setEnable("0"); scheduleTask.setCreateUser(createUser); scheduleTask.setCreateTime(createTime); return scheduleTaskMapper.add(scheduleTask); } /* 更新*/ public int update(ScheduleTaskDto scheduleTask){ String updateUser = String.valueOf(me.id()); String updateTime = DateUtil.formatDateTime(new Date()); scheduleTask.setUpdateUser(updateUser); scheduleTask.setUpdateTime(updateTime); return scheduleTaskMapper.update(scheduleTask); } @Override public int getTotals(String taskName, String taskType, String taskStatus) { return scheduleTaskMapper.getTotals(taskName, taskType, taskStatus); } /* 分页 */ public List<ScheduleTaskPo> getPage(Integer current, Integer size, String taskName, String taskType, String taskStatus) { return scheduleTaskMapper.getPage(current, size, taskName, taskType, taskStatus); } /* 启用 */ public int enableByIds(List<String> ids){ return scheduleTaskMapper.enableByIds(fillTaskInfo(ids)); } /* 禁用 */ public int disableByIds(List<String> ids){ return scheduleTaskMapper.disableByIds(fillTaskInfo(ids)); } /* 填充id、更新人、更新时间 */ private List<ScheduleTaskDto> fillTaskInfo (List<String> ids) { String updateUser = String.valueOf(me.id()); String updateTime = DateUtil.formatDateTime(new Date()); List<ScheduleTaskDto> tasks = new ArrayList<>(); ids.forEach(item -> { ScheduleTaskDto task = new ScheduleTaskDto(); task.setId(item); task.setUpdateUser(updateUser); task.setUpdateTime(updateTime); tasks.add(task); }); return tasks; } /* 删除 */ public int deleteByIds(List<String> ids) { return scheduleTaskMapper.deleteByIds(ids); } @Override public ScheduleTaskPo refreshResult(String taskId) { return scheduleTaskMapper.refreshResult(taskId); } }
import com.studioustiger.job.entity.ScheduleLogDto; import java.util.List; /** * @InterfaceName ScheduleLogService * @Description TODO * @Author huxuehao **/ public interface ScheduleLogService { /* 分页 */ List<ScheduleLogDto> getPage(); /* 添加 */ int add(ScheduleLogDto scheduleLog); /* 清除日志 */ int clearLog(String endTime); /* 根据定时任务获取最新的错误日志*/ ScheduleLogDto latestLogByTask(String taskId); /* 上一条日志 */ ScheduleLogDto lastLog(String taskId, String executeTime); /* 下一条日志 */ ScheduleLogDto nextLog(String taskId, String executeTime); }
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.studioustiger.job.entity.ScheduleLogDto; import com.studioustiger.job.mapper.ScheduleLogMapper; import com.studioustiger.job.service.ScheduleLogService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; /** * @ClassName ScheduleLogServiceImpl * @Description TODO * @Author huxuehao **/ @Service public class ScheduleLogServiceImpl extends ServiceImpl<ScheduleLogMapper, ScheduleLogDto> implements ScheduleLogService { @Autowired ScheduleLogMapper scheduleLogMapper; @Override public List<ScheduleLogDto> getPage() { return scheduleLogMapper.getPage(); } @Override public int add(ScheduleLogDto scheduleLog) { return scheduleLogMapper.add(scheduleLog); } @Override public int clearLog(String endTime) { return scheduleLogMapper.clearLog(endTime); } @Override /* 根据定时任务获取最新的错误日志*/ public ScheduleLogDto latestLogByTask(String taskId) { return scheduleLogMapper.latestLogByTask(taskId); } @Override public ScheduleLogDto lastLog(String taskId, String executeTime) { return scheduleLogMapper.lastLog(taskId, executeTime); } @Override public ScheduleLogDto nextLog(String taskId, String executeTime) { return scheduleLogMapper.nextLog(taskId, executeTime); } }
import com.studioustiger.job.entity.ScheduleTaskDto;
/**
* @InterfaceName TaskExecutor
* @Description 任务执行器
* @Author huxuehao
**/
public interface TaskExecutor {
/**
* 执行任务
* @param task
* @return
*/
boolean execute(ScheduleTaskDto task);
}
import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.studioustiger.core.tool.utils.DateUtil; import com.qstudioustigeranmo.core.tool.utils.StringUtil; import com.studioustiger.dataapi.function.MeFunctions; import com.studioustiger.job.constant.JobConstant; import com.studioustiger.job.constant.LogProperties; import com.studioustiger.job.entity.ScheduleLogDto; import com.studioustiger.job.entity.ScheduleTaskDto; import com.studioustiger.job.executor.TaskExecutor; import com.studioustiger.job.service.ScheduleLogService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Date; /** * @ClassName TaskExecutorImpl * @Description 定时任务执行器,最终定时任务会通过触发execute方法被执行 * @Author huxuehao **/ @Service public class TaskExecutorImpl implements TaskExecutor { private static final Log log = LogFactory.get(TaskExecutorImpl.class); @Autowired ScheduleLogService scheduleLogService; @Autowired MeFunctions me; @Autowired LogProperties logProperties; @Override public boolean execute(ScheduleTaskDto task) { return this.doJob(task); } /* 用于定时任务执行 */ public boolean doJob(ScheduleTaskDto task) { String message = null; try { /** * 此处执行定时任务内容,具体应该怎么执行,下面提供两种思路 * 思路1:基于bean去执行定时任务,定时任务与bean一一对应,将定时任务的逻辑写在@Bean中,ScheduleTaskDto中需要存储 读取 数据库存储的Bean名称。 * 思路2:基于接口Controller去执行定时任务,将定时任务的逻辑写在接口中,ScheduleTaskDto中需要存储 读取 数据库存储的接口路径。 */ log.info("定时任务[{}]执行成功", task.getName()); } catch (Exception e) { log.error("定时任务[{}]执行失败", task.getName()); StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); e.printStackTrace(pw); message = sw.toString(); } finally { genLog(task, message); } return message == null; } /* 用户日志采集 */ private void genLog(ScheduleTaskDto task, String message){ String status = message == null ? "success" : "fail"; if (!logProperties.isFailOpen() && !logProperties.isSuccessOpen()) { return; } if ("success".equals(status) && !logProperties.isSuccessOpen()) { return; } if ("fail".equals(status) && !logProperties.isFailOpen()) { return; } if ("0".equals(task.getOpenLog())) { return; } try { ScheduleLogDto scheduleLog = new ScheduleLogDto( String.valueOf(me.nextId()), task.getId(), task.getName(), status, message, DateUtil.formatDateTime(new Date()) ); scheduleLogService.add(scheduleLog); } catch (Exception e) { log.error("日志持久化错误"); } } }
import com.studioustiger.job.entity.ScheduleTaskDto; import com.studioustiger.job.executor.TaskExecutor; import org.springframework.stereotype.Component; /** * @InterfaceName Worker * @Description 开启定时任务需要传入Runnable接口的实现类,作为scheduledFuture.schedule(worker, cronTrigger)中的第一个参数, * 所以"工作内容"实现Runnable接口是必须要做的。在run()中要执行的内容,我们可以封装成任务执行器TaskExecutor。 * * 注意:对于任务执行器(TaskExecutor)我们最好不使用(也要看实际情况)spring的bean进行注入,因为使用bean后,上下文中始终使用的是一个 * 对象,可能会给你带来额外的问题(反正我是遇到了),建议使用构造函数传参。 * @Author huxuehao **/ @Component public class Worker implements Runnable { private TaskExecutor taskExecutor; private ScheduleTaskDto scheduleTask; public Worker() { } public Worker(ScheduleTaskDto scheduleTask, TaskExecutor taskExecutor) { this.scheduleTask = scheduleTask; this.taskExecutor = taskExecutor; } public void run() { taskExecutor.execute(scheduleTask); } }
import com.studioustiger.job.entity.ScheduleTaskDto; import lombok.AllArgsConstructor; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; /** * @ClassName TaskConfig * @Description 定时任务配置 * @Author huxuehao **/ @AllArgsConstructor @SpringBootConfiguration public class TaskConfig { /** * 描述:我们没开启一个定时任务,就会产生一个ScheduledFuture实体,当我们需要动态的操作定时任务(上述的实体)时, * 我们需要调用scheduledFuture.cancel(true)或scheduledFuture.schedule(worker, cronTrigger),这就意味着 * 我们需要将每一个ScheduledFuture实体存储起来,当需要动态的控制定时任务时,我们去改Map中取出对应的实体进行上述的 * cancel或schedule操作即可。所以我们需要在spring容器中维护一个ScheduledFuture的注册表注册表。 */ @Bean(name = "scheduledFutureMap") public Map<String, ScheduledFuture> scheduledFutureMap() { return new ConcurrentHashMap<>(); } /** * 描述:对于定时任务的动态操作(添加、开启、停止、删除)我们需要将上述操作封装成触发器,想要使用的时候 * 我们只需要取出已经初始化好的代码逻辑,取出对应的代码逻辑,传入参数,执行即可。上述描述是一种预处理的思想。 * * 在这里我们使用java8的Consumer(消费型函数式接口)进行时间,并将实现了不同代码逻辑的Consumer存储到Map中, * 这样我们一方面可以实现动态扩展,另一方面省去了多 if-else 的操作。 */ @Bean(name = "operationMap") public Map<String, Consumer<ScheduleTaskDto>> operationMap() { return new ConcurrentHashMap<>(); } /** * 一个用于开启定时任务的线程池;我们关注的核心方法是:threadPoolTaskScheduler.schedule(工作内容, 触发器) */ @Bean(name = "threadPoolTaskScheduler") public ThreadPoolTaskScheduler threadPoolTaskScheduler() { ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); threadPoolTaskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); threadPoolTaskScheduler.setThreadNamePrefix("WorKerThread:"); threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true); threadPoolTaskScheduler.setAwaitTerminationSeconds(30); return threadPoolTaskScheduler; } }
import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.studioustiger.job.constant.JobConstant; import com.studioustiger.job.entity.ScheduleTaskDto; import com.studioustiger.job.executor.TaskExecutor; import com.studioustiger.job.service.ScheduleTaskService; import lombok.RequiredArgsConstructor; import org.springframework.boot.SpringBootConfiguration; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; import javax.annotation.PostConstruct; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; import java.util.stream.Collectors; /** * @ClassName InitConfig * @Description 初始化配置 * scheduledFutureMap:作为ScheduledFuture的注册表,用于我们来操作其开启关闭,key为定时任务id。 * operationMap: 存储的定时任务操作触发器,参数是ScheduleTaskDto。 * threadPoolTaskScheduler:定时任务线程池,线程池的大小使用的是Runtime.getRuntime().availableProcessors()。 * scheduleTaskService:操作schedule_task的Service。 * initSchedule():从数据表中寻找有效的定时任务,并执行定时任务。 * initOperationMap():初始化定时任务操作触发器,最终初始化好的定时任务操作会被填充到operationMap中, * 其目的是为了动态的控制定时任务的新增、开启、关闭。使用Consumer作为定时任务预处理载体。 * @Author huxuehao **/ @SpringBootConfiguration @RequiredArgsConstructor public class InitConfig { private static final Log log = LogFactory.get(TaskConfig.class); /* 走构造注入,使用lombok生成全参数构造 */ private final Map<String, ScheduledFuture> scheduledFutureMap; private final Map<String, Consumer<ScheduleTaskDto>> operationMap; private final ThreadPoolTaskScheduler threadPoolTaskScheduler; private final ScheduleTaskService scheduleTaskService; private final TaskExecutor taskExecutor; /** * 描述:初始化定时任务 */ @PostConstruct public void initSchedule() { List<ScheduleTaskDto> taskList = scheduleTaskService.list(); List<ScheduleTaskDto> enableTask = taskList.stream().filter(item -> "1".equals(item.getEnable())).collect(Collectors.toList()); enableTask.forEach(item -> { Worker worker = new Worker(item, taskExecutor); CronTrigger cronTrigger = item.toCronTrigger(); ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(worker, cronTrigger); scheduledFutureMap.put(item.getId(), schedule); log.info("定时任务:[{}]初始化完成", item.getName()); }); initOperationMap(); } /** * 描述:初始化定时任务操作触发器 * 我们关注的核心方法是: * threadPoolTaskScheduler.schedule(工作内容, 触发器) //此方法用于开启一个定时任务 * scheduledFuture.cancel(true) //此方法用于取消一个定时任务 */ private void initOperationMap() { /* 定时任务:打开/更新操作 */ Consumer<ScheduleTaskDto> openSchedule = item -> { String scheduleId = item.getId(); /* 当定时任务已经存在与scheduledFutureMap中*/ if (scheduledFutureMap.containsKey(scheduleId)) { /* 重新计算scheduledFutureMap中key为scheduledId的value的值 */ scheduledFutureMap.compute(scheduleId, (k, v) -> { /* 先判空,如果对象(ScheduledFuture)存在,则将其先停跳 */ Optional.ofNullable(v).ifPresent(v0 -> v0.cancel(true)); /* 开启一个新的定时 */ Worker worker = new Worker(item, taskExecutor); CronTrigger cronTrigger = item.toCronTrigger(); return threadPoolTaskScheduler.schedule(worker, cronTrigger); }); } /* 当定时任务不存在scheduledFutureMap中,则新建定时任务,并添加到map中 */ else { Worker worker = new Worker(item, taskExecutor); if (worker != null) { CronTrigger cronTrigger = item.toCronTrigger(); ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(worker, cronTrigger); scheduledFutureMap.put(scheduleId, schedule); } } }; operationMap.put(JobConstant.OPEN_SCHEDULE, openSchedule); log.info("定时任务操作注册表:[{}]初始化完成",JobConstant.OPEN_SCHEDULE); // 定时任务:关闭操作 Consumer<ScheduleTaskDto> closeSchedule = item -> { String scheduleId = item.getId(); // 从scheduledFutureMap中获取scheduledId对应的定时任务 ScheduledFuture scheduledFuture = scheduledFutureMap.get(scheduleId); /* 先判空,如果对象(ScheduledFuture)存在,则停止定时 */ Optional.ofNullable(scheduledFuture).ifPresent(schedule -> schedule.cancel(true)); }; operationMap.put(JobConstant.CLOSE_SCHEDULE, closeSchedule); log.info("定时任务操作注册表:[{}]初始化完成",JobConstant.CLOSE_SCHEDULE); // 定时任务:删除操作 Consumer<ScheduleTaskDto> deleteSchedule = item -> { String scheduleId = item.getId(); // 从scheduledFutureMap中获取scheduledId对应的定时任务 ScheduledFuture scheduledFuture = scheduledFutureMap.get(scheduleId); /* 先判空,如果对象(ScheduledFuture)存在,则停止定时 */ Optional.ofNullable(scheduledFuture).ifPresent(schedule -> schedule.cancel(true)); // 从注册表中移除 scheduledFutureMap.remove(scheduleId); }; operationMap.put(JobConstant.DELETE_SCHEDULE, deleteSchedule); log.info("定时任务操作注册表:[{}]初始化完成",JobConstant.DELETE_SCHEDULE); } }
import com.studioustiger.job.constant.LogProperties; import com.studioustiger.job.service.ScheduleLogService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; /** * @ClassName defaultScheduled * @Description 默认定时任务,可以在该类中进行一些常规的定时任务操作 * @Author huxuehao **/ @Component public class defaultScheduled { @Autowired ScheduleLogService scheduleLogService; @Autowired LogProperties logProperties; /* 每天1点30分清除一次日志 */ @Scheduled(cron = "0 30 1 * * ?") public void clearLog() { String endTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis() - 86400000 * logProperties.getSaveDays()); scheduleLogService.clearLog(endTime); } }
import com.studioustiger.core.tool.api.R; import com.studioustiger.job.constant.JobConstant; import com.studioustiger.job.entity.ScheduleTaskDto; import com.studioustiger.job.entity.ScheduleTaskPo; import com.studioustiger.job.executor.TaskExecutor; import com.studioustiger.job.service.ScheduleLogService; import com.studioustiger.job.service.ScheduleTaskService; import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Description; import org.springframework.scheduling.support.CronExpression; import org.springframework.web.bind.annotation.*; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.stream.Collectors; /** * @ClassName TaskManagerController * @Description 任务管理接口,这里我就使用构造函数进行依赖注入了 * @Author huxuehao **/ @RequiredArgsConstructor @RestController @RequestMapping(value = "/task") public class TaskManagerController { private final ScheduleTaskService scheduleTaskService; private final ScheduleLogService scheduleLogService; private final Map<String, Consumer<ScheduleTaskDto>> operationMap; private final TaskExecutor taskExecutor; @Description(value = "获取最新的错误日志") @GetMapping(value = "/log/by-task-id") public R latestLogByTask(@RequestParam("taskId") String taskId) { return R.data(scheduleLogService.latestLogByTask(taskId)); } @Description(value = "上一条日志") @GetMapping(value = "/log/last") public R lastLog(@RequestParam("taskId") String taskId, @RequestParam("executeTime") String executeTime) { return R.data(scheduleLogService.lastLog(taskId, executeTime)); } @Description(value = "下一条日志") @GetMapping(value = "/log/next") public R nextLog(@RequestParam("taskId") String taskId, @RequestParam("executeTime") String executeTime) { return R.data(scheduleLogService.nextLog(taskId, executeTime)); } @Description(value = "执行任务") @GetMapping(value = "/execute") public R execute(@RequestParam("id") String id) { ScheduleTaskDto task = scheduleTaskService.getById(id); boolean execute = taskExecutor.execute(task); if (execute) { return R.data("success"); } return R.data("fail"); } @Description(value = "刷新执行结果") @GetMapping(value = "/refresh-result") public R refreshResult(@RequestParam("id") String id) { return R.data(scheduleTaskService.refreshResult(id)); } @Description(value = "添加任务") @PostMapping(value = "/add") public R addTask(@RequestBody ScheduleTaskDto scheduleTask) { if (!isValidCron(scheduleTask.getCron())) { return R.fail("cron表达式校验失败"); } scheduleTaskService.add(scheduleTask); return R.success("操作成功"); } @Description(value = "更新任务") @PostMapping(value = "/update") public R updateTask(@RequestBody ScheduleTaskDto scheduleTask) { if (!isValidCron(scheduleTask.getCron())) { return R.fail("cron表达式校验失败"); } scheduleTaskService.update(scheduleTask); if ("1".equals(scheduleTask.getEnable())) { openSchedule(Arrays.asList(scheduleTask.getId())); } return R.success("操作成功"); } @Description(value = "删除任务") @PostMapping(value = "/delete") public R deleteTask(@RequestBody List<String> ids) { if (ids != null && ids.size() > 0) { deleteSchedule(ids); scheduleTaskService.deleteByIds(ids); } return R.success("操作成功"); } @Description(value = "暂停任务") @PostMapping(value = "/disable") public R disableTask(@RequestBody List<String> ids) { if (ids != null && ids.size() > 0) { scheduleTaskService.disableByIds(ids); closeSchedule(ids); } return R.success("操作成功"); } @Description(value = "启动任务") @PostMapping(value = "/enable") public R enableTask(@RequestBody List<String> ids) { if (ids != null && ids.size() > 0) { scheduleTaskService.enableByIds(ids); openSchedule(ids); } return R.success("操作成功"); } @Description(value = "启动全部任务") @GetMapping(value = "/enable-all") public R enableAllTask() { List<ScheduleTaskDto> list = scheduleTaskService.list(); if (list != null && list.size() >0) { List<String> ids = list.stream().map(item -> item.getId()).collect(Collectors.toList()); openSchedule(ids); } return R.success("操作成功"); } @Description(value = "关闭全部任务") @GetMapping(value = "/disable-all") public R disableAllTask() { List<ScheduleTaskDto> list = scheduleTaskService.list(); if (list != null && list.size() >0) { List<String> ids = list.stream().map(item -> item.getId()).collect(Collectors.toList()); closeSchedule(ids); } return R.success("操作成功"); } @Description(value = "获取任务详情") @GetMapping(value = "/detail") public R taskDetail(@RequestParam("id") String id) { return R.data(scheduleTaskService.getById(id)); } @Description(value = "获取任务分页") @GetMapping(value = "/page") public R taskPage(@RequestParam(required = false, value = "current") Integer current, @RequestParam(required = false, value = "size") Integer size, @RequestParam(required = false, value = "taskName") String taskName, @RequestParam(required = false, value = "taskType") String taskType, @RequestParam(required = false, value = "taskStatus") String taskStatus) { int total = scheduleTaskService.getTotals(taskName, taskType, taskStatus); List<ScheduleTaskPo> page = scheduleTaskService.getPage((current - 1) * size, size, taskName, taskType, taskStatus); Map pageRes = new LinkedHashMap<>(); int pages = total / size + (total % size == 0 ? 0: 1); pageRes.put("pages",pages); pageRes.put("records",page); pageRes.put("size",size); pageRes.put("current",current); pageRes.put("total",total); pageRes.put("optimizeCountSql",true); pageRes.put("searchCount",true); return R.data(pageRes); } @Description(value = "获取任务列表") @GetMapping(value = "/list") public R taskList() { List<ScheduleTaskDto> list = scheduleTaskService.list(); return R.data(list); } /* 开启定时 */ private void openSchedule(List<String> ids) { List<ScheduleTaskDto> taskList = scheduleTaskService.listByIds(ids); if (taskList != null && taskList.size() > 0) { taskList.forEach(item -> { operationMap.get(JobConstant.OPEN_SCHEDULE).accept(item); }); } } /* 关闭定时 */ private void closeSchedule(List<String> ids) { System.out.println(operationMap.toString()); List<ScheduleTaskDto> taskList = scheduleTaskService.listByIds(ids); if (taskList != null && taskList.size() > 0) { taskList.forEach(item -> { operationMap.get(JobConstant.CLOSE_SCHEDULE).accept(item); }); } } /* 删除定时 */ private void deleteSchedule(List<String> ids) { List<ScheduleTaskDto> taskList = scheduleTaskService.listByIds(ids); if (taskList != null && taskList.size() > 0) { taskList.forEach(item -> { operationMap.get(JobConstant.DELETE_SCHEDULE).accept(item); }); } } private boolean isValidCron(String cronStr) { return CronExpression.isValidExpression(cronStr); } }
抱歉,没有前端代码(我的前端是基于配置文件配出来的),大家使用elementUI搭即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。