当前位置:   article > 正文

Spark executor中task的数量与最大并发数_spark 如何计算当前任务最大并发task数

spark 如何计算当前任务最大并发task数

关于executor和task的概念可以参考官方文档
本文使用的源码是spark 2.0.0版本

Task的数量

根据类DAGScheduler中的submitMissingTasks方法可以知道,在stage中会为每个需要计算的partition生成一个task,换句话说也就是每个task处理一个partition。

  1. //From submitMissingTasks
  2. ......
  3. val tasks: Seq[Task[_]] = try {
  4. stage match {
  5. case stage: ShuffleMapStage =>
  6. partitionsToCompute.map { id =>
  7. val locs = taskIdToLocations(id)
  8. val part = stage.rdd.partitions(id)
  9. new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
  10. taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
  11. }
  12. case stage: ResultStage =>
  13. val job = stage.activeJob.get
  14. partitionsToCompute.map { id =>
  15. val p: Int = stage.partitions(id)
  16. val part = stage.rdd.partitions(p)
  17. val locs = taskIdToLocations(id)
  18. new ResultTask(stage.id, stage.latestInfo.attemptId,
  19. taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
  20. }
  21. }
  22. }
  23. ......

Task的最大并发数

当task被提交到executor之后,会根据executor可用的cpu核数,决定一个executor中最多同时运行多少个task。在类TaskSchedulerImplresourceOfferSingleTaskSet方法中,CPUS_PER_TASK的定义为val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1),也就是说默认情况下一个task对应cpu的一个核。如果一个executor可用cpu核数为8,那么一个executor中最多同是并发执行8个task;假如设置spark.task.cpus为2,那么同时就只能运行4个task。

  1. //From resourceOfferSingleTaskSet
  2. ......
  3. if (availableCpus(i) >= CPUS_PER_TASK) {
  4. try {
  5. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
  6. tasks(i) += task
  7. val tid = task.taskId
  8. taskIdToTaskSetManager(tid) = taskSet
  9. taskIdToExecutorId(tid) = execId
  10. executorIdToTaskCount(execId) += 1
  11. executorsByHost(host) += execId
  12. availableCpus(i) -= CPUS_PER_TASK
  13. assert(availableCpus(i) >= 0)
  14. launchedTask = true
  15. }
  16. } catch {
  17. case e: TaskNotSerializableException =>
  18. logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
  19. // Do not offer resources for this task, but don't throw an error to allow other
  20. // task sets to be submitted.
  21. return launchedTask
  22. }
  23. }
  24. ......

Yarn的task与Spark中task的区别

在Yarn的NodeManager节点上启动一个map task或者reduce task,在物理上启动的是一个jvm进程;而Spark的task是Executor进程中的一个线程



作者:Woople
链接:https://www.jianshu.com/p/7c9b08a74de1
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/594976
推荐阅读
相关标签
  

闽ICP备14008679号