赞
踩
根据配置项 xxl.job.executor.logpath 确定日志根目录,
在该目录下按日期创建文件夹,并以 logId.log 为文件名创建日志文件进行记录
job执行日志记录
JobThread.run()
// log filename, like "logPath/yyyy-MM-dd/9999.log" //创建日志目录。在xxl.job.executor.logpath路径下创建创建目录,返回存储日志文件路径 String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId()); XxlJobContext xxlJobContext = new XxlJobContext( triggerParam.getJobId(), triggerParam.getExecutorParams(), logFileName, triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()); // init job context XxlJobContext.setXxlJobContext(xxlJobContext); // execute //创建日志文件并记录 XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
调用XxlJobHelper.log,填充参数,然后将内容写入到对应的文件中
public static boolean log(String appendLogPattern, Object ... appendLogArguments) { //参数填充,将{}替换值 FormattingTuple ft = MessageFormatter.arrayFormat(appendLogPattern, appendLogArguments); String appendLog = ft.getMessage(); /*appendLog = appendLogPattern; if (appendLogArguments!=null && appendLogArguments.length>0) { appendLog = MessageFormat.format(appendLogPattern, appendLogArguments); }*/ //获取当前线程的调用栈 StackTraceElement callInfo = new Throwable().getStackTrace()[1]; return logDetail(callInfo, appendLog); } /** * append log * * @param callInfo * @param appendLog */ private static boolean logDetail(StackTraceElement callInfo, String appendLog) { XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext(); if (xxlJobContext == null) { return false; } /*// "yyyy-MM-dd HH:mm:ss [ClassName]-[MethodName]-[LineNumber]-[ThreadName] log"; StackTraceElement[] stackTraceElements = new Throwable().getStackTrace(); StackTraceElement callInfo = stackTraceElements[1];*/ //自定义日志的格式和内容 StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append(DateUtil.formatDateTime(new Date())).append(" ") .append("["+ callInfo.getClassName() + "#" + callInfo.getMethodName() +"]").append("-") .append("["+ callInfo.getLineNumber() +"]").append("-") .append("["+ Thread.currentThread().getName() +"]").append(" ") .append(appendLog!=null?appendLog:""); String formatAppendLog = stringBuffer.toString(); // appendlog String logFileName = xxlJobContext.getJobLogFileName(); if (logFileName!=null && logFileName.trim().length()>0) { //将日志写到文件中 XxlJobFileAppender.appendLog(logFileName, formatAppendLog); return true; } else { logger.info(">>>>>>>>>>> {}", formatAppendLog); return false; } }
XxlJobFileAppender
创建日志文件,然后写入日志
public static String makeLogFileName(Date triggerDate, long logId) { // filePath/yyyy-MM-dd //在配置的日志目录下按照当前时间创建目录 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // avoid concurrent problem, can not be static File logFilePath = new File(getLogPath(), sdf.format(triggerDate)); if (!logFilePath.exists()) { logFilePath.mkdir(); } // filePath/yyyy-MM-dd/9999.log //返回日志文件名 String logFileName = logFilePath.getPath() .concat(File.separator) .concat(String.valueOf(logId)) .concat(".log"); return logFileName; } /** * append log * * @param logFileName * @param appendLog */ public static void appendLog(String logFileName, String appendLog) { // log file if (logFileName==null || logFileName.trim().length()==0) { return; } File logFile = new File(logFileName); if (!logFile.exists()) { try { logFile.createNewFile(); } catch (IOException e) { logger.error(e.getMessage(), e); return; } } // log if (appendLog == null) { appendLog = ""; } appendLog += "\\r\\n"; // append file content FileOutputStream fos = null; try { fos = new FileOutputStream(logFile, true); fos.write(appendLog.getBytes("utf-8")); fos.flush(); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { if (fos != null) { try { fos.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } } } }
job的执行结果callback记录到对应的日志文件。和job执行日志存放在同一个文件中
TriggerCallbackThread
private void doCallback(List<HandleCallbackParam> callbackParamList){ boolean callbackRet = false; // callback, will retry if error for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { //调用调度中心的回调接口,集群中成功一个即可 ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish."); callbackRet = true; break; } else { //记录日志 callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult); } } catch (Exception e) { callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage()); } } if (!callbackRet) { //发送callback信息失败,会记录到本地重新发送 appendFailCallbackFile(callbackParamList); } }
callback发送失败也会记录日志。放到callbacklog的文件夹中
private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator); private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log"); private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){ // valid if (callbackParamList==null || callbackParamList.size()==0) { return; } // append file byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList); //创建callback日志,将发送失败的记录下来,进行重新发送 File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()))); if (callbackLogFile.exists()) { for (int i = 0; i < 100; i++) { callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) )); if (!callbackLogFile.exists()) { break; } } } FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes); }
XxlJobExecutor
// Excerpt of the executor start-up sequence: only the two log-related steps are shown.
public void start() throws Exception {
    // init logpath
    // Passes the configured log path to the file appender. Per the article's note this
    // creates the log directory and switches the appender to it (initLogPath is not shown here).
    XxlJobFileAppender.initLogPath(logPath);
    // init JobLogFileCleanThread
    // Starts the thread that removes expired log directories.
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
}
JobLogFileCleanThread
public void start(final long logRetentionDays){ // limit min value //值小于3无效 if (logRetentionDays < 3 ) { return; } localThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { // clean log dir, over logRetentionDays //查找日志文件下的文件夹 File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles(); if (childDirs!=null && childDirs.length>0) { // today //获取今天的时间 Calendar todayCal = Calendar.getInstance(); //将时、分、秒、毫秒清零。假设超时五天清除,第五天的0点清除和12点清除有什么区别? todayCal.set(Calendar.HOUR_OF_DAY,0); todayCal.set(Calendar.MINUTE,0); todayCal.set(Calendar.SECOND,0); todayCal.set(Calendar.MILLISECOND,0); Date todayDate = todayCal.getTime(); for (File childFile: childDirs) { // valid if (!childFile.isDirectory()) { continue; } if (childFile.getName().indexOf("-") == -1) { continue; } // file create date Date logFileCreateDate = null; try { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd"); logFileCreateDate = simpleDateFormat.parse(childFile.getName()); } catch (ParseException e) { logger.error(e.getMessage(), e); } if (logFileCreateDate == null) { continue; } if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) { //删除文件夹及其中的所有文件夹和文件 FileUtil.deleteRecursively(childFile); } } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.DAYS.sleep(1); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy."); } }); localThread.setDaemon(true); localThread.setName("xxl-job, executor JobLogFileCleanThread"); localThread.start(); }
xxl-job执行任务时,会将任务完成结果写到callbackQueue中
在客户端存在线程读取这个队列的内容写回服务端,记录下来。
在1.2callback日志记录中,发送失败的callback会记录日志。步骤是:
public void start() { // valid if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); return; } // callback triggerCallbackThread = new Thread(new Runnable() { @Override public void run() { // normal callback while(!toStop){ try { HandleCallbackParam callback = getInstance().callBackQueue.take(); //可以取到值,则将值全部取出来 if (callback != null) { // callback list param List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } // last callback //上述循环结束,存在的callback内容写回服务端 try { List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy."); } }); triggerCallbackThread.setDaemon(true); triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); triggerCallbackThread.start(); // retry triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { while(!toStop){ try { //重新发送失败的callback。 retryFailCallbackFile(); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy."); } }); triggerRetryCallbackThread.setDaemon(true); triggerRetryCallbackThread.start(); }
服务端在调度任务时会记录日志;客户端callback时,会补全job的执行结果信息;此外还会定期清除过期日志。
服务端进行调度时,会在xxl_job_log表中记录日志
// 1、save log-id //xxl_job_log中存储执行日志 XxlJobLog jobLog = new XxlJobLog(); jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobId(jobInfo.getId()); jobLog.setTriggerTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); ...... jobLog.setExecutorAddress(address); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorShardingParam(shardingParam); jobLog.setExecutorFailRetryCount(finalFailRetryCount); //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
客户端回调最终会调用以下函数保存日志
调用链JobApiController.api(…)→AdminBizImpl.callback(…)→JobCompleteHelper.callback(…)
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) { // valid log item //根据参数中的logId从xxl_job_log中查询日志 XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId()); if (log == null) { return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found."); } if (log.getHandleCode() > 0) { return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc } // handle msg StringBuffer handleMsg = new StringBuffer(); if (log.getHandleMsg()!=null) { handleMsg.append(log.getHandleMsg()).append("<br>"); } if (handleCallbackParam.getHandleMsg() != null) { handleMsg.append(handleCallbackParam.getHandleMsg()); } // success, save log log.setHandleTime(new Date()); log.setHandleCode(handleCallbackParam.getHandleCode()); log.setHandleMsg(handleMsg.toString()); XxlJobCompleter.updateHandleInfoAndFinish(log); return ReturnT.SUCCESS; }
JobLogReportHelper.run()
//一天检查一次,清除之后更新lastCleanLogTime时间 if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0 && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) { // expire-time //初始化过期时间 Calendar expiredDay = Calendar.getInstance(); expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays()); expiredDay.set(Calendar.HOUR_OF_DAY, 0); expiredDay.set(Calendar.MINUTE, 0); expiredDay.set(Calendar.SECOND, 0); expiredDay.set(Calendar.MILLISECOND, 0); Date clearBeforeTime = expiredDay.getTime(); // clean expired log List<Long> logIds = null; do { logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000); if (logIds!=null && logIds.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds); } //循环删除一次1000条 } while (logIds!=null && logIds.size()>0); // update clean time lastCleanLogTime = System.currentTimeMillis(); }
客户端:
执行job的日志 存放在logpath/日期/logId.log中 并且会定时清理,保存时长为logretentiondays
job的执行结果会存到callback队列中,会有线程发送到服务端保存。callback结果(成功与否)也会保存到相应的job日志中。callback如果发送给服务端失败,则会记录callback发送失败的日志,保存到callbacklog文件夹中,会有线程进行重试。
重试的步骤:
服务端:
将客户端callback回来job的信息进行日志记录。同时每天对日志进行扫描,删除过期的日志
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。