赞
踩
我们有个task服务,这个服务是双节点,即两台服务器上都部署有这个服务,但是每天的定时任务我们只想执行一次,于是我们想到用数据库的行锁来实现这种功能。
核心代码是JobLocksMo lock = brJobLocksMongoDaoImpl.lock(jobLocksMo.getId(), jobLocksMo.getVersion());这块,当拿到任务Id后,判断当前任务是否被执行,如果没有被执行则给任务上锁,并把版本加1,这样下一个线程进来就没法更新数据库行记录
(在 MongoDB 中,findAndModify 方法用于查找文档并对其进行修改。如果查询条件找不到文档,findAndModify 方法将返回 null,如果更新成功则返回最新的对象)。
@Async("taskExecutor") @Scheduled(cron = "0 0 5 9 12 ?") public void updateAgentReport() { log.info("处理顾问历史数据,开始...."); JobLocksMo jobLocksMo = brJobLocksMongoDaoImpl.findById("br_data_job_lock_report_agent"); if (null == jobLocksMo || jobLocksMo.getStatus() == 1) { log.info("任务锁未释放,不执行后续操作"); return; } StopWatch watch = new StopWatch(); watch.start(); try { log.info("br_data_job_lock_report_agent status : " + jobLocksMo.getStatus()); if (jobLocksMo.getStatus() == 0) { // 如果当前任务处于空闲状态则给当前任务加锁,核对版本正确才更新 JobLocksMo lock = brJobLocksMongoDaoImpl.lock(jobLocksMo.getId(), jobLocksMo.getVersion()); if (null != lock) { LocalDateRangeBean localDateRangeBean = new LocalDateRangeBean(); localDateRangeBean.setStartDate(LocalDate.of(2019, 8, 1)); localDateRangeBean.setEndDate(LocalDate.now()); historyServiceImpl.updateAgentReport(localDateRangeBean); } } } catch (Exception e) { log.error("12.9 5点处理顾问历史数据异常:", e); } finally { watch.stop(); // 只要是给当前任务加上了锁最后必须释放锁 if (jobLocksMo.getStatus() == 0) { brJobLocksMongoDaoImpl.unlock(jobLocksMo.getId()); } } }
public JobLocksMo lock(String id, Long version) {
Query query = new Query();
Criteria criteria = new Criteria();
criteria.and("id").is(id);
criteria.and("status").is(0);
criteria.and("version").is(version);
query.addCriteria(criteria);
Update update = new Update();
update.set("status", 1);
update.inc("version", 1);
update.set("updateId", "system_job_lock");
update.set("updateTime", LocalDateTime.now());
return brMongoTemplate.findAndModify(query, update, JobLocksMo.class);
}
public JobLocksMo unlock(String id) {
Query query = new Query();
Criteria criteria = new Criteria();
criteria.and("id").is(id);
criteria.and("status").is(1);
query.addCriteria(criteria);
Update update = new Update();
update.set("status", 0);
update.set("updateId", "system_job_unlock");
update.set("updateTime", LocalDateTime.now());
return brMongoTemplate.findAndModify(query, update, JobLocksMo.class);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。