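TaskTracker's main job is to execute tasks. It performs two operations:
(1) Once started, TaskTracker periodically sends job pull requests to JobTracker, sized according to the resources it currently has available.
(2) TaskTracker receives the jobs that JobTracker pushes to it and executes them.
Source: https://github.com/ltsopensource/lts-examples/tree/master/lts-example-tasktracker
1. Configuration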
- <context:component-scan base-package="com.github.ltsopensource.example"/>
-
- <bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
- <!-- Job runner class -->
- <property name="jobRunnerClass" value="com.github.ltsopensource.example.spring.SpringAnnotationJobRunner"/>
- <property name="clusterName" value="test_cluster"/>
- <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
- <property name="nodeGroup" value="test_trade_TaskTracker"/>
- <property name="workThreads" value="64"/>
- <property name="configs">
- <props>
- <prop key="job.fail.store">mapdb</prop>
- </props>
- </property>
- </bean>
2. The job runner class implements the JobRunner interface
- public class SpringAnnotationJobRunner implements JobRunner {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SpringAnnotationJobRunner.class);
-
- @Autowired
- SpringBean springBean;
-
- @Override
- public Result run(JobContext jobContext) throws Throwable {
- try {
- Thread.sleep(1000L);
-
- springBean.hello();
-
- // TODO business logic
- LOGGER.info("About to execute: " + jobContext.getJob());
- BizLogger bizLogger = jobContext.getBizLogger();
- // Business log entries are sent to LTS (stored on the JobTracker)
- bizLogger.info("Test business log entry");
-
- } catch (Exception e) {
- LOGGER.error("Run job failed!", e);
- return new Result(Action.EXECUTE_LATER, e.getMessage());
- }
- return new Result(Action.EXECUTE_SUCCESS, "Executed successfully");
- }
-
- }
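When TaskTracker initializes, it starts a scheduled task that sends job pull requests to JobTracker according to the thread resources available on the server.
This happens in TaskTracker's beforeStart method, which creates the JobPullMachine; the JobPullMachine in turn sets up the scheduled task that sends the pull requests to JobTracker.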
- @Override
- protected void beforeStart() {
- appContext.setMStatReporter(new TaskTrackerMStatReporter(appContext));
-
- appContext.setRemotingClient(remotingClient);
- // Set up the runner thread pool
- appContext.setRunnerPool(new RunnerPool(appContext));
- appContext.getMStatReporter().start();
- // Pulls jobs from the JobTracker
- appContext.setJobPullMachine(new JobPullMachine(appContext));
- appContext.setStopWorkingMonitor(new StopWorkingMonitor(appContext));
-
- appContext.getHttpCmdServer().registerCommands(
- new JobTerminateCmd(appContext)); // terminates a job that is currently running
- }
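When JobPullMachine is constructed, it subscribes to events about the JobTracker nodes in the registry. When a JobTracker becomes available it calls start, which sets up the scheduled task that sends requests to JobTracker; when no JobTracker is available it calls stop.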
- public JobPullMachine(final TaskTrackerAppContext appContext) {
- this.appContext = appContext;
- this.jobPullFrequency = appContext.getConfig().getParameter(ExtConfig.JOB_PULL_FREQUENCY, Constants.DEFAULT_JOB_PULL_FREQUENCY);
-
- this.machineResCheckEnable = appContext.getConfig().getParameter(ExtConfig.LB_MACHINE_RES_CHECK_ENABLE, false);
- // Watch for JobTracker nodes registered in ZooKeeper; when one is available, call start, which schedules the worker
- appContext.getEventCenter().subscribe(
- new EventSubscriber(JobPullMachine.class.getSimpleName().concat(appContext.getConfig().getIdentity()),
- new Observer() {
- @Override
- public void onObserved(EventInfo eventInfo) {
- if (EcTopic.JOB_TRACKER_AVAILABLE.equals(eventInfo.getTopic())) {
- // A JobTracker is available
- start();
- } else if (EcTopic.NO_JOB_TRACKER_AVAILABLE.equals(eventInfo.getTopic())) {
- stop();
- }
- }
- }), EcTopic.JOB_TRACKER_AVAILABLE, EcTopic.NO_JOB_TRACKER_AVAILABLE);
- this.worker = new Runnable() {
- @Override
- public void run() {
- try {
- if (!start.get()) {
- return;
- }
- if (!isMachineResEnough()) {
- // Machine resources are insufficient, so do not pull jobs
- return;
- }
- // Send a job pull request to the JobTracker
- sendRequest();
- } catch (Exception e) {
- LOGGER.error("Job pull machine run error!", e);
- }
- }
- };
- }
The start method sets up the scheduled task, which calls sendRequest to send job pull requests to JobTracker at a fixed delay.
- private void start() {
- try {
- if (start.compareAndSet(false, true)) {
- if (scheduledFuture == null) {
- scheduledFuture = executorService.scheduleWithFixedDelay(worker, jobPullFrequency * 1000, jobPullFrequency * 1000, TimeUnit.MILLISECONDS);
- }
- LOGGER.info("Start Job pull machine success!");
- }
- } catch (Throwable t) {
- LOGGER.error("Start Job pull machine failed!", t);
- }
- }
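The start logic above combines two ideas: an AtomicBoolean flag flipped with compareAndSet so that repeated calls are harmless, and a fixed-delay task on a ScheduledExecutorService. The following is a minimal, self-contained sketch of that pattern using only the JDK. PullSchedulerSketch and its methods are hypothetical names invented for illustration, and the counter stands in for sendRequest; it is not the LTS implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a pull machine; not the LTS class.
class PullSchedulerSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger pulls = new AtomicInteger();
    private final long delayMillis;
    private ScheduledFuture<?> future;

    PullSchedulerSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Returns true only for the call that actually flips the flag.
    synchronized boolean start() {
        if (!started.compareAndSet(false, true)) {
            return false; // already running, nothing to do
        }
        future = executor.scheduleWithFixedDelay(
                pulls::incrementAndGet, // stands in for sendRequest()
                delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    // Cancels the periodic task; returns false if it was not running.
    synchronized boolean stop() {
        if (!started.compareAndSet(true, false)) {
            return false;
        }
        future.cancel(true);
        future = null;
        return true;
    }

    int pullCount() {
        return pulls.get();
    }

    // Releases the scheduler thread so the JVM can exit.
    void shutdown() {
        executor.shutdownNow();
    }
}
```

With scheduleWithFixedDelay the delay is measured from the end of one run to the start of the next, so a slow pull never overlaps with the following one.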
sendRequest checks how many runner threads the TaskTracker currently has available and then sends the request to JobTracker.
- private void sendRequest() throws RemotingCommandFieldCheckException {
- // Number of runner threads still available on this node
- int availableThreads = appContext.getRunnerPool().getAvailablePoolSize();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("current availableThreads:{}", availableThreads);
- }
- if (availableThreads == 0) {
- return;
- }
- // Build the request body
- JobPullRequest requestBody = appContext.getCommandBodyWrapper().wrapper(new JobPullRequest());
- requestBody.setAvailableThreads(availableThreads);
- RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_PULL.code(), requestBody);
-
- try {
- // Send the job pull request to the JobTracker
- RemotingCommand responseCommand = appContext.getRemotingClient().invokeSync(request);
- if (responseCommand == null) {
- LOGGER.warn("Job pull request failed! response command is null!");
- return;
- }
- if (JobProtos.ResponseCode.JOB_PULL_SUCCESS.code() == responseCommand.getCode()) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Job pull request success!");
- }
- return;
- }
- LOGGER.warn("Job pull request failed! response code: " + responseCommand.getCode());
- } catch (JobTrackerNotFoundException e) {
- LOGGER.warn("no job tracker available!");
- }
- }
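The code above only shows that getAvailablePoolSize returns a number and that a pull is skipped when it is zero. One plausible way to derive such a number from a JDK ThreadPoolExecutor is the maximum pool size minus the count of threads currently running a task. The sketch below illustrates that idea; AvailableThreadsSketch and its methods are hypothetical, and the real RunnerPool may compute the value differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the real RunnerPool may compute this differently.
class AvailableThreadsSketch {

    // Free capacity = configured maximum minus threads currently running a task.
    static int availableThreads(ThreadPoolExecutor pool) {
        return pool.getMaximumPoolSize() - pool.getActiveCount();
    }

    // Number of jobs to ask for in one pull; 0 means "skip this round".
    static int jobsToRequest(ThreadPoolExecutor pool) {
        return Math.max(availableThreads(pool), 0);
    }

    // Returns {free slots when idle, free slots while one thread is busy}.
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch running = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            int idle = jobsToRequest(pool);
            pool.execute(() -> {
                running.countDown();
                try {
                    release.await(); // keep the thread busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            running.await(); // wait until the task is really running
            int busy = jobsToRequest(pool);
            return new int[] {idle, busy};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Sending the free-slot count along with the pull request lets the server hand out no more jobs than the node can start immediately.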
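RemotingClientDelegate's invokeSync method picks a JobTracker node obtained from the registry and sends the request to it over a remote connection. If the call fails, that node is removed from the local list and the request is retried against the remaining nodes.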
- /**
- * Synchronous call
- */
- public RemotingCommand invokeSync(RemotingCommand request)
- throws JobTrackerNotFoundException {
-
- Node jobTracker = getJobTrackerNode();
-
- try {
- RemotingCommand response = remotingClient.invokeSync(jobTracker.getAddress(),
- request, appContext.getConfig().getInvokeTimeoutMillis());
- this.serverEnable = true;
- return response;
- } catch (Exception e) {
- // Remove this JobTracker from the local list
- jobTrackers.remove(jobTracker);
- try {
- Thread.sleep(100L);
- } catch (InterruptedException e1) {
- LOGGER.error(e1.getMessage(), e1);
- }
- // Unless no node is available at all, keep trying the remaining nodes
- return invokeSync(request);
- }
- }
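The retry above is written recursively and ends when no JobTracker is left to pick. The same failover idea can be sketched as a loop, shown here with plain JDK types. FailoverInvokerSketch, NoNodeAvailableException and the string node addresses are hypothetical names for illustration; the sketch leaves out the short sleep between attempts and is not the LTS code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical failover helper; names are illustrative only.
class FailoverInvokerSketch {

    static class NoNodeAvailableException extends RuntimeException {
        NoNodeAvailableException(String message) {
            super(message);
        }
    }

    private final List<String> nodes = new CopyOnWriteArrayList<>();

    FailoverInvokerSketch(List<String> initialNodes) {
        nodes.addAll(initialNodes);
    }

    // Tries each known node in turn; a node that fails is dropped from the list.
    String invoke(Function<String, String> call) {
        for (String node : nodes) { // iterates over a snapshot, so removal is safe
            try {
                return call.apply(node);
            } catch (RuntimeException e) {
                nodes.remove(node); // treat the node as unavailable
            }
        }
        throw new NoNodeAvailableException("no node available");
    }

    List<String> liveNodes() {
        return new ArrayList<>(nodes);
    }
}
```

A loop makes the termination condition explicit: the call either succeeds on some node or fails once the list has been exhausted.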
TaskTracker also provides a RemotingDispatcher class that receives the jobs sent by JobTracker.
- public class RemotingDispatcher extends AbstractProcessor {
-
- private final Map<JobProtos.RequestCode, RemotingProcessor> processors = new HashMap<JobProtos.RequestCode, RemotingProcessor>();
-
- public RemotingDispatcher(TaskTrackerAppContext appContext) {
- super(appContext);
- processors.put(JobProtos.RequestCode.PUSH_JOB, new JobPushProcessor(appContext));
- processors.put(JobProtos.RequestCode.JOB_ASK, new JobAskProcessor(appContext));
- }
- // Handle requests sent by the JobTracker
- @Override
- public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
-
- JobProtos.RequestCode code = JobProtos.RequestCode.valueOf(request.getCode());
- RemotingProcessor processor = processors.get(code);
- if (processor == null) {
- return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(),
- "request code not supported!");
- }
- return processor.processRequest(channel, request);
- }
-
- }
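The dispatcher above is a lookup table from request code to processor, with a fallback response for unknown codes. A stripped-down sketch of the same pattern follows. DispatcherSketch, its RequestCode enum and the string-based handlers are simplified, hypothetical stand-ins for the LTS types.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified dispatcher; not the LTS RemotingDispatcher.
class DispatcherSketch {

    enum RequestCode { PUSH_JOB, JOB_ASK, UNKNOWN }

    // One handler per request code, registered once at construction time.
    private final Map<RequestCode, Function<String, String>> processors =
            new EnumMap<>(RequestCode.class);

    DispatcherSketch() {
        processors.put(RequestCode.PUSH_JOB, body -> "pushed:" + body);
        processors.put(RequestCode.JOB_ASK, body -> "asked:" + body);
    }

    // Looks up the handler for the code and delegates to it.
    String dispatch(RequestCode code, String body) {
        Function<String, String> processor = processors.get(code);
        if (processor == null) {
            return "request code not supported!";
        }
        return processor.apply(body);
    }
}
```

Adding support for a new request type then only requires registering one more entry in the map.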
JobPushProcessor's processRequest method takes the jobs sent by JobTracker and runs them in the thread pool.
- @Override
- public RemotingCommand processRequest(Channel channel,
- final RemotingCommand request) throws RemotingCommandException {
-
- // Read the request body
- JobPushRequest requestBody = request.getBody();
-
- // Jobs dispatched by the JobTracker
- final List<JobMeta> jobMetaList = requestBody.getJobMetaList();
- List<String> failedJobIds = null;
-
- for (JobMeta jobMeta : jobMetaList) {
- try {
- // Execute the job
- appContext.getRunnerPool().execute(jobMeta, jobRunnerCallback);
- } catch (NoAvailableJobRunnerException e) {
- if (failedJobIds == null) {
- failedJobIds = new ArrayList<String>();
- }
- failedJobIds.add(jobMeta.getJobId());
- }
- }
- if (CollectionUtils.isNotEmpty(failedJobIds)) {
- // Some jobs could not be accepted
- JobPushResponse jobPushResponse = new JobPushResponse();
- jobPushResponse.setFailedJobIds(failedJobIds);
- return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code(), jobPushResponse);
- }
-
- // All jobs were accepted
- return RemotingCommand.createResponseCommand(JobProtos
- .ResponseCode.JOB_PUSH_SUCCESS.code(), "job push success!");
- }
RunnerPool's execute method wraps the job in a JobRunnerDelegate and submits it to the thread pool for execution.
- public void execute(JobMeta jobMeta, RunnerCallback callback) throws NoAvailableJobRunnerException {
- try {
- threadPoolExecutor.execute(
- new JobRunnerDelegate(appContext, jobMeta, callback));
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Receive job success ! " + jobMeta);
- }
- } catch (RejectedExecutionException e) {
- LOGGER.warn("No more thread to run job .");
- throw new NoAvailableJobRunnerException(e);
- }
- }
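The two snippets above rely on the pool rejecting work when every thread is busy, and on the caller collecting the IDs of the rejected jobs. The sketch below reproduces that behavior with a JDK ThreadPoolExecutor. It assumes a pool with no queueing (a SynchronousQueue plus AbortPolicy); whether LTS configures its pool exactly this way is an assumption, and BoundedRunnerPoolSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "reject when full, report the failed IDs".
class BoundedRunnerPoolSketch {

    // Submits one task per job ID; returns the IDs the pool rejected.
    static List<String> submitAll(ThreadPoolExecutor pool, List<String> jobIds, Runnable work) {
        List<String> failedJobIds = new ArrayList<>();
        for (String jobId : jobIds) {
            try {
                pool.execute(work);
            } catch (RejectedExecutionException e) {
                failedJobIds.add(jobId); // no free thread for this job
            }
        }
        return failedJobIds;
    }

    // Two threads, three long-running jobs: the third one cannot be accepted.
    static List<String> demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 30, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),      // no queueing: hand off or fail
                new ThreadPoolExecutor.AbortPolicy()); // throw when saturated
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // simulate a job that is still running
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            return submitAll(pool, Arrays.asList("job-1", "job-2", "job-3"), blocking);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Rejecting immediately instead of queueing lets the sender learn right away which jobs were not accepted, so they can be scheduled elsewhere.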
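JobRunnerDelegate's run method obtains an instance of the JobRunner implementation configured as jobRunnerClass (created through reflection) and calls its run method to execute the job.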
- @Override
- public void run() {
-
- thread = Thread.currentThread();
-
- try {
- blockedOn(interruptor);
- if (Thread.currentThread().isInterrupted()) {
- ((InterruptibleAdapter) interruptor).interrupt();
- }
-
- while (jobMeta != null) {
- long startTime = SystemClock.now();
- // Set the jobId in the current context
- Response response = new Response();
- response.setJobMeta(jobMeta);
-
- BizLoggerAdapter logger = (BizLoggerAdapter) BizLoggerFactory.getLogger(
- appContext.getBizLogLevel(),
- appContext.getRemotingClient(), appContext);
-
- try {
- appContext.getRunnerPool().getRunningJobManager()
- .in(jobMeta.getJobId(), this);
- // Obtain the JobRunner implementation
- this.curJobRunner = appContext.getRunnerPool().getRunnerFactory().newRunner();
- // Call run to execute the job
- Result result = this.curJobRunner.run(buildJobContext(logger, jobMeta));
-
- if (result == null) {
- response.setAction(Action.EXECUTE_SUCCESS);
- } else {
- if (result.getAction() == null) {
- response.setAction(Action.EXECUTE_SUCCESS);
- } else {
- response.setAction(result.getAction());
- }
- response.setMsg(result.getMsg());
- }
-
- long time = SystemClock.now() - startTime;
- stat.addRunningTime(time);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Job execute completed : {}, time:{} ms.", jobMeta.getJob(), time);
- }
- } catch (Throwable t) {
- StringWriter sw = new StringWriter();
- t.printStackTrace(new PrintWriter(sw));
- response.setAction(Action.EXECUTE_EXCEPTION);
- response.setMsg(sw.toString());
- long time = SystemClock.now() - startTime;
- stat.addRunningTime(time);
- LOGGER.error("Job execute error : {}, time: {}, {}", jobMeta.getJob(), time, t.getMessage(), t);
- } finally {
- checkInterrupted(logger);
- appContext.getRunnerPool().getRunningJobManager()
- .out(jobMeta.getJobId());
- }
- // Update statistics
- stat(response.getAction());
-
- if (isStopToGetNewJob()) {
- response.setReceiveNewJob(false);
- }
- this.jobMeta = callback.runComplete(response);
- DotLogUtils.dot("JobRunnerDelegate.run get job " + (this.jobMeta == null ? "NULL" : "NOT_NULL"));
- }
- } finally {
- blockedOn(null);
- }
- }
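Two details of the loop above are easy to miss: a null result (or a null action) is treated as success, and the thread keeps running as long as the completion callback hands back another job. The sketch below isolates just that control flow. RunLoopSketch, its Action enum, the string job names and the pending queue (which stands in for callback.runComplete) are hypothetical simplifications. Unlike the original, which catches Throwable, the sketch only catches RuntimeException.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the "run, report, take the next job" loop.
class RunLoopSketch {

    enum Action { EXECUTE_SUCCESS, EXECUTE_LATER, EXECUTE_EXCEPTION }

    // Runs `first`, then keeps running whatever `pending` hands back until it is empty.
    static List<String> runAll(String first,
                               Function<String, Action> runner,
                               Deque<String> pending) {
        List<String> report = new ArrayList<>();
        String job = first;
        while (job != null) {
            Action action;
            try {
                Action result = runner.apply(job);
                // A missing result counts as success.
                action = (result == null) ? Action.EXECUTE_SUCCESS : result;
            } catch (RuntimeException e) {
                action = Action.EXECUTE_EXCEPTION;
            }
            report.add(job + "=" + action);
            job = pending.poll(); // stands in for callback.runComplete(response)
        }
        return report;
    }
}
```

Reusing the same thread for the next job avoids a round trip through the pool between consecutive jobs.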
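Summary:
TaskTracker does two main things:
(1) It actively sends requests to JobTracker to pull jobs.
(2) It listens for the jobs that JobTracker pushes to it and executes them.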