当前位置:   article > 正文

Job的任务执行流程之TaskCleanup_cleanup task

cleanup task

      任何一个作业在Hadoop集群中执行主要包括四个阶段:setup、map、reduce、cleanup,但在这四个阶段都出现任务实例在TaskTracker节点执行失败的情况。当一个任务实例在TaskTracker节点的JVM中执行时除了成功执行意外,还有可能出现一些异常情况:1).在JVM中执行失败;2).JVM进程被操作系统stop;3).任务实例被JobTracker节点要求kill;这些异常情况都会造成该任务实例执行的失败,从而使得该任务进入FAILEDFAILED_UNCLEANKILLED_UNCLEAN等三种状态中的某一种。这里就有一个问题了,一个任务实例失败时到底会进入哪一种状态?这其实很好判断:

1).如果一个任务实例在JVM中运行时出现异常或错误而无法再继续运行,同时在调用了该任务所属作业对应的OutputCimmitter输出提交器的abortTask()方法之后离开JVM的话,这个任务实例会进入FAILED状态;

2).如果一个任务实例在JVM中运行时出现异常或错误而无法再继续运行,同时在没有调用该任务所属作业对应的OutputCimmitter输出提交器的abortTask()方法就离开了JVM的话,这个任务实例会进入FAILED_UNCLEAN状态;

3).如果一个任务实例在JVM中正常运行时突然被停止了(如:任务实例所在的JVM进程被OSstop或者被TaskTracker节点强制命令停止),此时还来不起调用该任务所属作业对应的输出提交器OutputCimmitter的abortTask()方法,所以它会进入KILLED_UNCLEAN状态。

本文将主要围绕JobTracker节点对处于FAILED_UNCLEANKILLED_UNCLEAN状态的任务实例的处理来详细地展开讲解。

    TaskTracker在任务实例停止执行之后,就会把这个任务实例对应的状态报告给JobTracker节点来处理,当然,前面说过,JobTracker节点是不会直接处理任何任务实例的状态报告的,而是交给对应的JobInProgress来处理。对于处于FAILED_UNCLEANKILLED_UNCLEAN状态的任务实例,JobInProgress会将他们存储在对应的待清理的任务队列中,当然,一个作业主要包含两种这样的任务队列,一种存储Map型的任务实例,另一种存储Reduce型的任务实例,然后它会交给合适的TaskTracker节点来执行对该任务的清理操作。这种清理工作就是前面所说的TaskCleanup任务。这个处理过程是是很简单的,对应的源代码如下:

  1. class JobInProgress {
  2. ...
  3. public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
  4. ...
  5. if (state == TaskStatus.State.FAILED_UNCLEAN || state == TaskStatus.State.KILLED_UNCLEAN) {
  6. tip.incompleteSubTask(taskid, this.status);
  7. // add this task, to be rescheduled as cleanup attempt
  8. if (tip.isMapTask()) {
  9. mapCleanupTasks.add(taskid);
  10. } else {
  11. reduceCleanupTasks.add(taskid);
  12. }
  13. // Remove the task entry from jobtracker
  14. jobtracker.removeTaskEntry(taskid);
  15. }
  16. ...
  17. }
  18. ...
  19. }

    上一篇博文也说过,当一个作业中有TaskCleanup任务的话,就会优先调度这些TaskCleanup任务,而不会调度它的正式Map/Reduce任务。对应的调度策略也很简单,源码如下:

  1. public Task obtainTaskCleanupTask(TaskTrackerStatus tts, boolean isMapSlot) throws IOException {
  2. if (!tasksInited.get()) {
  3. return null;
  4. }
  5. synchronized (this) {
  6. if (this.status.getRunState() != JobStatus.RUNNING || jobFailed || jobKilled) {
  7. return null;
  8. }
  9. String taskTracker = tts.getTrackerName();
  10. if (!shouldRunOnTaskTracker(taskTracker)) {
  11. return null;
  12. }
  13. TaskAttemptID taskid = null;
  14. TaskInProgress tip = null;
  15. if (isMapSlot) {
  16. if (!mapCleanupTasks.isEmpty()) {
  17. taskid = mapCleanupTasks.remove(0);
  18. tip = maps[taskid.getTaskID().getId()];
  19. }
  20. } else {
  21. if (!reduceCleanupTasks.isEmpty()) {
  22. taskid = reduceCleanupTasks.remove(0);
  23. tip = reduces[taskid.getTaskID().getId()];
  24. }
  25. }
  26. if (tip != null) {
  27. return tip.addRunningTask(taskid, taskTracker, true);
  28. }
  29. return null;
  30. }
  31. }
    TaskTracker节点对TaskCleanup任务的本地化和调度同JobSetup、JobCleanup、Map、Reduce任务是一样,最终都会交给一个JVM实例来负责执行。在JVM中,它主要会调用作业对应的输出提交器OutputCimmitter的abortTask()方法,即放弃该任务,在FileOutputCimmitter实现中就是清理该任务实例在执行过程中所占用的临时存储空间。这里要提醒的是,无论这个TaskCleanup任务在JVM中执行成功或者失败,或者在本地化时就出错而被kill掉,它都会进入对应的FAILED或者KILLED状态:如果该TaskCleanup 任务处于 FAILED_UNCLEAN状态,它就会进入FAILED状态;和 如果该 TaskCleanup处于KILLED_UNCLEAN状态,它就会进入KILLED状态。TaskCleanup任务被TaskTracker节点执行完之后的处理同JobSetup、JobCleanup、Map、Reduce任务实例也是一样的,所以就不再赘述了。

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/759845
推荐阅读
相关标签
  

闽ICP备14008679号