赞
踩
1.1 版本要求
JDK要求1.7及以上版本
Maven要求3.0.4及以上版本
zookeeper要求采用3.4.6及以上版本
1.2 Zookeeper安装&运行
自行查看我的zookeeper专题
ZooKeeper(3):ZooKeeper集群环境搭建_不死鸟.亚历山大.狼崽子的博客-CSDN博客
1.3 创建maven工程
创建maven工程elastic-job-quickstart,并导入以下依赖:
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.dangdang</groupId>
- <artifactId>elastic-job-lite-core</artifactId>
- <version>2.1.5</version>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.0</version>
- </dependency>
- </dependencies>
2.1 编写定时任务类
此任务在每次执行时获取一定数目的文件,进行备份处理,由File实体类的backedUp属性来标识该文件是否已备份。
- package org.example.elasticjob.quickstart.job;
-
- import com.dangdang.ddframe.job.api.ShardingContext;
- import com.dangdang.ddframe.job.api.simple.SimpleJob;
- import org.example.elasticjob.quickstart.model.FileCustom;
-
- import java.lang.management.ManagementFactory;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
-
- public class FileBackupJob implements SimpleJob {
-
- //每次任务执行要备份文件的数量
- private final int FETCH_SIZE = 1;
- //文件列表(模拟)
- public static List<FileCustom> files = new ArrayList<>();
-
- @Override
- public void execute(ShardingContext shardingContext) {
- //作业分片信息
- int shardingItem = shardingContext.getShardingItem();
- System.out.println(String.format("作业分片:%d",shardingItem));
- //获取未备份文件
- List<FileCustom> fileCustoms = fetchUnBackupFiles(FETCH_SIZE);
- //文件备份
- backupFiles(fileCustoms);
- }
-
- /**
- * 获取未备份的文件
- * @param count
- * @return
- */
- public List<FileCustom> fetchUnBackupFiles(int count) {
- List<FileCustom> fetchList = new ArrayList<>();
- int num = 0;
- for (FileCustom fileCustom : files) {
- if (num >= count) {
- break;
- }
- //未备份的文件则放入列表
- if (!fileCustom.getBackedUp()) {
- fetchList.add(fileCustom);
- num++;
- }
- }
- //ManagementFactory.getRuntimeMXBean()获取当前JVM进程的PID
- System.out.println(String.format("%sTime:%s,已获取%d文件",
- ManagementFactory.getRuntimeMXBean().getName(),new SimpleDateFormat("hh:mm:ss").format(new
- Date()),num));
- return fetchList;
- }
-
- /**
- * 备份文件
- * @param files
- */
- public void backupFiles(List<FileCustom> files){
- for(FileCustom file : files){
- //标记文件数据为已备份
- file.setBackedUp(Boolean.TRUE);
- System.out.println(String.format("已备份文件:%s 文件类型:%s",file.getName(),file.getType()));
- }
- }
-
- }
文件实体类如下:
- package org.example.elasticjob.quickstart.model;
-
- import lombok.Data;
-
- @Data
- public class FileCustom {
-
- /**
- * 标识
- */
- private String id;
- /**
- * 文件名
- */
- private String name;
- /**
- * 文件类型,如text、image、radio、vedio
- */
- private String type;
- /**
- * 文件内容
- */
- private String content;
- /**
- * 是否已备份
- */
- private Boolean backedUp = false;
- public FileCustom(String id,String name,String type,String content){
- this.id = id;
- this.name = name;
- this.type = type;
- this.content = content;
- }
- }
- package org.example.elasticjob.quickstart;
-
- import com.dangdang.ddframe.job.config.JobCoreConfiguration;
- import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
- import com.dangdang.ddframe.job.lite.api.JobScheduler;
- import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
- import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
- import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
- import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
- import org.example.elasticjob.quickstart.job.FileBackupJob;
- import org.example.elasticjob.quickstart.model.FileCustom;
-
- public class JobMain {
- //zookeeper端口
- // private static final int ZOOKEEPER_PORT = 2181;
- //zookeeper链接字符串 localhost:2181
- // private static final String ZOOKEEPER_CONNECTION_STRING = "192.168.222.130:" + ZOOKEEPER_PORT;
- private static final String ZOOKEEPER_CONNECTION_STRING = "192.168.222.130:2181,192.168.222.131:2181,192.168.222.132:2181" ;
- //定时任务命名空间
- private static final String JOB_NAMESPACE = "elastic‐job‐example‐java";
- //启动任务
- public static void main(String[] args) {
- //生成测试文件
- generateTestFiles();
- //配置zookeeper
- CoordinatorRegistryCenter registryCenter = setUpRegistryCenter();
- //启动任务
- startJob(registryCenter);
- }
-
- //生成测试文件
- private static void generateTestFiles(){
- for(int i=1;i<11;i++){
- FileBackupJob.files.add(new FileCustom(String.valueOf(i+10),"文件"+
- (i+10),"text","content"+ (i+10)));
- FileBackupJob.files.add(new FileCustom(String.valueOf(i+20),"文件"+
- (i+20),"image","content"+ (i+20)));
- FileBackupJob.files.add(new FileCustom(String.valueOf(i+30),"文件"+
- (i+30),"radio","content"+ (i+30)));
- FileBackupJob.files.add(new FileCustom(String.valueOf(i+40),"文件"+
- (i+40),"vedio","content"+ (i+40)));
- }
- }
-
- //注册中心配置
- private static CoordinatorRegistryCenter setUpRegistryCenter(){
- //注册中心配置
- ZookeeperConfiguration zookeeperConfiguration = new
- ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING,JOB_NAMESPACE);
- //减少zk的超时时间
- zookeeperConfiguration.setSessionTimeoutMilliseconds(100);
- //创建注册中心
- CoordinatorRegistryCenter registryCenter = new
- ZookeeperRegistryCenter(zookeeperConfiguration);
- registryCenter.init();
- return registryCenter;
- }
-
- //配置并启动任务
- private static void startJob(CoordinatorRegistryCenter registryCenter) {
- //创建JobCoreConfiguration
- JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("files‐job",
- "0/3 * * * * ?", 1)
- .build();
- //创建SimpleJobConfiguration
- SimpleJobConfiguration simpleJobConfiguration = new
- SimpleJobConfiguration(jobCoreConfiguration, FileBackupJob.class.getCanonicalName());
- //启动任务
- new JobScheduler(registryCenter,
- LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build()).init();
- }
- }
2.3 测试
(1)启动main方法查看控制台
- 作业分片:0
- 40200@MSITime:03:20:42,已获取1文件
- 已备份文件:文件11 文件类型:text
- 作业分片:0
- 40200@MSITime:03:20:45,已获取1文件
- 已备份文件:文件21 文件类型:image
- 作业分片:0
- 40200@MSITime:03:20:48,已获取1文件
- 已备份文件:文件31 文件类型:radio
- 作业分片:0
- 40200@MSITime:03:20:51,已获取1文件
- 已备份文件:文件41 文件类型:vedio
- 作业分片:0
- 40200@MSITime:03:20:54,已获取1文件
- 已备份文件:文件12 文件类型:text
定时任务每3秒批量执行一次,符合基础预期。
(2)测试窗口1不关闭,再次运行main方法观察控制台日志(窗口2)会出现以下两种情况:
窗口1继续执行任务,窗口2不执行任务
窗口2接替窗口1执行任务,窗口1停止执行任务
可通过反复启停窗口2查看到以上现象。
(3)窗口1、窗口2同时运行的情况下,停止正在执行任务的窗口
未停止的窗口开始执行任务。
分片测试:
当前作业没有被分片,所以多个实例共同执行时只有一个实例在执行,如果我们将作业分片执行,作业将被拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或几个分片项。
修改上边的代码,改为作业分3片执行:
- //创建JobCoreConfiguration
- JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("files‐job", "0/3 * * * * ?", 3).build();
同时启动三个JobMain:
每个JobMain窗口分别执行一片作业。
总结:
通过以上简单的测试,就可以看出Elastic-Job帮我们解决了分布式调度的以下三个问题:
1)多实例部署时避免任务重复执行,在任务执行时间到来时,从所有实例中选举出来一个,让它来执行任务,从而避免多个实例同时执行任务。
2)高可用,若某一个实例宕机,不影响其他实例来执行任务。
3)弹性扩容,当集群中增加某一个实例,它应当也能够被选举并执行任务,如果作业分片将参与执行某个分片作业。
3.1 Elastic-Job整体架构
App:应用程序,内部包含任务执行业务逻辑和Elastic-Job-Lite组件,其中执行任务需要实现ElasticJob接口完成与Elastic-Job-Lite组件的集成,并进行任务的相关配置。应用程序可启动多个实例,也就出现了多个任务执行实例。
Elastic-Job-Lite:Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,此组件负责任务的调度,并产生日志及任务调度记录。
无中心化,是指没有调度中心这一概念,每个运行在集群中的作业服务器都是对等的,各个作业节点是自治的、平等的、节点之间通过注册中心进行分布式协调。
Registry:以Zookeeper作为Elastic-Job的注册中心组件,存储了执行任务的相关信息。同时,Elastic-Job利用该组件进行执行任务实例的选举。
Console:Elastic-Job提供了运维平台,它通过读取Zookeeper数据展现任务执行状态,或更新Zookeeper数据修改全局配置。通过Elastic-Job-Lite组件产生的数据来查看任务执行历史记录。
应用程序在启动时,在其内嵌的Elastic-Job-Lite组件会向Zookeeper注册该实例的信息,并触发选举(此时可能已经启动了该应用程序的其他实例),从众多实例中选举出一个Leader,让其执行任务。当到达任务执行时间时,Elastic-Job-Lite组件会调用由应用程序实现的任务业务逻辑,任务执行后会产生任务执行记录。当应用程序的某一个实例宕机时,Zookeeper组件会感知到并重新触发leader选举。
3.2 ZooKeeper
在学习Elastic-Job执行原理时,有必要大致了解一下ZooKeeper是用来做什么的,因为:
ZooKeeper是一个分布式一致性协调服务,它是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
咱们可以把ZooKeeper想象为一个特殊的数据库,它维护着一个类似文件系统的树形数据结构,ZooKeeper的客户端(如Elastic-Job任务执行实例)可以对数据进行存取:
每个子目录项如 /app1都被称作为 znode(目录节点),和文件系统一样,我们能够自由的增加、删除znode,在一 个znode下增加、删除子znode,唯一的不同在于znode是可以存储数据的。
ZooKeeper为什么称之为一致性协调服务呢?因为ZooKeeper拥有数据监听通知机制,客户端注册监听它关心的 znode,当znode发生变化(数据改变、被删除、子目录节点增加删除)时,ZooKeeper会通知所有客户端。简单 来说就是,当分布式系统的若干个服务都关心一个数据时,当这个数据发生改变,这些服务都能够得知,那么这些服务就针对此数据达成了一致。
应用场景思考,使用ZooKeeper管理分布式配置项的机制:
假设我们的程序是分布式部署在多台机器上,如果我们要改变程序的配置文件,需要逐台机器去修改,非常麻烦, 现在把这些配置全部放到zookeeper上去,保存在 zookeeper 的某个目录节点中,然后所有相关应用程序作为 ZooKeeper的客户端对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到 ZooKeeper的通知,从而获取新的配置信息应用到系统中。
Elastic-Job使用ZooKeeper完成对任务信息的存取,任务执行实例作为ZooKeeper客户端对其znode操作,任务信息保存在znode中。
使用ZooInspector查看zookeeper节点
1、zookeeper图像化客户端工具的下载地址:
https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip;
2、下载完后解压压缩包,双击地址为ZooInspector\build\zookeeper-dev-ZooInspector.jar的jar包;
如果双击没有反应?首先电脑要配好java环境,使用java -jar 再加上你的jar文件的路径 启动即可.
config节点内容如下:
- {
- "jobName": "files‐job",
- "jobClass": "com.itheima.scheduling.job.FileBackupJob",
- "jobType": "SIMPLE",
- "cron": "0/3 * * * * ?",
- "shardingTotalCount": 1,
- "shardingItemParameters": "",
- "jobParameter": "",
- "failover": true,
- "misfire": true,
- "description": "",
- "jobProperties": {
- "job_exception_handler":
- "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler",
- "executor_service_handler":
- "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"
- },
- "monitorExecution": true,
- "maxTimeDiffSeconds": ‐1,
- "monitorPort": ‐1,
- "jobShardingStrategyClass": "",
- "reconcileIntervalMinutes": 10,
- "disabled": false,
- "overwrite": false
- }
节点记录了任务的配置信息,包含执行类,cron表达式,分片算法类,分片数量,分片参数。默认状态下,如果你修改了Job的配置比如cron表达式,分片数量等是不会更新到zookeeper上去的,需要把LiteJobConfifiguration的参数overwrite修改成true,或者删除zk的结点再启动作业重新创建。
instances节点:
同一个Job下的elastic-job的部署实例。一台机器上可以启动多个Job实例,也就是Jar包。instances的命名是[IP+@-@+PID]。
leader节点:
任务实例的主节点信息,通过zookeeper的主节点选举,选出来的主节点信息。下面的子节点分为election,sharding和failover三个子节点。分别用于主节点选举,分片和失效转移处理。election下面的instance节点显式了当前主节点的实例ID:jobInstanceId。latch节点也是一个永久节点用于选举时候的实现分布式锁。
sharding节点下面有一个临时节点necessary,是否需要重新分片的标记,如果分片总数变化或任务实例节点上下 线,以及主节点选举,都会触发设置重分片标记,主节点会进行分片计算。
sharding节点:
任务的分片信息,子节点是分片项序号,从零开始,至分片总数减一。从这个节点可以看出哪个 分片在哪个实例上运行
3.2.2 Elastic-Job任务执行实例选举
Elastic-Job使用ZooKeeper实现任务执行实例选举,若要使用ZooKeeper完成选举,就需要了解ZooKeeper的znode类型了,ZooKeeper有四种类型的znode,客户端在创建znode时可以指定:
PERSISTENT-持久化目录节点
客户端创建该类型znode,此客户端与ZooKeeper断开连接后该节点依旧存在,如果创建了重复的key,比如/data,第二次创建会失败。
PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与ZooKeeper断开连接后该节点依旧存在,允许重复创建相同key,Zookeeper给该节点名称进行顺序编号,如zk会在后面加一串数字比如 /data/data0000000001,如果重复创建,会创建一个/data/data0000000002节点(一直往后加1)
EPHEMERAL-临时目录节点
客户端与ZooKeeper断开连接后,该节点被删除,不允许重复创建相同key。
EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与ZooKeeper断开连接后,该节点被删除,允许重复创建相同key,依然采取顺序编号机制。
实例选举实现过程分析:
每个Elastic-Job的任务执行实例作为ZooKeeper的客户端来操作ZooKeeper的znode
通过本章,我们完成了对Elastic-Job技术的快速入门程序,并了解了Elastic-Job整体架构和工作原理。
对于应用程序,只需要将任务执行细节包装为ElasticJob接口的实现类并对任务细节进行配置即可完成与Elastic-Job的集成,而Elastic-Job需要依赖Zookeeper进行执行任务信息的存取,执行任务实例的选举。通过对快速入门程序的测试,我们可以看到Elastic-Job确实解决了分布式任务调度的核心问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。