当前位置:   article > 正文

LTS原理--TaskTracker任务处理(四)

tasktracker

TaskTracker主要的功能是执行任务,其有两点操作:

(1)当TaskTracker启动时会根据当前TaskTracker资源定时向JobTracker

(2)TaskTracker接收到JobTracker推送的任务执行任务。

参考LTS原理--JobTracker任务接收与分配(三)

一、示例:

地址:https://github.com/ltsopensource/lts-examples/tree/master/lts-example-tasktracker

1、配置

  1. <context:component-scan base-package="com.github.ltsopensource.example"/>
  2. <bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
  3. //任务执行类
  4. <property name="jobRunnerClass" value="com.github.ltsopensource.example.spring.SpringAnnotationJobRunner"/>
  5. <property name="clusterName" value="test_cluster"/>
  6. <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
  7. <property name="nodeGroup" value="test_trade_TaskTracker"/>
  8. <property name="workThreads" value="64"/>
  9. <property name="configs">
  10. <props>
  11. <prop key="job.fail.store">mapdb</prop>
  12. </props>
  13. </property>
  14. </bean>

2、任务执行类实现JobRunner接口

  1. public class SpringAnnotationJobRunner implements JobRunner {
  2. private static final Logger LOGGER = LoggerFactory.getLogger(SpringAnnotationJobRunner.class);
  3. @Autowired
  4. SpringBean springBean;
  5. @Override
  6. public Result run(JobContext jobContext) throws Throwable {
  7. try {
  8. Thread.sleep(1000L);
  9. springBean.hello();
  10. // TODO 业务逻辑
  11. LOGGER.info("我要执行:" + jobContext.getJob());
  12. BizLogger bizLogger = jobContext.getBizLogger();
  13. // 会发送到 LTS (JobTracker上)
  14. bizLogger.info("测试,业务日志啊啊啊啊啊");
  15. } catch (Exception e) {
  16. LOGGER.info("Run job failed!", e);
  17. return new Result(Action.EXECUTE_LATER, e.getMessage());
  18. }
  19. return new Result(Action.EXECUTE_SUCCESS, "执行成功了,哈哈");
  20. }
  21. }

二、请求任务

在TaskTracker初始化时会启用定时任务,根据服务器线程资源请求向JobTracker发送获取任务的请求。

在TaskTracker的beforeStart中初始化时会初始化JobPullMachine会建立定时任务向JobTracker发送任务请求。

  1. @Override
  2. protected void beforeStart() {
  3. appContext.setMStatReporter(new TaskTrackerMStatReporter(appContext));
  4. appContext.setRemotingClient(remotingClient);
  5. // 设置 线程池
  6. appContext.setRunnerPool(new RunnerPool(appContext));
  7. appContext.getMStatReporter().start();
  8. //拉取任务
  9. appContext.setJobPullMachine(new JobPullMachine(appContext));
  10. appContext.setStopWorkingMonitor(new StopWorkingMonitor(appContext));
  11. appContext.getHttpCmdServer().registerCommands(
  12. new JobTerminateCmd(appContext)); // 终止某个正在执行的任务
  13. }

在JobPullMachine初始化时会监听注册中心中JobTracker节点的信息,如果存在这个节点,则会调用start方法,start方法中建立定时任务向JobTracker发送请求。

  1. public JobPullMachine(final TaskTrackerAppContext appContext) {
  2. this.appContext = appContext;
  3. this.jobPullFrequency = appContext.getConfig().getParameter(ExtConfig.JOB_PULL_FREQUENCY, Constants.DEFAULT_JOB_PULL_FREQUENCY);
  4. this.machineResCheckEnable = appContext.getConfig().getParameter(ExtConfig.LB_MACHINE_RES_CHECK_ENABLE, false);
  5. //从zk中监听JobTracker节点,如果存在节点则调用start方法,start方法中会建立定时任务执行worker线程
  6. appContext.getEventCenter().subscribe(
  7. new EventSubscriber(JobPullMachine.class.getSimpleName().concat(appContext.getConfig().getIdentity()),
  8. new Observer() {
  9. @Override
  10. public void onObserved(EventInfo eventInfo) {
  11. if (EcTopic.JOB_TRACKER_AVAILABLE.equals(eventInfo.getTopic())) {
  12. // JobTracker 可用了
  13. start();
  14. } else if (EcTopic.NO_JOB_TRACKER_AVAILABLE.equals(eventInfo.getTopic())) {
  15. stop();
  16. }
  17. }
  18. }), EcTopic.JOB_TRACKER_AVAILABLE, EcTopic.NO_JOB_TRACKER_AVAILABLE);
  19. this.worker = new Runnable() {
  20. @Override
  21. public void run() {
  22. try {
  23. if (!start.get()) {
  24. return;
  25. }
  26. if (!isMachineResEnough()) {
  27. // 如果机器资源不足,那么不去取任务
  28. return;
  29. }
  30. //向JobTracker服务器发送任务请求
  31. sendRequest();
  32. } catch (Exception e) {
  33. LOGGER.error("Job pull machine run error!", e);
  34. }
  35. }
  36. };
  37. }

在start方法中建立定时任务,定时任务中会调用sendRequest方法,定时向JobTracker服务发送任务请求。

  1. private void start() {
  2. try {
  3. if (start.compareAndSet(false, true)) {
  4. if (scheduledFuture == null) {
  5. scheduledFuture = executorService.scheduleWithFixedDelay(worker, jobPullFrequency * 1000, jobPullFrequency * 1000, TimeUnit.MILLISECONDS);
  6. }
  7. LOGGER.info("Start Job pull machine success!");
  8. }
  9. } catch (Throwable t) {
  10. LOGGER.error("Start Job pull machine failed!", t);
  11. }
  12. }

在sendRequest中会判断当前TaskTracker可执行线程数,然后向JobTracker发送请求

  1. private void sendRequest() throws RemotingCommandFieldCheckException {
  2. //判断当前节点还有可执行线程数
  3. int availableThreads = appContext.getRunnerPool().getAvailablePoolSize();
  4. if (LOGGER.isDebugEnabled()) {
  5. LOGGER.debug("current availableThreads:{}", availableThreads);
  6. }
  7. if (availableThreads == 0) {
  8. return;
  9. }
  10. //创建请求体
  11. JobPullRequest requestBody = appContext.getCommandBodyWrapper().wrapper(new JobPullRequest());
  12. requestBody.setAvailableThreads(availableThreads);
  13. RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_PULL.code(), requestBody);
  14. try {
  15. //向JobTracker发送任务请求
  16. RemotingCommand responseCommand = appContext.getRemotingClient().invokeSync(request);
  17. if (responseCommand == null) {
  18. LOGGER.warn("Job pull request failed! response command is null!");
  19. return;
  20. }
  21. if (JobProtos.ResponseCode.JOB_PULL_SUCCESS.code() == responseCommand.getCode()) {
  22. if (LOGGER.isDebugEnabled()) {
  23. LOGGER.debug("Job pull request success!");
  24. }
  25. return;
  26. }
  27. LOGGER.warn("Job pull request failed! response command is null!");
  28. } catch (JobTrackerNotFoundException e) {
  29. LOGGER.warn("no job tracker available!");
  30. }
  31. }

在RemotingClientDelegate的invokeSync方法中会从注册中心获取JobTracker节点信息,然后建立远程连接发送请求。

  1. /**
  2. * 同步调用
  3. */
  4. public RemotingCommand invokeSync(RemotingCommand request)
  5. throws JobTrackerNotFoundException {
  6. Node jobTracker = getJobTrackerNode();
  7. try {
  8. RemotingCommand response = remotingClient.invokeSync(jobTracker.getAddress(),
  9. request, appContext.getConfig().getInvokeTimeoutMillis());
  10. this.serverEnable = true;
  11. return response;
  12. } catch (Exception e) {
  13. // 将这个JobTracker移除
  14. jobTrackers.remove(jobTracker);
  15. try {
  16. Thread.sleep(100L);
  17. } catch (InterruptedException e1) {
  18. LOGGER.error(e1.getMessage(), e1);
  19. }
  20. // 只要不是节点 不可用, 轮询所有节点请求
  21. return invokeSync(request);
  22. }
  23. }

三、接收任务执行

TaskTracker也是提供了RemotingDispatcher类,用来接收JobTracker发送过来的任务

  1. public class RemotingDispatcher extends AbstractProcessor {
  2. private final Map<JobProtos.RequestCode, RemotingProcessor> processors = new HashMap<JobProtos.RequestCode, RemotingProcessor>();
  3. public RemotingDispatcher(TaskTrackerAppContext appContext) {
  4. super(appContext);
  5. processors.put(JobProtos.RequestCode.PUSH_JOB, new JobPushProcessor(appContext));
  6. processors.put(JobProtos.RequestCode.JOB_ASK, new JobAskProcessor(appContext));
  7. }
  8. //处理JobTracker发送过来的任务
  9. @Override
  10. public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
  11. JobProtos.RequestCode code = JobProtos.RequestCode.valueOf(request.getCode());
  12. RemotingProcessor processor = processors.get(code);
  13. if (processor == null) {
  14. return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(),
  15. "request code not supported!");
  16. }
  17. return processor.processRequest(channel, request);
  18. }
  19. }

在JobPushProcessor的processRequest中会根据JobTracker发送过来的数据,线程池中执行相关任务。

  1. @Override
  2. public RemotingCommand processRequest(Channel channel,
  3. final RemotingCommand request) throws RemotingCommandException {
  4. //获取请求数据
  5. JobPushRequest requestBody = request.getBody();
  6. // JobTracker 分发来的 job
  7. final List<JobMeta> jobMetaList = requestBody.getJobMetaList();
  8. List<String> failedJobIds = null;
  9. for (JobMeta jobMeta : jobMetaList) {
  10. try {
  11. //执行任务
  12. appContext.getRunnerPool().execute(jobMeta, jobRunnerCallback);
  13. } catch (NoAvailableJobRunnerException e) {
  14. if (failedJobIds == null) {
  15. failedJobIds = new ArrayList<String>();
  16. }
  17. failedJobIds.add(jobMeta.getJobId());
  18. }
  19. }
  20. if (CollectionUtils.isNotEmpty(failedJobIds)) {
  21. // 任务推送失败
  22. JobPushResponse jobPushResponse = new JobPushResponse();
  23. jobPushResponse.setFailedJobIds(failedJobIds);
  24. return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code(), jobPushResponse);
  25. }
  26. // 任务推送成功
  27. return RemotingCommand.createResponseCommand(JobProtos
  28. .ResponseCode.JOB_PUSH_SUCCESS.code(), "job push success!");
  29. }

在RunnerPool中调用execute执行任务,会创建线程在线程池中进行执行

  1. public void execute(JobMeta jobMeta, RunnerCallback callback) throws NoAvailableJobRunnerException {
  2. try {
  3. threadPoolExecutor.execute(
  4. new JobRunnerDelegate(appContext, jobMeta, callback));
  5. if (LOGGER.isDebugEnabled()) {
  6. LOGGER.debug("Receive job success ! " + jobMeta);
  7. }
  8. } catch (RejectedExecutionException e) {
  9. LOGGER.warn("No more thread to run job .");
  10. throw new NoAvailableJobRunnerException(e);
  11. }
  12. }

在线程JobRunnerDelegate中调用run方法执行,通过反射调用jobRunnerClass的实现类,完成run执行

  1. @Override
  2. public void run() {
  3. thread = Thread.currentThread();
  4. try {
  5. blockedOn(interruptor);
  6. if (Thread.currentThread().isInterrupted()) {
  7. ((InterruptibleAdapter) interruptor).interrupt();
  8. }
  9. while (jobMeta != null) {
  10. long startTime = SystemClock.now();
  11. // 设置当前context中的jobId
  12. Response response = new Response();
  13. response.setJobMeta(jobMeta);
  14. BizLoggerAdapter logger = (BizLoggerAdapter) BizLoggerFactory.getLogger(
  15. appContext.getBizLogLevel(),
  16. appContext.getRemotingClient(), appContext);
  17. try {
  18. appContext.getRunnerPool().getRunningJobManager()
  19. .in(jobMeta.getJobId(), this);
  20. //获取JobRunner接口的实现类
  21. this.curJobRunner = appContext.getRunnerPool().getRunnerFactory().newRunner();
  22. //调用run方法,完成任务调度
  23. Result result = this.curJobRunner.run(buildJobContext(logger, jobMeta));
  24. if (result == null) {
  25. response.setAction(Action.EXECUTE_SUCCESS);
  26. } else {
  27. if (result.getAction() == null) {
  28. response.setAction(Action.EXECUTE_SUCCESS);
  29. } else {
  30. response.setAction(result.getAction());
  31. }
  32. response.setMsg(result.getMsg());
  33. }
  34. long time = SystemClock.now() - startTime;
  35. stat.addRunningTime(time);
  36. if (LOGGER.isDebugEnabled()) {
  37. LOGGER.debug("Job execute completed : {}, time:{} ms.", jobMeta.getJob(), time);
  38. }
  39. } catch (Throwable t) {
  40. StringWriter sw = new StringWriter();
  41. t.printStackTrace(new PrintWriter(sw));
  42. response.setAction(Action.EXECUTE_EXCEPTION);
  43. response.setMsg(sw.toString());
  44. long time = SystemClock.now() - startTime;
  45. stat.addRunningTime(time);
  46. LOGGER.error("Job execute error : {}, time: {}, {}", jobMeta.getJob(), time, t.getMessage(), t);
  47. } finally {
  48. checkInterrupted(logger);
  49. appContext.getRunnerPool().getRunningJobManager()
  50. .out(jobMeta.getJobId());
  51. }
  52. // 统计数据
  53. stat(response.getAction());
  54. if (isStopToGetNewJob()) {
  55. response.setReceiveNewJob(false);
  56. }
  57. this.jobMeta = callback.runComplete(response);
  58. DotLogUtils.dot("JobRunnerDelegate.run get job " + (this.jobMeta == null ? "NULL" : "NOT_NULL"));
  59. }
  60. } finally {
  61. blockedOn(null);
  62. }
  63. }

总结:

TaskTracker主要有两方面:

(1)主动向JobTracker发送请求获取任务

(2)监听JobTracker发送过来的任务进行任务执行

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/544803
推荐阅读
相关标签
  

闽ICP备14008679号