赞
踩
目录
・ER图生成 (使用Eclipse重点 ERMaster插件)
・【BATCH_JOB_EXECUTION_PARAMS】 表的说明
三.启动类,参数 (使用Eclipse中的【実行の構成】(Run Configurantions) )
1.SpringApplication.java [ spring-boot-xxx.jar]
★ 有 ①【ApplicationRunner】,②【CommandLineRunner】两种形式 ★
2.JobLauncherApplicationRunner.java [ spring-boot-autoconfigure-xxx.jar]
launchJobFromProperties(Properties properties)
★扩展:args.getNonOptionArgs(); // 获取没有key的值 ===START
executeLocalJobs(JobParameters jobParameters)
execute(Job job, JobParameters jobParameters)
JobParameters getNextJobParameters(Job job, JobParameters jobParameters)
3.JobParametersBuilder.java [ spring-batch-core-xxx.jar]
★★★★★★★关于参数 parameters中的 【所有】参数的取得 ★★★★★★★
4.RunIdIncrementer.java [ spring-boot-core-xxx.jar]
★★★取得已经存在的run.id、加1处理,不存在的话,初始值1★★★
getNext(@Nullable JobParameters parameters) {
2.JobLauncherApplicationRunner.java(继续执行) [ spring-boot-autoconfigure-xxx.jar]
★★★关于参数 parameters中的 run.id ★★★
run(final Job job, final JobParameters jobParameters)
【这里的Log参数,与Batch启动时,传入的参数无关,与上一次的参数有关】
6.SimpleJobRepository.java [30行]
createJobExecution(String jobName, JobParameters jobParameters)
★★★本次整理原因★★★由于【run.id】对应的值过小,出现行下面9行(大段代码26行)的如下错误
JobInstance jobInstance = this.jobInstanceDao.getJobInstance(jobName, jobParameters);
getJobInstance(String jobName, JobParameters jobParameters)
★★ ★★ ★★ 如果生成了和以前 (同JobName)重复的值 ,就凉凉了,系统一定会报错 ★★ ★★ ★★
7.1 DefaultJobKeyGenerator.java
★★★↓出错根本原因(Cannot find any job execution for job instance: )★★★
6.SimpleJobRepository.java (继续执行) [55行]
createJobInstance(String jobName, JobParameters jobParameters) {
6.SimpleJobRepository.java (继续执行) [65行]
createJobExecution(String jobName, JobParameters jobParameters)
saveJobExecution(JobExecution jobExecution) {
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.sxz</groupId>
- <artifactId>test001</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.3.10.RELEASE</version>
- </parent>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-batch</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-thymeleaf</artifactId>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <configuration>
- <includeSystemScope>true</includeSystemScope>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- </project>
---
从【BATCH_JOB_INSTANCE】 表开始(这个表只有一条记录,对应其它表,多条记录)
Eclipse中的,ER图生成工具:【ERMaster】_sun0322的博客-CSDN博客
SpringBatch学习_sun0322的博客-CSDN博客
- CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL) type=InnoDB;
- INSERT INTO BATCH_JOB_SEQ values(0);
-
- CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL) type=InnoDB;
- INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
-
- CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL) type=InnoDB;
- INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
----
报下面错误时
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'type=InnoDB' at line 1
使用下面语句
- CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL) ENGINE=InnoDB;
- INSERT INTO BATCH_JOB_SEQ values(0);
-
- CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL) ENGINE=InnoDB;
- INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
-
- CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL) ENGINE=InnoDB;
- INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
・一个JOB_EXECUTION_ID,可用对应多条记录(record)
・每条记录的【KEY_NAME】不相同
・一般,有一条记录的【KEY_NAME】为「run.id」,对应的值【LONG_VAL】有值, 是「数字」
・但是使用Junit执行时,没有「run.id」,取代它的是「random」
・其他的记录,是启动时的参数,比如【KEY_NAM】spring.batch.job.names、对应的值【STRING_VAL】有值, 是「yourJobName」
下面前5个表的数据都被删除了,只有第六个表的数据没有被删除。
- delete from BATCH_JOB_EXECUTION_PARAMS;
-
- delete from BATCH_STEP_EXECUTION_CONTEXT;
-
- delete from BATCH_JOB_EXECUTION_CONTEXT;
-
- delete from BATCH_STEP_EXECUTION;
-
- delete from BATCH_JOB_EXECUTION;
-
- -- delete from BATCH_JOB_INSTANCE;
因为都被删除了,
所以,run.id从1开始, 在第六个表【BATCH_JOB_INSTANCE】中,找到了数据。
但是,其它表中的数据都已经被删除了,所以,Log中,出了下面的错误
Cannot find any job execution for job instance: id= XXXX
这个instance id 就是,第六个表【BATCH_JOB_INSTANCE】中的值。
根据下面的分析得知,【BATCH_JOB_INSTANCE】的JOB_KEY由[run.id=1;]生成。
而,同一个JOB_NAME有重复的JOB_KEY,在运行的时候,就会使用之前的这JOB_KEY对应的JOB_INSTANCE_ID、因此,就产生了问题。
问:【BATCH_JOB_EXECUTION_PARAMS】 表中,KEY_NAME为【run.id】时,对应的值,LONG_VAL是如何【発番】的
答:如果都被删除,从1开始
问:【BATCH_JOB_EXECUTION_PARAMS】 表中,【run.id】的值,如果都被删除了,重新从1开始,对JobInstanceID是否有影响。
答:影响
启动类(Main Class):
被@SpringBootApplication标注的类
参数(Arugments):
--spring.profiles.active=prod --spring.batch.job.names=yourJobName --spring.config.location=xxx\xxx\xxx\application.yml
---
下面的25行 执行
- public class SpringApplication {
-
- .......
- public ConfigurableApplicationContext run(String... args) {
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- ConfigurableApplicationContext context = null;
- configureHeadlessProperty();
- SpringApplicationRunListeners listeners = getRunListeners(args);
- listeners.starting();
- try {
- ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
- ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments);
- configureIgnoreBeanInfo(environment);
- Banner printedBanner = printBanner(environment);
- context = createApplicationContext();
- prepareContext(context, environment, listeners, applicationArguments, printedBanner);
- refreshContext(context);
- afterRefresh(context, applicationArguments);
- stopWatch.stop();
- if (this.logStartupInfo) {
- new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), stopWatch);
- }
- listeners.started(context);
- callRunners(context, applicationArguments);
- }
- catch (Throwable ex) {
- handleRunFailure(context, ex, listeners);
- throw new IllegalStateException(ex);
- }
-
- try {
- listeners.running(context);
- }
- catch (Throwable ex) {
- handleRunFailure(context, ex, null);
- throw new IllegalStateException(ex);
- }
- return context;
- }
---
下面的11行 执行
spring boot:ApplicationRunner和CommandLineRunner用法区别_小贼驴的博客-CSDN博客
业务场景:
应用服务启动时,加载一些数据和执行一些应用的初始化动作。如:删除临时文件,清除缓存信息,读取配置文件信息,数据库连接等。
1、SpringBoot提供了CommandLineRunner和ApplicationRunner接口。当接口有多个实现类时,提供了@order注解实现自定义执行顺序,也可以实现Ordered接口来自定义顺序。
注意:数字越小,优先级越高,也就是@Order(1)注解的类会在@Order(2)注解的类之前执行。
两者的区别在于:
ApplicationRunner中run方法的参数为ApplicationArguments,而CommandLineRunner接口中run方法的参数为String数组。想要更详细地获取命令行参数,那就使用ApplicationRunner接口
- 共同点:
-
- 其一 执行时机都是在容器启动完成的时候进行执行;其二 这两个接口中都有一个run()方法;
-
- 如果程序里既有ApplicationRunner ,也有CommandLineRunner代码 ,
-
- 则ApplicationRunner 先运行,而CommandLineRunner 后运行。
-
- 不同点:
-
- ApplicationRunner中run方法的参数为ApplicationArguments,
-
- 而CommandLineRunner接口中run方法的参数为String数组。
-
- 问题提出: 如果有多个实现类,而我们需要按照一定的顺序执行的话,那么应该怎么办呢?
-
- 解决方案:
-
- 方法1)可以在实现类上加上@Order注解指定执行的顺序;
-
- 方法2)可以在实现类上实现Ordered接口来标识。
-
- 需要注意:数字越小,优先级越高,也就是@Order(1)注解的类会在@Order(2)注解的类之前执行。
这里①被执行 (下面代码第11行)
- public class SpringApplication {
-
- .......
- private void callRunners(ApplicationContext context, ApplicationArguments args) {
- List<Object> runners = new ArrayList<>();
- runners.addAll(context.getBeansOfType(ApplicationRunner.class).values());
- runners.addAll(context.getBeansOfType(CommandLineRunner.class).values());
- AnnotationAwareOrderComparator.sort(runners);
- for (Object runner : new LinkedHashSet<>(runners)) {
- if (runner instanceof ApplicationRunner) {
- callRunner((ApplicationRunner) runner, args);
- }
- if (runner instanceof CommandLineRunner) {
- callRunner((CommandLineRunner) runner, args);
- }
- }
- }
---
- public class SpringApplication {
-
- .......
-
- private void callRunner(ApplicationRunner runner, ApplicationArguments args) {
- try {
- (runner).run(args);
- }
- catch (Exception ex) {
- throw new IllegalStateException("Failed to execute ApplicationRunner", ex);
- }
- }
---
下面的 11,16,21,22 行 依次 被 执行
第6行的方法中,会舍弃 【--xxx=xxx】的所有参数!!!
所以,根据第17行显示如下Log信息
o.s.b.a.b.JobLauncherApplicationRunner Running default command line with:[]
----
- public class JobLauncherApplicationRunner implements ApplicationRunner, Ordered, ApplicationEventPublisherAware {
-
- /**
- * The default order for the command line runner.
- */
- public static final int DEFAULT_ORDER = 0;
-
- .......
-
- @Override
- public void run(ApplicationArguments args) throws Exception {
- String[] jobArguments = args.getNonOptionArgs().toArray(new String[0]);
- run(jobArguments);
- }
-
- public void run(String... args) throws JobExecutionException {
- logger.info("Running default command line with: " + Arrays.asList(args));
- launchJobFromProperties(StringUtils.splitArrayElementsIntoProperties(args, "="));
- }
-
- protected void launchJobFromProperties(Properties properties) throws JobExecutionException {
- JobParameters jobParameters = this.converter.getJobParameters(properties);
- executeLocalJobs(jobParameters);
- executeRegisteredJobs(jobParameters);
- }
- .......
---
启动jar包
java -jar hello.jar --name=sxz --age=2 hello word
====
- @Component
- @Order(99)
- public class MyApplicationRunner01 implements ApplicationRunner {
- @Override
- public void run(ApplicationArguments args) throws Exception {
- String[] sourceArgs = args.getSourceArgs();//获取启动的所有参数
- System.out.println("sourceArgs:" + Arrays.toString(sourceArgs));
- List<String> nonOptionArgs = args.getNonOptionArgs(); // 获取没有key的值
- System.out.println("nonOptionArgs:" + nonOptionArgs);
- Set<String> optionNames = args.getOptionNames();// 获取key-vlue形式值的key
- for (String optionName : optionNames) {
- // 获取获取key-vlue的值
- System.out.println(optionName + ":" + args.getOptionValues(optionName));
- }
- System.out.println(">>>>>>>>>>>>>>>MyApplicationRunner01结束>>>>>>>>>>>>>>>>");
- }
- }
-
---
- sourceArgs:[--name=sxz, --age=2, hello, word]
- nonOptionArgs:[hello, word]
- name:[sxz]
- age:[2]
- >>>>>>>>>>>>>>>MyApplicationRunner01结束>>>>>>>>>>>>>>>>
---
★扩展:args.getNonOptionArgs(); // 获取没有key的值 ===END
- private void executeLocalJobs(JobParameters jobParameters) throws JobExecutionException {
- for (Job job : this.jobs) {
- if (StringUtils.hasText(this.jobNames)) {
- String[] jobsToRun = this.jobNames.split(",");
- if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) {
- logger.debug(LogMessage.format("Skipped job: %s", job.getName()));
- continue;
- }
- }
- execute(job, jobParameters);
- }
- }
----
先执行第四行,(取得、设定Parameters的值 (run.id))
后面回来,接着执行第5行。
- protected void execute(Job job, JobParameters jobParameters)
- throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
- JobParametersInvalidException, JobParametersNotFoundException {
- JobParameters parameters = getNextJobParameters(job, jobParameters);
- JobExecution execution = this.jobLauncher.run(job, parameters);
- if (this.publisher != null) {
- this.publisher.publishEvent(new JobExecutionEvent(execution));
- }
- }
----
下面的9行 执行
- private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) {
- if (this.jobRepository != null && this.jobRepository.isJobInstanceExists(job.getName(), jobParameters)) {
- return getNextJobParametersForExisting(job, jobParameters);
- }
- if (job.getJobParametersIncrementer() == null) {
- return jobParameters;
- }
- JobParameters nextParameters = new JobParametersBuilder(jobParameters, this.jobExplorer)
- .getNextJobParameters(job).toJobParameters();
- return merge(nextParameters, jobParameters);
- }
- public class CommandLineJobRunner {
- public class SimpleJobOperator implements JobOperator, InitializingBean {
-
- public class JobLauncherApplicationRunner implements ApplicationRunner, Ordered, ApplicationEventPublisherAware {
下面的15行是重点
- JobExecution previousExecution =
- this.jobExplorer.getLastJobExecution(lastInstance);
根据JobName,从【 batch_job_instance】 表中获取数据,
这个表中有,job_instance_id, version, job_name , job_key 字段
因为有 job_name , 即使多个job 同时使用这一张表,
也可以取得, 对应 jobname的 上一次的 job_instance_id)
下面的21行, 使参数【run.id】的值加 1
- nextParameters = incrementer.getNext(
- previousExecution.getJobParameters());
===
- public JobParametersBuilder getNextJobParameters(Job job) {
- Assert.state(this.jobExplorer != null, "A JobExplorer is required to get next job parameters");
- Assert.notNull(job, "Job must not be null");
- Assert.notNull(job.getJobParametersIncrementer(), "No job parameters incrementer found for job=" + job.getName());
-
- String name = job.getName();
- JobParameters nextParameters;
- JobInstance lastInstance = this.jobExplorer.getLastJobInstance(name);
- JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
- if (lastInstance == null) {
- // Start from a completely clean sheet
- nextParameters = incrementer.getNext(new JobParameters());
- }
- else {
- JobExecution previousExecution = this.jobExplorer.getLastJobExecution(lastInstance);
- if (previousExecution == null) {
- // Normally this will not happen - an instance exists with no executions
- nextParameters = incrementer.getNext(new JobParameters());
- }
- else {
- nextParameters = incrementer.getNext(previousExecution.getJobParameters());
- }
- }
-
- // start with parameters from the incrementer
- Map<String, JobParameter> nextParametersMap = new HashMap<>(nextParameters.getParameters());
- // append new parameters (overriding those with the same key)
- nextParametersMap.putAll(this.parameterMap);
- this.parameterMap = nextParametersMap;
- return this;
- }
----
- public class RunIdIncrementer implements JobParametersIncrementer {
-
- private static String RUN_ID_KEY = "run.id";
-
- private String key = RUN_ID_KEY;
-
- /**
- * The name of the run id in the job parameters. Defaults to "run.id".
- *
- * @param key the key to set
- */
- public void setKey(String key) {
- this.key = key;
- }
-
- /**
- * Increment the run.id parameter (starting with 1).
- *
- * @param parameters the previous job parameters
- * @return the next job parameters with an incremented (or initialized) run.id
- * @throws IllegalArgumentException if the previous value of run.id is invalid
- */
- @Override
- public JobParameters getNext(@Nullable JobParameters parameters) {
-
- JobParameters params = (parameters == null) ? new JobParameters() : parameters;
- JobParameter runIdParameter = params.getParameters().get(this.key);
- long id = 1;
- if (runIdParameter != null) {
- try {
- id = Long.parseLong(runIdParameter.getValue().toString()) + 1;
- }
- catch (NumberFormatException exception) {
- throw new IllegalArgumentException("Invalid value for parameter "
- + this.key, exception);
- }
- }
- return new JobParametersBuilder(params).addLong(this.key, id).toJobParameters();
- }
-
- }
--
第4行执行的时候,仅仅是把 run.id 加1
也就是说,如果上一次什么都没有,或者只是run.id那么,这次 就只是run.id。
如果上一次还有一些其它的值,比如下面这些
parameters: [{-spring.config.location=/xxx/xxx/xxx/application.xml, -spring.profiles.active=prod, run.id=123, -spring.batch.job.names=xxxxx}]
那么本次执行时,还会附带上这些参数
下面的5行执行 (【重要】 使用上面,4行处理中,生成的parameters !!!)
- protected void execute(Job job, JobParameters jobParameters)
- throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
- JobParametersInvalidException, JobParametersNotFoundException {
- JobParameters parameters = getNextJobParameters(job, jobParameters);
- JobExecution execution = this.jobLauncher.run(job, parameters);
- if (this.publisher != null) {
- this.publisher.publishEvent(new JobExecutionEvent(execution));
- }
- }
---
下面的47行是重点
下面的56行会显示log信息 (可以显示 【所有的】 参数 信息 !!! )
o.s.b.c.SimpleJobLauncher Job: [Simple:[name=xxxxxxx]] completed with the following parameters: [{-spring.config.location=/xxx/xxx/xxx/application.xml, -spring.profiles.active=prod, run.id=123, -spring.batch.job.names=xxxxx}]
====
- public class SimpleJobLauncher implements JobLauncher, InitializingBean {
-
- 。。。。。。。。
-
- @Override
- public JobExecution run(final Job job, final JobParameters jobParameters)
- throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
- JobParametersInvalidException {
-
- Assert.notNull(job, "The Job must not be null.");
- Assert.notNull(jobParameters, "The JobParameters must not be null.");
-
- final JobExecution jobExecution;
- JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
- if (lastExecution != null) {
- if (!job.isRestartable()) {
- throw new JobRestartException("JobInstance already exists and is not restartable");
- }
- /*
- * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
- * retrieve the previous execution and check
- */
- for (StepExecution execution : lastExecution.getStepExecutions()) {
- BatchStatus status = execution.getStatus();
- if (status.isRunning() || status == BatchStatus.STOPPING) {
- throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
- + lastExecution);
- } else if (status == BatchStatus.UNKNOWN) {
- throw new JobRestartException(
- "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
- + "The last execution ended with a failure that could not be rolled back, "
- + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
- }
- }
- }
-
- // Check the validity of the parameters before doing creating anything
- // in the repository...
- job.getJobParametersValidator().validate(jobParameters);
-
- /*
- * There is a very small probability that a non-restartable job can be
- * restarted, but only if another process or thread manages to launch
- * <i>and</i> fail a job execution for this instance between the last
- * assertion and the next method returning successfully.
- */
- jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
-
- try {
- taskExecutor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- if (logger.isInfoEnabled()) {
- logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
- + "]");
- }
- job.execute(jobExecution);
- if (logger.isInfoEnabled()) {
- Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(), jobExecution.getEndTime());
- logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
- + "] and the following status: [" + jobExecution.getStatus() + "]"
- + (jobExecutionDuration == null ? "" : " in " + BatchMetrics.formatDuration(jobExecutionDuration)));
- }
- }
- catch (Throwable t) {
- if (logger.isInfoEnabled()) {
- logger.info("Job: [" + job
- + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
- + "]", t);
- }
- rethrow(t);
- }
- }
-
- private void rethrow(Throwable t) {
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- }
- else if (t instanceof Error) {
- throw (Error) t;
- }
- throw new IllegalStateException(t);
- }
- });
- }
- catch (TaskRejectedException e) {
- jobExecution.upgradeStatus(BatchStatus.FAILED);
- if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
- jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
- }
- jobRepository.update(jobExecution);
- }
-
- return jobExecution;
- }
- public class SimpleJobRepository implements JobRepository {
-
- .......
-
- @Override
- public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
- throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
-
- Assert.notNull(jobName, "Job name must not be null.");
- Assert.notNull(jobParameters, "JobParameters must not be null.");
-
- /*
- * Find all jobs matching the runtime information.
- *
- * If this method is transactional, and the isolation level is
- * REPEATABLE_READ or better, another launcher trying to start the same
- * job in another thread or process will block until this transaction
- * has finished.
- */
-
- JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
- ExecutionContext executionContext;
-
- // existing job instance found
- if (jobInstance != null) {
-
- List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
-
- if (executions.isEmpty()) {
- throw new IllegalStateException("Cannot find any job execution for job instance: " + jobInstance);
- }
-
- // check for running executions and find the last started
下面的9行,15行是重点
・9行:生成 JOB_KEY (根据 JobParameters 的值来生成)
START ============ START
String jobKey = jobKeyGenerator.generateKey(jobParameters);
下面17行中, 对参数进行了筛选
if(jobParameter.isIdentifying()) {
identify 英 [aɪˈdɛntɪˌfaɪ] 识别;认出;辨认;确认;确定
这个字段来源于,batch_job_execution_params 表中的【identifying】字段。
如果只有run.id对应的这个字段为【Y】,那么
即使有多个参数,下面32行中, 变量【sringBuffer】的值是【run.id=xxx】
- public class DefaultJobKeyGenerator implements JobKeyGenerator<JobParameters> {
-
- /**
- * Generates the job key to be used based on the {@link JobParameters} instance
- * provided.
- */
- @Override
- public String generateKey(JobParameters source) {
-
- Assert.notNull(source, "source must not be null");
- Map<String, JobParameter> props = source.getParameters();
- StringBuilder stringBuffer = new StringBuilder();
- List<String> keys = new ArrayList<>(props.keySet());
- Collections.sort(keys);
- for (String key : keys) {
- JobParameter jobParameter = props.get(key);
- if(jobParameter.isIdentifying()) {
- String value = jobParameter.getValue()==null ? "" : jobParameter.toString();
- stringBuffer.append(key).append("=").append(value).append(";");
- }
- }
-
- MessageDigest digest;
- try {
- digest = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- throw new IllegalStateException(
- "MD5 algorithm not available. Fatal (should be in the JDK).");
- }
-
- try {
- byte[] bytes = digest.digest(stringBuffer.toString().getBytes(
- "UTF-8"));
- return String.format("%032x", new BigInteger(1, bytes));
- } catch (UnsupportedEncodingException e) {
- throw new IllegalStateException(
- "UTF-8 encoding not available. Fatal (should be in the JDK).");
- }
- }
- }
END ============ END
・15行:使用JOB_NAME 和 JOB_KEY (9行生成) 在 【BATCH_JOB_INSTANCE】表中查询。
SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ?
・正常情况是不应该查询到数据的,因为每次 JobParameters 中, 【run.id】的值都是不同的
・之所以会出错,是因为【BATCH_JOB_EXECUTION_PARAMS】表达数据被删除了,造成【run.id】的発番出现了问题。 重复使用了以前的值。
- public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements JobInstanceDao, InitializingBean {
-
- 。。。。。。。
-
- @Nullable
- public JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
- Assert.notNull(jobName, "Job name must not be null.");
- Assert.notNull(jobParameters, "JobParameters must not be null.");
- String jobKey = this.jobKeyGenerator.generateKey(jobParameters);
- RowMapper<JobInstance> rowMapper = new JobInstanceRowMapper(this);
- List instances;
- if (StringUtils.hasLength(jobKey)) {
- instances = this.getJdbcTemplate()
- .query(this.getQuery(
- "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ?"),
- rowMapper, new Object[]{jobName, jobKey});
- } else {
- instances = this.getJdbcTemplate()
- .query(this.getQuery(
- "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and (JOB_KEY = ? OR JOB_KEY is NULL)"),
- rowMapper, new Object[]{jobName, jobKey});
- }
-
- if (instances.isEmpty()) {
- return null;
- } else {
- Assert.state(instances.size() == 1, "instance count must be 1 but was " + instances.size());
- return (JobInstance) instances.get(0);
- }
- }
下面的55行是重点
- public class SimpleJobRepository implements JobRepository {
-
- .......
-
- @Override
- public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
- throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
-
- Assert.notNull(jobName, "Job name must not be null.");
- Assert.notNull(jobParameters, "JobParameters must not be null.");
-
- /*
- * Find all jobs matching the runtime information.
- *
- * If this method is transactional, and the isolation level is
- * REPEATABLE_READ or better, another launcher trying to start the same
- * job in another thread or process will block until this transaction
- * has finished.
- */
-
- JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
- ExecutionContext executionContext;
-
- // existing job instance found
- if (jobInstance != null) {
-
- List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
-
- if (executions.isEmpty()) {
- throw new IllegalStateException("Cannot find any job execution for job instance: " + jobInstance);
- }
-
- // check for running executions and find the last started
- for (JobExecution execution : executions) {
- if (execution.isRunning() || execution.isStopping()) {
- throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
- + jobInstance);
- }
- BatchStatus status = execution.getStatus();
- if (status == BatchStatus.UNKNOWN) {
- throw new JobRestartException("Cannot restart job from UNKNOWN status. "
- + "The last execution ended with a failure that could not be rolled back, "
- + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
- }
- if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
- throw new JobInstanceAlreadyCompleteException(
- "A job instance already exists and is complete for parameters=" + jobParameters
- + ". If you want to run this job again, change the parameters.");
- }
- }
- executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
- }
- else {
- // no job found, create one
- jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
- executionContext = new ExecutionContext();
- }
-
- JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
- jobExecution.setExecutionContext(executionContext);
- jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
-
- // Save the JobExecution so that it picks up an ID (useful for clients
- // monitoring asynchronous executions):
- jobExecutionDao.saveJobExecution(jobExecution);
- ecDao.saveExecutionContext(jobExecution);
-
- return jobExecution;
-
- }
- public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements
- JobInstanceDao, InitializingBean {
-
- private static final String STAR_WILDCARD = "*";
-
- private static final String SQL_WILDCARD = "%";
-
- private static final String CREATE_JOB_INSTANCE = "INSERT into %PREFIX%JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION)"
- + " values (?, ?, ?, ?)";
-
- private static final String FIND_JOBS_WITH_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ?";
-
- private static final String FIND_JOBS_WITH_KEY = FIND_JOBS_WITH_NAME
-
- 。。。。。。。
- @Override
- public JobInstance createJobInstance(String jobName,
- JobParameters jobParameters) {
-
- Assert.notNull(jobName, "Job name must not be null.");
- Assert.notNull(jobParameters, "JobParameters must not be null.");
-
- Assert.state(getJobInstance(jobName, jobParameters) == null,
- "JobInstance must not already exist");
-
- Long jobId = jobIncrementer.nextLongValue();
-
- JobInstance jobInstance = new JobInstance(jobId, jobName);
- jobInstance.incrementVersion();
-
- Object[] parameters = new Object[] { jobId, jobName,
- jobKeyGenerator.generateKey(jobParameters), jobInstance.getVersion() };
- getJdbcTemplate().update(
- getQuery(CREATE_JOB_INSTANCE),
- parameters,
- new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR,
- Types.INTEGER });
-
- return jobInstance;
- }
- 。。。。。。。
===
下面的65行是重点
- public class SimpleJobRepository implements JobRepository {
-
- .......
-
- @Override
- public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
- throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
-
- Assert.notNull(jobName, "Job name must not be null.");
- Assert.notNull(jobParameters, "JobParameters must not be null.");
-
- /*
- * Find all jobs matching the runtime information.
- *
- * If this method is transactional, and the isolation level is
- * REPEATABLE_READ or better, another launcher trying to start the same
- * job in another thread or process will block until this transaction
- * has finished.
- */
-
- JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
- ExecutionContext executionContext;
-
- // existing job instance found
- if (jobInstance != null) {
-
- List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
-
- if (executions.isEmpty()) {
- throw new IllegalStateException("Cannot find any job execution for job instance: " + jobInstance);
- }
-
- // check for running executions and find the last started
- for (JobExecution execution : executions) {
- if (execution.isRunning() || execution.isStopping()) {
- throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
- + jobInstance);
- }
- BatchStatus status = execution.getStatus();
- if (status == BatchStatus.UNKNOWN) {
- throw new JobRestartException("Cannot restart job from UNKNOWN status. "
- + "The last execution ended with a failure that could not be rolled back, "
- + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
- }
- if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
- throw new JobInstanceAlreadyCompleteException(
- "A job instance already exists and is complete for parameters=" + jobParameters
- + ". If you want to run this job again, change the parameters.");
- }
- }
- executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
- }
- else {
- // no job found, create one
- jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
- executionContext = new ExecutionContext();
- }
-
- JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
- jobExecution.setExecutionContext(executionContext);
- jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
-
- // Save the JobExecution so that it picks up an ID (useful for clients
- // monitoring asynchronous executions):
- jobExecutionDao.saveJobExecution(jobExecution);
- ecDao.saveExecutionContext(jobExecution);
-
- return jobExecution;
-
- }
--
下面的 4, 23行,是重点 // 向【BATCH_JOB_EXECUTION】表中插入数据
下面的16行,是重点 // 第16行执行之后,Job_Execution_ID 才有值 // jobExecution.setId(jobExecutionIncrementer.nextLongValue());
下面的28行,是重点 // 28行的时候,首次传递了Job_Execution_ID
- public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean {
-
- 。。。。。。。
- private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, "
- + "END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED, JOB_CONFIGURATION_LOCATION) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
- 。。。。。。。
-
- @Override
- public void saveJobExecution(JobExecution jobExecution) {
-
- validateJobExecution(jobExecution);
-
- jobExecution.incrementVersion();
-
- jobExecution.setId(jobExecutionIncrementer.nextLongValue());
- Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(),
- jobExecution.getStartTime(), jobExecution.getEndTime(), jobExecution.getStatus().toString(),
- jobExecution.getExitStatus().getExitCode(), jobExecution.getExitStatus().getExitDescription(),
- jobExecution.getVersion(), jobExecution.getCreateTime(), jobExecution.getLastUpdated(),
- jobExecution.getJobConfigurationName() };
- getJdbcTemplate().update(
- getQuery(SAVE_JOB_EXECUTION),
- parameters,
- new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR,
- Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR });
-
- insertJobParameters(jobExecution.getId(), jobExecution.getJobParameters());
- }
下面的55行,将数据插入DB的向【BATCH_JOB_EXECUTION_PARAMS】表中插入数据
- public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean {
-
- 。。。。。。。
-
-
- private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_EXECUTION_ID, KEY_NAME, TYPE_CD, "
- + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL, IDENTIFYING from %PREFIX%JOB_EXECUTION_PARAMS where JOB_EXECUTION_ID = ?";
-
- private static final String CREATE_JOB_PARAMETERS = "INSERT into %PREFIX%JOB_EXECUTION_PARAMS(JOB_EXECUTION_ID, KEY_NAME, TYPE_CD, "
- + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL, IDENTIFYING) values (?, ?, ?, ?, ?, ?, ?, ?)";
-
- 。。。。。。。
- /**
- * Convenience method that inserts all parameters from the provided
- * JobParameters.
- *
- */
- private void insertJobParameters(Long executionId, JobParameters jobParameters) {
-
- for (Entry<String, JobParameter> entry : jobParameters.getParameters()
- .entrySet()) {
- JobParameter jobParameter = entry.getValue();
- insertParameter(executionId, jobParameter.getType(), entry.getKey(),
- jobParameter.getValue(), jobParameter.isIdentifying());
- }
- }
-
- /**
- * Convenience method that inserts an individual records into the
- * JobParameters table.
- */
- private void insertParameter(Long executionId, ParameterType type, String key,
- Object value, boolean identifying) {
-
- Object[] args = new Object[0];
- int[] argTypes = new int[] { Types.BIGINT, Types.VARCHAR,
- Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.BIGINT,
- Types.DOUBLE, Types.CHAR };
-
- String identifyingFlag = identifying? "Y":"N";
-
- if (type == ParameterType.STRING) {
- args = new Object[] { executionId, key, type, value, new Timestamp(0L),
- 0L, 0D, identifyingFlag};
- } else if (type == ParameterType.LONG) {
- args = new Object[] { executionId, key, type, "", new Timestamp(0L),
- value, new Double(0), identifyingFlag};
- } else if (type == ParameterType.DOUBLE) {
- args = new Object[] { executionId, key, type, "", new Timestamp(0L), 0L,
- value, identifyingFlag};
- } else if (type == ParameterType.DATE) {
- args = new Object[] { executionId, key, type, "", value, 0L, 0D, identifyingFlag};
- }
-
- getJdbcTemplate().update(getQuery(CREATE_JOB_PARAMETERS), args, argTypes);
- }
-
- 。。。。。。。
---
- package com.sxz.common.utils;
-
- import java.io.UnsupportedEncodingException;
- import java.math.BigInteger;
- import java.security.MessageDigest;
- import java.security.NoSuchAlgorithmException;
-
- public class GetMD5Info {
-
- public static void main (String[] args) {
-
- // 结尾有分号,应该是想用分号来,分割开来多个参数
- // 具体参照 7.1 中的源码 19 行
- // stringBuffer.append(key).append("=").append(value).append(";");
- System.out.println(EncoderByMd5("run.id=3;"));
-
- }
-
- public static String EncoderByMd5(String str) {
- if (str == null) {
- return null;
- }
-
- String strMD5 = "";
-
-
- try {
- // 确定计算方法
- MessageDigest md5 = MessageDigest.getInstance("MD5");
- // 加密后的字符串
- byte[] bytes = md5.digest(str.getBytes("utf-8"));
-
- strMD5 = String.format("%032x", new BigInteger(1, bytes));
-
- } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
- return null;
- }
-
-
- return strMD5;
- }
-
- }
---
a3364faf893276dea0caacefbf618db5
---
因为有主外键,所有只能按照下面顺序,删除表中数据
- delete from BATCH_JOB_EXECUTION_PARAMS;
-
- delete from BATCH_STEP_EXECUTION_CONTEXT;
-
- delete from BATCH_JOB_EXECUTION_CONTEXT;
-
- delete from BATCH_STEP_EXECUTION;
-
- delete from BATCH_JOB_EXECUTION;
-
- delete from BATCH_JOB_INSTANCE;
====
1.删除【BATCH_JOB_INSTANCE】表中,对应的Log中的instanceid的记录
Cannot find any job execution for job instance: id= XXXX
2.再手动运行一下SpringBatch的JOB
同时,保留,刚刚运行完JOB后,SpringBatch表中的信息。
尤其时【BATCH_JOB_EXECUTION_PARAMS】表
因为这里面有, run.id
3.获得合适的run.id的值。 (保证在【BATCH_JOB_INSTANCE】表中,没有重复的值。)
(不过,不同的Jobname,重复也没有关系)
4.更新2中生成的【BATCH_JOB_EXECUTION_PARAMS】表中记录的run.id的值。
5.如果还有其它参数信息,一起补足。(下一次的参数信息,根据上一次而来。)
6.再次多次,连续执行Job,确认不再出错。
---
Java8新特性学习_001_(Lambda表达式,函数式接口,方法引用,Stream类,Optional类)_sun0322的博客-CSDN博客
===
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。