Spark 任務調度源碼解析

Spark 作業通過資源調度系統獲取了計算資源,然後即開始調度計算任務來執行實際的數據處理(比如 ETL、機器學習、圖計算),本文繼續來解析 Spark 任務調度的相關處理過程和原理。

Action 觸發任務調度

Spark 是惰性計算模式,所有的 transformation 算子的實際執行都是通過 action 算子來觸發的;action 算子是劃分 job 的分界,因此本文的任務調度以 job 爲單位來解析 Spark 作業任務調度的實現。

RDD 的 action 算子(例如:collect、count、take、foreachPartition 等)都會調用 SparkContext 的 runJob 方法來提交一個 job,以 collect 算子爲例,該算子的實現如下:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

在 runJob 方法中實際是調用了 DAGScheduler 的 submitJob 方法,然後通過 eventProcessLoop 對象將作業提交的事件放到了事件隊列 eventQueue 中:

eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  Utils.cloneProperties(properties)))

eventProcessLoop 是在 DAGScheduler 初始化時創建的事件調度對象,該對象啓動了一個線程不斷從 eventQueue 中獲取事件來處理:

override def run()Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } 
    ...

拿到 JobSubmitted 的事件後由 EventLoop 的子類 DAGSchedulerEventProcessLoop 調用 handleJobSubmitted 方法來進行處理,該方法的處理邏輯主要分爲兩步,第一步是劃分 stage 構建 DAG 有向無環圖,第二步則是發起任務調度,本文的主要內容也將圍繞這兩點來展開:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties)Unit = {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception iffor example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } ...
  submitStage(finalStage)
  }

劃分 Stage

DAG 是一個有向無環圖,Spark 根據各 RDD 的數據來源、算子以及分區器等要素構建出不同類型的 RDD 對象(例如 HadoopRDD、ShuffledRDD)、子 RDD 與父 RDD 之間的依賴關係集(OneToOneDependency、ShuffleDependency、RangeDependency)以及 RDD 分片的數據本地性屬性等,進而構建出整個作業的 DAG。

由於 Spark 的 stage 是以 shuffle 也即寬依賴爲邊界進行劃分的,有了 DAG 接下來就可以根據 RDD 之間的依賴類型來劃分 stage 了。

首先從 result rdd 開始向前回溯不斷獲取當前 rdd 的所有寬依賴列表,如果遍歷到的依賴是寬依賴則放入 parents HashSet 中,如果遍歷到的是窄依賴則繼續遍歷該窄依賴的 rdd 的所有 dependency,直到找到下一個寬依賴並放到 parents 列表中或者遍歷了其所有父 rdd 無法找父 rdd 爲止 ,得到 rdd 所有的 shuffle 依賴:

private[scheduler] def getShuffleDependenciesAndResourceProfiles(
      rdd: RDD[_])(HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]  
    val resourceProfiles = new HashSet[ResourceProfile]
    val visited = new HashSet[RDD[_]]   
    val waitingForVisit = new ListBuffer[RDD[_]]   
    waitingForVisit += rdd
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (!visited(toVisit)) {
        visited += toVisit
        Option(toVisit.getResourceProfile).foreach(resourceProfiles += _)
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.prepend(dependency.rdd)
        }
      }
    }
    (parents, resourceProfiles)
  }

然後根據該 rdd 的 shuffleDeps 列表中的每個 shuffle 依賴創建出相應的父 stage:

private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
    firstJobId: Int): List[Stage] = {
  shuffleDeps.map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

其中 getOrCreateShuffleMapStage 方法採用遞歸的邏輯,如果一個 shuffle 依賴所屬的 rdd 有未遍歷到的 shuffle 依賴,則創建該依賴所屬的 stage;否則,繼續遍歷該 rdd 的所有 shuffle 依賴,並創建各個 shuffle 依賴所對應的父 stage:

getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
  if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
    createShuffleMapStage(dep, firstJobId)
  }
}
createShuffleMapStage(shuffleDep, firstJobId)

在 createShuffleMapStage 方法中可以看到 stage 的 id 是一個順序遞增的值 id = nextStageId.getAndIncrement(),由於是遞歸創建 stage 因此越往前遍歷 stage 的 id 越小,新創建的 stage 對象中的屬性主要包括該 stage 的 id、最後邊的那個 rdd 及其分區數(決定了 map 任務數)、該 stage 所有父 stage 的集合、shuffle depenency 等:

val parents = getOrCreateParentStages(shuffleDeps, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ShuffleMapStage(
  id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker,
  resourceProfile.id)
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage

當創建出了所有的 ShuffleMapStage 之後則會創建 ResultStage:

val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
  callSite, resourceProfile.id)
stageIdToStage(id) = stage

到目前爲止完成了 handleJobSubmitted 方法中的 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) 的邏輯,該 job 的 DAG 也完成了構建,接下來通過調用 submitStage(finalStage) 方法開始作業所有 stage 及其任務的調度過程。

啓動 stage 調度

submitStage 方法也是一個遞歸,由於 job 中的 stage 已經劃分完成並且從前到後進行了編碼,在本方法的邏輯中如果當前 stage 的父 stage 列表不爲空,則調用其所有父 stage 的 submitStage 方法;如果當前的 stage 的父 stage 爲空則調用 submitMissingTasks 方法啓動該 stage 中任務調度邏輯:

 private def submitStage(stage: Stage)Unit = {
      ...
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
     ...

啓動了 stage 調度並沒有真正地將任務分發到 executor 上去運行,接下來還需要創建任務集並根據特定的調度策略制定調度計劃之後再執行任務的調度以及分發執行。

Stage 之間的調度策略根據用戶的配置來確定,包括 FIFO 和 FAIR 兩種(默認是 FIFO),並且在 TaskScheduler 初始化時就根據作業的配置創建了不同 stage 調度器和調度隊列:

def initialize(backend: SchedulerBackend)Unit = {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
        s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}

創建 TaskSets

介紹了 Stage 的調度策略,接下來我們來看一下任務集創建的流程:

  1. 獲取該 stage 中各個數據分片的數據本地性,從下面的代碼中可見生成的結果集 taskIdToLocations 是一個 key 爲分片 id,value 爲本地性列表的 HashMap:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id =(id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}

其中分片的本地性的計算方法主要通過 getPreferredLocsInternal 方法實現,如果 RDD 已被緩存則返回所有分片被緩存的 TaskLocation 信息並返回,如果 RDD 的分片本身具有本地性屬性則返回其本地性的 TaskLocation 信息並返回,如果不滿足以上兩種情況則不斷獲取該 RDD 窄依賴對應的父 RDD 調用 getPreferredLocsInternal 方法繼續計算,如果最後沒有獲得任何本地性信息則該 RDD 不具有本定性:

  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    ...
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }
  1. 將待處理的 task 封裝爲一個 TaskSet 對象,該對象中攜帶了 stageId、任務的代碼閉包(包括 rdd、stage 的 shuffle 依賴關係或者 action 算子信息)、數據分片、分片的數據本地性信息等:
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
        ...
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }

    case stage: ResultStage =>
      ...
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }
  }

任務的調度和執行

創建了 TaskSet 之後,TaskScheduler 調用 submitTasks 方法來提交任務集,並在 submitTasks 方法中創建了 TaskSetManager 對象,在 TaskSetManager 對象的實例化過程中通過調用 addPendingTasks 方法將任務放到待調度的任務列表中,包括 forExecutor、forHost、noPrefs、forRack 和 all 多種列表,並且每個 task 根據本地性需求可以被放入到多個列表中,分別對應於 PROCESS_LOCAL(executor 級別), NODE_LOCAL(節點級別), NO_PREF(無本地化需求), RACK_LOCAL(機架級別), ANY(隨機)這幾個本地化級別。

接下將已構建好的 TaskSetManager 對象作爲一個待調度的 stage 放到實現了特定調度策略的調度池中等待調度,這其實是任務調度的需求:

override def submitTasks(taskSet: TaskSet)Unit = {
  val tasks = taskSet.tasks
  this.synchronized {
  val manager = createTaskSetManager(taskSet, maxTaskFailures)
  val stage = taskSet.stageId
  val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
  stageTaskSets(taskSet.stageAttemptId) = manager     
  schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  ...
  backend.reviveOffers()

我們來看看調度系統是如何來落地任務的調度需求的。在 submitTasks 方法的最後向 SchedulerBackEnd 發送 ReviveOffers 消息,SchedulerBackEnd 在收到消息之後即調用 CoarseGrainedSchedulerBackend 的 makeOffers 方法來獲取當前所有 executor 的資源可用情況,得到該作業包括 executorId、host 以及可用的 core 數爲主要信息的 WorkOffers 對象,體現了該作業所有的 executor 可用於任務調度的資源情況:

private def makeOffers()Unit = {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort),
          executorData.resourcesInfo.map { case (rName, rInfo) =>
            (rName, rInfo.availableAddrs.toBuffer)
          }, executorData.resourceProfileId)
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers, true)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

值得一提的是,executorDataMap 是一個 key 爲 executorId,value 爲 ExecutorData 的 HashMap 結構,用於在 SchedulerBackEnd 中維護當前 spark 作業的所有 executor 的通信地址信息以及 executor 的可用資源等信息。

接下來 TaskScheduler 調用 resourceOffers 方法根據一定的任務調度策略創建任務描述對象並觸發任務的執行,resourceOffers 方法的實現比較複雜,主要邏輯是逐個從待調度的任務集隊列中取出一個 taskset,然後根據該任務集的本地性級別從高到底調用 resourceOfferSingleTaskSet 方法:

val sortedTaskSets = rootPool.getSortedTaskSetQueue
...
for (taskSet <- sortedTaskSets) {
 ...
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
          var launchedTaskAtCurrentMaxLocality = false
          do {
            val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
              taskSet, currentMaxLocality, shuffledOffers, availableCpus,
              availableResources, tasks)
            launchedTaskAtCurrentMaxLocality = minLocality.isDefined
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
            noDelaySchedulingRejects &= noDelayScheduleReject
            globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
          } while (launchedTaskAtCurrentMaxLocality)
  ...

在 resourceOfferSingleTaskSet 方法中遍歷當前的 workoffers 列表,對於每一個 workoffer 對象調用 TaskSetManager 的 resourceOffer 方法創建出相應的 TaskDescription 對象,並調整其可用的資源情況:

for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    ...
    val (taskDescOption, didReject, index) = 
       taskSet.resourceOffer(execId, host, maxLocality, taskResAssignments)
    for (task <- taskDescOption) {
      val (locality, resources) = if (task != null) {
         tasks(i) += task
         addRunningTask(task.taskId, execId, taskSet)
         (taskSet.taskInfos(task.taskId).taskLocality, task.resources)
      }
      ...
      availableCpus(i) -= taskCpus
      assert(availableCpus(i) >= 0)
      resources.foreach { case (rName, rInfo) =>
      availableResources(i)(rName).remove(0, rInfo.addresses.size)
    }         
    ...
}

TaskDescription 是任務調度的基本單元,具有 executorId、host、任務 id、本地化級別等信息;spark 根據指定的 executorId、host 和可用的本地化級別從待調度的任務列表中選擇一個本地化級別較高的、並且本地化需求和該 executor 一致的任務用於任務調度,並將該任務封裝成 TaskDescription 對象,從而使得各個任務能夠儘量調度到最近的 executor 上去執行,避免數據的磁盤 IO 以及網絡分發導致的性能問題:

val taskDescription =
  dequeueTask(execId, host, allowedLocality)
    .map { case (index, taskLocality, speculative) =>
      dequeuedTaskIndex = Some(index)  
      ...
      prepareLaunchingTask(
          execId,
          host,
          index,
          taskLocality,
          speculative,
          taskResourceAssignments,
          curTime)
      }
    }

可見,真正的調度策略是 TaskScheduler 聯合 TaskSetManager 對象實現的,將任務調度到哪個節點上受限於三個方面,第一是 stage 調度的優先級算法,第二是各個 executor 上的可用資源情況,第三是 task 自身的緩存或者本次性需求的情況;TaskScheduler 根據這三點將待調度的任務集中的任務調度到最優的 executor 上去執行。

我們知道在 spark on k8s 存算分離的場景中,由於 executor pod 不運行在 Hadoop 集羣的節點中,如果數據未經過 broadcast 或者 cache 到 executor 內存的情況下,任務將被隨機分發到任意資源可用的 pod 上去運行。

創建了所有 task 的 TaskDescription 之後,在 makeOffers 方法的最後調用了 launchTasks 方法來啓動 task 線程的執行,實際上是將 task 序列化之後發送 LaunchTask 消息給 executor 的 RPC 對象 CoarseGrainedExecutorBackend:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

CoarseGrainedExecutorBackend 收到 launchTask 消息之後反序列化 TaskDescription,並真正將任務調度到 executor 的線程池中去運行:

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription)Unit = {
  val tr = new TaskRunner(context, taskDescription, plugins)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
  if (decommissioned) {
    log.error(s"Launching a task while in decommissioned state.")
  }
}

在任務執行結束之後 executor 會調用 statusUpdate 方法,例如:在任務成功的情況下執行 execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult),即向 SchedulerBackEnd 發送 StatusUpdate 指令,SchedulerBackEnd 收到指令則更新任務所在 executor 上的可用資源(即增加其可用的 core 數),從而釋放出計算資源用於其他任務的調度:

val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
executorInfo.freeCores += taskCpus
resources.foreach { case (k, v) =>
  executorInfo.resourcesInfo.get(k).foreach { r =>
    r.release(v.addresses)
  }
}

總結

綜上,Spark 作業在 DAGScheduler、SchedulerBackend、TaskScheduler 和 ExecutorBackend 等對象的通力合作之下完成了作業的任務調度,首先構建作業 DAG 並劃分 Stage,接下來創建各 stage 的任務集並且結合 stage 的調度策略、executor 的資源可用情況以及 stage 內部數據分片的本地性制定出具體的任務調度計劃,然後再根據調度計劃將任務調度到對應的 executor 線程池上去執行,從而完成整個作業的處理邏輯。

作者簡介

焦媛,主要負責民生銀行 Hadoop 大數據平臺的生產運維工作,並負責 HDFS 和 Spark 相關開源產品的技術支持,以及 Spark 雲原生技術的支持和推廣工作。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DSUsYlBJiCUYhB49UuIf7A