赞
踩
xxl_job的数据库里有如下几个表:
- ### xxl-job, datasource
- spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
- spring.datasource.username=root
- spring.datasource.password=xdclass.net
- spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
xxl.job.accessToken=xdclass.net
运行报表
以图形化来展示了整体的任务执行情况
任务管理(配置执行任务)
示例执行器:所用到的执行器
任务描述:概述该任务是做什么的
路由策略:
Cron:执行规则
调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等
JobHandler:定义执行器的名字
阻塞处理策略:
子任务ID:输入子任务的任务id,可填写多个
任务超时时间:添加任务超时的时候,单位s,设置时间大于0的时候就会生效
失败重试次数:设置失败重试的次数,设置时间大于0的时候就会生效
负责人:填写该任务调度的负责人
报警邮件:出现报警,则发送邮件
调度日志
执行器管理
用户管理
- <dependency>
- <groupId>com.xuxueli</groupId>
- <artifactId>xxl-job-core</artifactId>
- <version>2.3.0</version>
- </dependency>
- #----------xxl-job配置--------------
- logging.config=classpath:logback.xml
- #调度中心部署地址,多个配置逗号分隔 "http://address01,http://address02"
- xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
-
- #执行器token,非空时启用 xxl-job, access token
- xxl.job.accessToken=xdclass.net
-
- # 执行器app名称,和控制台那边配置一样的名称,不然注册不上去
- xxl.job.executor.appname=xdclass-shop
-
- # [选填]执行器注册:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。
- #从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
- xxl.job.executor.address=
-
- #[选填]执行器IP :默认为空表示自动获取IP(即springboot容器的ip和端口,可以自动获取,也可以指定),多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务",
- xxl.job.executor.ip=
-
- # [选填]执行器端口号:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
- xxl.job.executor.port=9999
-
- #执行器日志文件存储路径,需要对该路径拥有读写权限;为空则使用默认路径
- xxl.job.executor.logpath=./data/logs/xxl-job/executor
-
- #执行器日志保存天数
- xxl.job.executor.logretentiondays=30
- import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * xxl-job config
- *
- * @author xuxueli 2017-04-28
- */
- @Configuration
- public class XxlJobConfig {
- private final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
-
- @Value("${xxl.job.admin.addresses:}")
- private String adminAddresses;
-
- @Value("${xxl.job.accessToken:}")
- private String accessToken;
-
- @Value("${xxl.job.executor.appname}")
- private String appname;
-
- @Value("${xxl.job.executor.address}")
- private String address;
-
- @Value("${xxl.job.executor.ip}")
- private String ip;
-
- @Value("${xxl.job.executor.port}")
- private int port;
-
- @Value("${xxl.job.executor.logpath}")
- private String logPath;
-
- @Value("${xxl.job.executor.logretentiondays}")
- private int logRetentionDays;
-
-
- @Bean
- public XxlJobSpringExecutor xxlJobExecutor() {
- logger.info(">>>>>>>>>>> xxl-job config init.");
- XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
- xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
- xxlJobSpringExecutor.setAppname(appname);
- xxlJobSpringExecutor.setAddress(address);
- xxlJobSpringExecutor.setIp(ip);
- xxlJobSpringExecutor.setPort(port);
- xxlJobSpringExecutor.setAccessToken(accessToken);
- xxlJobSpringExecutor.setLogPath(logPath);
- xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
-
- return xxlJobSpringExecutor;
- }
-
- /**
- * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
- *
- * 1、引入依赖:
- * <dependency>
- * <groupId>org.springframework.cloud</groupId>
- * <artifactId>spring-cloud-commons</artifactId>
- * <version>${version}</version>
- * </dependency>
- *
- * 2、配置文件,或者容器启动变量
- * spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
- *
- * 3、获取IP
- * String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
- */
-
-
- }
- /**
- * 每日任务
- * 每日凌晨1点执行
- *
- * @author Chopper
- */
- public interface EveryDayExecute {
-
- /**
- * 执行每日任务
- */
- void execute();
-
-
- }
- /**
- * 每分钟任务
- *
- * @author Chopper
- */
- public interface EveryFifteenMinuteExecute {
-
- /**
- * 执行
- */
- void execute();
-
-
- }
- /**
- * 每小时任务
- *
- * @author Chopper
- */
- public interface EveryHourExecute {
-
- /**
- * 执行
- */
- void execute();
-
-
- }
- /**
- * 每分钟任务
- *
- * @author Chopper
- */
- public interface EveryMinuteExecute {
-
- /**
- * 执行
- */
- void execute();
-
-
- }
- /**
- * 定时器任务
- *
- * @author Chopper
- * @version v1.0
-
- */
- @Slf4j
- @Component
- public class TimedTaskJobHandler {
-
- @Autowired(required = false)
- private List<EveryMinuteExecute> everyMinuteExecutes;
-
-
- @Autowired(required = false)
- private List<EveryHourExecute> everyHourExecutes;
-
-
- @Autowired(required = false)
- private List<EveryDayExecute> everyDayExecutes;
-
- @Autowired(required = false)
- private List<EveryFifteenMinuteExecute> everyFifteenMinuteExecute;
-
- /**
- * 每分钟任务
- *
- * @throws Exception
- */
- @XxlJob("everyMinuteExecute")
- public ReturnT<String> everyMinuteExecute(String param) {
- log.info("每分钟任务执行");
- if (everyMinuteExecutes == null || everyMinuteExecutes.size() == 0) {
- return ReturnT.SUCCESS;
- }
-
- for (int i = 0; i < everyMinuteExecutes.size(); i++) {
- try {
- everyMinuteExecutes.get(i).execute();
- } catch (Exception e) {
- log.error("每分钟任务异常", e);
- }
- }
- return ReturnT.SUCCESS;
- }
-
- /**
- * 每15分钟任务
- *
- * @throws Exception
- */
- @XxlJob("everyFifteenMinuteExecute")
- public ReturnT<String> everyFifteenMinuteExecute(String param) {
- log.info("每15分钟任务执行");
- if (everyFifteenMinuteExecute == null || everyFifteenMinuteExecute.size() == 0) {
- return ReturnT.SUCCESS;
- }
-
- for (int i = 0; i < everyFifteenMinuteExecute.size(); i++) {
- try {
- everyFifteenMinuteExecute.get(i).execute();
- } catch (Exception e) {
- log.error("每15分钟任务异常", e);
- }
- }
- return ReturnT.SUCCESS;
- }
-
- /**
- * 每小时任务
- *
- * @throws Exception
- */
- @XxlJob("everyHourExecuteJobHandler")
- public ReturnT<String> everyHourExecuteJobHandler(String param) {
- log.info("每小时任务执行");
- if (everyHourExecutes == null || everyHourExecutes.size() == 0) {
- return ReturnT.SUCCESS;
- }
-
- for (int i = 0; i < everyHourExecutes.size(); i++) {
- try {
- everyHourExecutes.get(i).execute();
- } catch (Exception e) {
- log.error("每小时任务异常", e);
- }
- }
- return ReturnT.SUCCESS;
- }
-
- /**
- * 每日任务
- *
- * @throws Exception
- */
- @XxlJob("everyDayExecuteJobHandler")
- public ReturnT<String> everyDayExecuteJobHandler(String param) {
-
- log.info("每日任务执行");
- if (everyDayExecutes == null || everyDayExecutes.size() == 0) {
- return ReturnT.SUCCESS;
- }
-
- for (int i = 0; i < everyDayExecutes.size(); i++) {
- try {
- everyDayExecutes.get(i).execute();
- } catch (Exception e) {
- log.error("每分钟任务异常", e);
- }
- }
- return ReturnT.SUCCESS;
- }
-
-
- }
- import cn.lili.modules.search.service.EsGoodsIndexService;
- import cn.lili.timetask.handler.EveryFifteenMinuteExecute;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- @Slf4j
- @Component
- public class IndexOrderTaskExcecute implements EveryFifteenMinuteExecute {
-
- @Autowired
- private EsGoodsIndexService esGoodsIndexService;
-
- @Override
- public void execute() {
- log.info("====IndexOrderTaskExcecute=====,每隔15分钟生成一次索引");
- esGoodsIndexService.init();
- }
- }
- // 当前分片数,从0开始,即执行器的序号
- int shardIndex = XxlJobHelper.getShardIndex();
- //总分片数,执行器集群总机器数量
- int shardTotal = XxlJobHelper.getShardTotal();
如果将100W数据均匀分给集群里的10台机器同时处理,
每台机器耗时,1万秒即可,耗时会大大缩短,也能充分利用集群资源
在xxl-job里,可以配置执行器集群有10个机器,那么分片总数是10,分片序号0~9 分别对应那10台机器。
分片方式
路由策略
- /**
- * 2、分片广播任务
- */
- @XxlJob("shardingJobHandler")
- public void shardingJobHandler() throws Exception {
-
- // 当前分片数,从0开始,即执行器的序号
- int shardIndex = XxlJobHelper.getShardIndex();
- //总分片数,执行器集群总机器数量
- int shardTotal = XxlJobHelper.getShardTotal();
-
-
- XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
-
- // 业务逻辑
- for (int i = 0; i < shardTotal; i++) {
- if (i == shardIndex) {
- log.info("第 {} 片, 命中分片开始处理", i);
-
- } else {
- log.info("第 {} 片, 忽略", i);
- }
- }
-
- }
根据id进行分片取模(部署3个执行器)
- /**
- * 2、分片广播任务
- */
- @XxlJob("shardingJobHandler")
- public void shardingJobHandler() throws Exception {
-
- // 当前分片数,从0开始,即执行器的序号
- int shardIndex = XxlJobHelper.getShardIndex();
- //总分片数,执行器集群总机器数量
- int shardTotal = XxlJobHelper.getShardTotal();
-
- XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
-
-
- List<Integer> allUserIds = getAllUserIds();
- allUserIds.forEach(obj -> {
- if (obj % shardTotal == shardIndex) {
- log.info("第 {} 片, 命中分片开始处理用户id={}",shardIndex,obj);
- }
- });
- }
-
- private List<Integer> getAllUserIds() {
-
- List<Integer> ids = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- ids.add(i);
- }
- return ids;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。