Spark Source Code Analysis 03 - SubmitTask

1.Rdd

In RDD, methods such as reduce, fold, aggregate, collect and count all call sparkContext.runJob. These methods are called actions, and invoking one triggers job submission.

def reduce(f: (T, T) => T): T = withScope {

  val cleanF = sc.clean(f)

  val reducePartition: Iterator[T] => Option[T] = iter => {

    if (iter.hasNext) {

      Some(iter.reduceLeft(cleanF))

    } else {

      None

    }

  }

  var jobResult: Option[T] = None

  val mergeResult = (index: Int, taskResult: Option[T]) => {

    if (taskResult.isDefined) {

      jobResult = jobResult match {

        case Some(value) => Some(f(value, taskResult.get))

        case None => taskResult

      }

    }

  }

  sc.runJob(this, reducePartition, mergeResult)

  // Get the final result out of our Option, or throw an exception if the RDD was empty

  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))

}

def runJob[T, U: ClassTag](

    rdd: RDD[T],

    processPartition: Iterator[T] => U,

    resultHandler: (Int, U) => Unit)

{

  val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)

  runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)

}
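Putting section 1 together: any single action call in a driver program is enough to start the submission chain traced in the rest of this article. A minimal sketch, assuming an existing SparkContext sc:

val rdd = sc.parallelize(1 to 100, numSlices = 4)
val sum = rdd.reduce(_ + _)  // action: RDD.reduce -> sc.runJob -> DAGScheduler.runJob
val cnt = rdd.count()        // count likewise ends up in sc.runJob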

2.SparkContext

def runJob[T, U: ClassTag](

    rdd: RDD[T],

    func: (TaskContext, Iterator[T]) => U,

    partitions: Seq[Int],

    resultHandler: (Int, U) => Unit): Unit = {

  if (stopped.get()) {

    throw new IllegalStateException("SparkContext has been shutdown")

  }

  val callSite = getCallSite

  val cleanedFunc = clean(func)

  logInfo("Starting job: " + callSite.shortForm)

  if (conf.getBoolean("spark.logLineage", false)) {

    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)

  }

  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

  progressBar.foreach(_.finishAll())

  rdd.doCheckpoint()

}
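The spark.logLineage branch above simply logs rdd.toDebugString. The same lineage view can be printed directly, which is useful for predicting how the DAGScheduler will later split the job into stages (a sketch, assuming an existing sc):

val rdd = sc.parallelize(1 to 10).map(_ * 2).filter(_ > 5)
println(rdd.toDebugString)  // prints the recursive dependency (lineage) tree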

3.DAGScheduler

def runJob[T, U](

    rdd: RDD[T],

    func: (TaskContext, Iterator[T]) => U,

    partitions: Seq[Int],

    callSite: CallSite,

    resultHandler: (Int, U) => Unit,

    properties: Properties): Unit = {

  val start = System.nanoTime

  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)

  waiter.completionFuture.value.get match {

    case scala.util.Success(_) =>

      logInfo("Job %d finished: %s, took %f s".format

        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))

    case scala.util.Failure(exception) =>

      logInfo("Job %d failed: %s, took %f s".format

        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))

      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.

      val callerStackTrace = Thread.currentThread().getStackTrace.tail

      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)

      throw exception

  }

}

def submitJob[T, U](

    rdd: RDD[T],

    func: (TaskContext, Iterator[T]) => U,

    partitions: Seq[Int],

    callSite: CallSite,

    resultHandler: (Int, U) => Unit,

    properties: Properties): JobWaiter[U] = {

  // Check to make sure we are not launching a task on a partition that does not exist.

  val maxPartitions = rdd.partitions.length

  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>

    throw new IllegalArgumentException(

      "Attempting to access a non-existent partition: " + p + ". " +

        "Total number of partitions: " + maxPartitions)

  }

  val jobId = nextJobId.getAndIncrement()

  if (partitions.size == 0) {

    // Return immediately if the job is running 0 tasks

    return new JobWaiter[U](this, jobId, 0, resultHandler)

  }

  assert(partitions.size > 0)

  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]

  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

  eventProcessLoop.post((

    jobId, rdd, func2, partitions.toArray, callSite, waiter,

    SerializationUtils.clone(properties)))

  waiter

}
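The partition bounds check at the top of submitJob exists because runJob may target only a subset of partitions. A sketch using SparkContext's public overload, assuming rdd is an RDD[Int]:

// per-partition sums for partitions 0 and 1 only
val partial: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1))
// a partition id >= rdd.partitions.length (or < 0) triggers the
// IllegalArgumentException("Attempting to access a non-existent partition ...") seen above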

4.DAGSchedulerEventProcessLoop

override def onReceive(event: DAGSchedulerEvent): Unit = {

  val timerContext = timer.time()

  try {

    doOnReceive(event)

  } finally {

    timerContext.stop()

  }

}

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {

  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>

    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>

    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId, reason) =>

    dagScheduler.handleStageCancellation(stageId, reason)

  case JobCancelled(jobId, reason) =>

    dagScheduler.handleJobCancellation(jobId, reason)

  case JobGroupCancelled(groupId) =>

    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>

    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>

    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>

    val workerLost = reason match {

      case SlaveLost(_, true) => true

      case _ => false

    }

    dagScheduler.handleExecutorLost(execId, workerLost)

  case WorkerRemoved(workerId, host, message) =>

    dagScheduler.handleWorkerRemoved(workerId, host, message)

  case BeginEvent(task, taskInfo) =>

    dagScheduler.handleBeginEvent(task, taskInfo)

  case SpeculativeTaskSubmitted(task) =>

    dagScheduler.handleSpeculativeTaskSubmitted(task)

  case GettingResultEvent(taskInfo) =>

    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>

    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>

    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>

    dagScheduler.resubmitFailedStages()

}
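DAGSchedulerEventProcessLoop extends Spark's generic EventLoop: post() enqueues an event, and a single daemon thread drains the queue, calling onReceive on each event. A stripped-down sketch of that pattern (the shape only, not the real class):

import java.util.concurrent.LinkedBlockingDeque

abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val thread = new Thread(name) {
    override def run(): Unit =
      try { while (true) onReceive(queue.take()) }  // blocks until an event is posted
      catch { case _: InterruptedException => }     // stop() interrupts and ends the loop
  }
  thread.setDaemon(true)
  def start(): Unit = thread.start()
  def stop(): Unit = thread.interrupt()
  def post(event: E): Unit = queue.put(event)
  protected def onReceive(event: E): Unit
}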

5.DAGScheduler

M-submitStage and M-getMissingParentStages together implement Spark's stage division.

During division, stages are created by M-getOrCreateShuffleMapStage: the first call creates the stage; subsequent calls fetch it from the map (i.e., from memory).

Once an application has been divided into multiple stages, each stage is submitted through M-submitMissingTasks.

M-submitStage

Division works backwards from the end: the ResultStage is the final stage.

Suppose the ResultStage depends on ShuffleMapStage B,

and ShuffleMapStage B depends on ShuffleMapStage A.

Then A is submitted first, and B and the ResultStage are put into waitingStages.

M-submitMissingTasks

Depending on the stage type, (rdd, func) or (rdd, stage.shuffleDep) is serialized into taskBinaryBytes; then a Task is created for each missing partition id, and the tasks are collected into a TaskSet.

After A finishes running, the last line executed is

submitWaitingChildStages(stage)

M-submitWaitingChildStages

looks up the child stages of the current stage in waitingStages,

then submits them again via submitStage.

M-getMissingParentStages

if (!mapStage.isAvailable) — when mapStage.isAvailable is true, this condition is false, so the parent stage is not submitted again.

isAvailable checks whether _numAvailableOutputs tracked by MapOutputTrackerMaster equals the stage's number of partitions; if they are equal, the stage has already been computed.
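To make the A → B → Result description concrete, here is a minimal hypothetical job whose lineage produces exactly that stage graph (two shuffles, then an action):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val a = pairs.reduceByKey(_ + _)                      // shuffle 1 -> ShuffleMapStage A
val b = a.map { case (k, v) => (v, k) }.groupByKey()  // shuffle 2 -> ShuffleMapStage B
b.collect()                                           // ResultStage; A is submitted first,
                                                      // B and the ResultStage wait in waitingStages

Key excerpts from M-submitMissingTasks, showing how taskBinaryBytes is built and how the tasks are constructed: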

taskBinaryBytes = stage match {

  case stage: ShuffleMapStage =>

    JavaUtils.bufferToArray(

      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))

  case stage: ResultStage =>

    JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))

}

taskBinary = sc.broadcast(taskBinaryBytes)

new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,

  taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),

  Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())

new ResultTask(stage.id, stage.latestInfo.attemptNumber,

  taskBinary, part, locs, id, properties, serializedTaskMetrics,

  Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,

  stage.rdd.isBarrier())

private[scheduler] def handleJobSubmitted(jobId: Int,

    finalRDD: RDD[_],

    func: (TaskContext, Iterator[_]) => _,

    partitions: Array[Int],

    callSite: CallSite,

    listener: JobListener,

    properties: Properties) {

  var finalStage: ResultStage = null

  try {

    // New stage creation may throw an exception if, for example, jobs are run on a

    // HadoopRDD whose underlying HDFS files have been deleted.

    finalStage =  createResultStage(finalRDD, func, partitions, jobId, callSite)

  } catch {

    case e: BarrierJobSlotsNumberCheckFailed =>

      logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +

        "than the total number of slots in the cluster currently.")

      // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.

      val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,

        new BiFunction[Int, Int, Int] {

          override def apply(key: Int, value: Int): Int = value + 1

        })

      if (numCheckFailures <= maxFailureNumTasksCheck) {

        messageScheduler.schedule(

          new Runnable {

            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,

              partitions, callSite, listener, properties))

          },

          timeIntervalNumTasksCheck,

          TimeUnit.SECONDS

        )

        return

      } else {

        // Job failed, clear internal data.

        barrierJobIdToNumTasksCheckFailures.remove(jobId)

        listener.jobFailed(e)

        return

      }

    case e: Exception =>

      logWarning("Creating new stage failed due to exception - job: " + jobId, e)

      listener.jobFailed(e)

      return

  }

  // Job submitted, clear internal data.

  barrierJobIdToNumTasksCheckFailures.remove(jobId)

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

  clearCacheLocs()

  logInfo("Got job %s (%s) with %d output partitions".format(

    job.jobId, callSite.shortForm, partitions.length))

  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")

  logInfo("Parents of final stage: " + finalStage.parents)

  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()

  jobIdToActiveJob(jobId) = job

  activeJobs += job

  finalStage.setActiveJob(job)

  val stageIds = jobIdToStageIds(jobId).toArray

  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

  listenerBus.post(

    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

  submitStage(finalStage)

}

private def submitStage(stage: Stage) {

  val jobId = activeJobForStage(stage)

  if (jobId.isDefined) {

    logDebug("submitStage(" + stage + ")")

    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {

      val missing = getMissingParentStages(stage).sortBy(_.id)

      logDebug("missing: " + missing)

      if (missing.isEmpty) {

        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

        submitMissingTasks(stage, jobId.get)

      } else {

        for (parent <- missing) {

          submitStage(parent)

        }

        waitingStages += stage

      }

    }

  } else {

    abortStage(stage, "No active job for stage " + stage.id, None)

  }

}

private def getMissingParentStages(stage: Stage): List[Stage] = {

  val missing = new HashSet[Stage]

  val visited = new HashSet[RDD[_]]

  // We are manually maintaining a stack here to prevent StackOverflowError

  // caused by recursively visiting

  val waitingForVisit = new ArrayStack[RDD[_]]

  def visit(rdd: RDD[_]) {

    if (!visited(rdd)) {

      visited += rdd

      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)

      if (rddHasUncachedPartitions) {

        for (dep <- rdd.dependencies) {

          dep match {

            case shufDep: ShuffleDependency[_, _, _] =>

              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)

              if (!mapStage.isAvailable) {

                missing += mapStage

              }

            case narrowDep: NarrowDependency[_] =>

              waitingForVisit.push(narrowDep.rdd)

          }

        }

      }

    }

  }

  waitingForVisit.push(stage.rdd)

  while (waitingForVisit.nonEmpty) {

    visit(waitingForVisit.pop())

  }

  missing.toList

}
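Note the getCacheLocs(rdd).contains(Nil) guard above: an RDD's dependencies are walked only if at least one of its partitions has no cached location. If every partition is cached, the traversal stops at that RDD, so its ancestor shuffle stages are never marked missing; this is how caching lets a re-executed job skip entire parent stages.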

private def submitMissingTasks(stage: Stage, jobId: Int) {

  logDebug("submitMissingTasks(" + stage + ")")

  // First figure out the indexes of partition ids to compute.

  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated

  // with this Stage

  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage

  // SparkListenerStageSubmitted should be posted before testing whether tasks are

  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event

  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted

  // event.

  stage match {

    case s: ShuffleMapStage =>

      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)

    case s: ResultStage =>

      outputCommitCoordinator.stageStart(

        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)

  }

  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {

    stage match {

      case s: ShuffleMapStage =>

        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap

      case s: ResultStage =>

        partitionsToCompute.map { id =>

          val p = s.partitions(id)

          (id, getPreferredLocs(stage.rdd, p))

        }.toMap

    }

  } catch {

    case NonFatal(e) =>

      stage.makeNewStageAttempt(partitionsToCompute.size)

      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))

      runningStages -= stage

      return

  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

  // If there are tasks to execute, record the submission time of the stage. Otherwise,

  // post the even without the submission time, which indicates that this stage was

  // skipped.

  if (partitionsToCompute.nonEmpty) {

    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

  }

  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast

  // the serialized copy of the RDD and for each task we will deserialize it, which means each

  // task gets a different copy of the RDD. This provides stronger isolation between tasks that

  // might modify state of objects referenced in their closures. This is necessary in Hadoop

  // where the JobConf/Configuration object is not thread-safe.

  var taskBinary: Broadcast[Array[Byte]] = null

  var partitions: Array[Partition] = null

  try {

    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).

    // For ResultTask, serialize and broadcast (rdd, func).

    var taskBinaryBytes: Array[Byte] = null

    // taskBinaryBytes and partitions are both effected by the checkpoint status. We need

    // this synchronization in case another concurrent job is checkpointing this RDD, so we get a

    // consistent view of both variables.

    RDDCheckpointData.synchronized {

      taskBinaryBytes = stage match {

        case stage: ShuffleMapStage =>

          JavaUtils.bufferToArray(

            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))

        case stage: ResultStage =>

          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))

      }

      partitions = stage.rdd.partitions

    }

    taskBinary = sc.broadcast(taskBinaryBytes)

  } catch {

    // In the case of a failure during serialization, abort the stage.

    case e: NotSerializableException =>

      abortStage(stage, "Task not serializable: " + e.toString, Some(e))

      runningStages -= stage

      // Abort execution

      return

    case NonFatal(e) =>

      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))

      runningStages -= stage

      return

  }

  val tasks: Seq[Task[_]] = try {

    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()

    stage match {

      case stage: ShuffleMapStage =>

        stage.pendingPartitions.clear()

        partitionsToCompute.map { id =>

          val locs = taskIdToLocations(id)

          val part = partitions(id)

          stage.pendingPartitions += id

          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,

            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),

            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())

        }

      case stage: ResultStage =>

        partitionsToCompute.map { id =>

          val p: Int = stage.partitions(id)

          val part = partitions(p)

          val locs = taskIdToLocations(id)

          new ResultTask(stage.id, stage.latestInfo.attemptNumber,

            taskBinary, part, locs, id, properties, serializedTaskMetrics,

            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,

            stage.rdd.isBarrier())

        }

    }

  } catch {

    case NonFatal(e) =>

      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))

      runningStages -= stage

      return

  }

  if (tasks.size > 0) {

    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +

      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")

    taskScheduler.submitTasks(new TaskSet(

      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

  } else {

    // Because we posted SparkListenerStageSubmitted earlier, we should mark

    // the stage as completed here in case there are no tasks to run

    markStageAsFinished(stage, None)

    stage match {

      case stage: ShuffleMapStage =>

        logDebug(s"Stage ${stage} is actually done; " +

            s"(available: ${stage.isAvailable}," +

            s"available outputs: ${stage.numAvailableOutputs}," +

            s"partitions: ${stage.numPartitions})")

        markMapStageJobsAsFinished(stage)

      case stage : ResultStage =>

        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")

    }

    submitWaitingChildStages(stage)

  }

}

private def submitWaitingChildStages(parent: Stage) {

  logTrace(s"Checking if any dependencies of $parent are now runnable")

  logTrace("running: " + runningStages)

  logTrace("waiting: " + waitingStages)

  logTrace("failed: " + failedStages)

  val childStages = waitingStages.filter(_.parents.contains(parent)).toArray

  waitingStages --= childStages

  for (stage <- childStages.sortBy(_.firstJobId)) {

    submitStage(stage)

  }

}

6.TaskSchedulerImpl

This step actually wraps the TaskSet into a TaskSetManager and places it into the scheduling queue.

override def submitTasks(taskSet: TaskSet) {

  val tasks = taskSet.tasks

  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")

  this.synchronized {

    val manager = createTaskSetManager(taskSet, maxTaskFailures)

    val stage = taskSet.stageId

    val stageTaskSets =

      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])

    stageTaskSets(taskSet.stageAttemptId) = manager

    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>

      ts.taskSet != taskSet && !ts.isZombie

    }

    if (conflictingTaskSet) {

      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +

        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")

    }

    // This step actually puts the TaskSet into the scheduling queue

    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {

      starvationTimer.scheduleAtFixedRate(new TimerTask() {

        override def run() {

          if (!hasLaunchedTask) {

            logWarning("Initial job has not accepted any resources; " +

              "check your cluster UI to ensure that workers are registered " +

              "and have sufficient resources")

          } else {

            this.cancel()

          }

        }

      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)

    }

    hasReceivedTask = true

  }

    // Notify the SchedulerBackend (here StandaloneSchedulerBackend) to assign executors to the tasks in the queue

  backend.reviveOffers()

}

7.FIFOSchedulableBuilder

// Put the TaskSetManager into the scheduling queue (rootPool)

override def addTaskSetManager(manager: Schedulable, properties: Properties) {

  rootPool.addSchedulable(manager)

}
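Which SchedulableBuilder is used depends on spark.scheduler.mode: FIFO (the default, shown above) appends directly to rootPool, while FAIR builds a tree of pools from an allocation file. A configuration sketch (the file path is hypothetical):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")  // default is FIFO
  .set("spark.scheduler.allocation.file", "/etc/spark/fairscheduler.xml")  // hypothetical path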

8.CoarseGrainedSchedulerBackend

This backend mainly filters the executors and then matches tasks to executors.

Finally it launches the tasks, i.e. sends a LaunchTask message to each chosen executor.

What launchTask actually sends is a TaskDescription, which bundles the task and executor information.

The TaskDescription is produced by the TaskSetManager.

private def makeOffers() {

  // Make sure no executor is killed while some task is launching on it

  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {

    // Filter out executors under killing

    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)

    val workOffers = activeExecutors.map {

      case (id, executorData) =>

        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,

          Some(executorData.executorAddress.hostPort))

    }.toIndexedSeq

    scheduler.resourceOffers(workOffers)

  }

  if (!taskDescs.isEmpty) {

    launchTasks(taskDescs)

  }

}

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {

  // Mark each slave as alive and remember its hostname

  // Also track if new executor is added

  var newExecAvail = false

  for (o <- offers) {

    if (!hostToExecutors.contains(o.host)) {

      hostToExecutors(o.host) = new HashSet[String]()

    }

    if (!executorIdToRunningTaskIds.contains(o.executorId)) {

      hostToExecutors(o.host) += o.executorId

      executorAdded(o.executorId, o.host)

      executorIdToHost(o.executorId) = o.host

      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()

      newExecAvail = true

    }

    for (rack <- getRackForHost(o.host)) {

      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host

    }

  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do

  // this here to avoid a separate thread and added synchronization overhead, and also because

  // updating the blacklist is only relevant when task offers are being made.

  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>

    offers.filter { offer =>

      !blacklistTracker.isNodeBlacklisted(offer.host) &&

        !blacklistTracker.isExecutorBlacklisted(offer.executorId)

    }

  }.getOrElse(offers)

  val shuffledOffers = shuffleOffers(filteredOffers)

  // Build a list of tasks to assign to each worker.

  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))

  val availableCpus = shuffledOffers.map(o => o.cores).toArray

  val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum

  val sortedTaskSets = rootPool.getSortedTaskSetQueue

  for (taskSet <- sortedTaskSets) {

    logDebug("parentName: %s, name: %s, runningTasks: %s".format(

      taskSet.parent.name, taskSet.name, taskSet.runningTasks))

    if (newExecAvail) {

      taskSet.executorAdded()

    }

  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order

  // of locality levels so that it gets a chance to launch local tasks on all of them.

  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

  for (taskSet <- sortedTaskSets) {

    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.

    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {

      // Skip the launch process.

      // TODO SPARK-24819 If the job requires more slots than available (both busy and free

      // slots), fail the job on submit.

      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +

        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +

        s"number of available slots is $availableSlots.")

    } else {

      var launchedAnyTask = false

      // Record all the executor IDs assigned barrier tasks on.

      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()

      for (currentMaxLocality <- taskSet.myLocalityLevels) {

        var launchedTaskAtCurrentMaxLocality = false

        do {

          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,

            currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)

          launchedAnyTask |= launchedTaskAtCurrentMaxLocality

        } while (launchedTaskAtCurrentMaxLocality)

      }

      if (!launchedAnyTask) {

        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>

            // If the taskSet is unschedulable we try to find an existing idle blacklisted

            // executor. If we cannot find one, we abort immediately. Else we kill the idle

            // executor and kick off an abortTimer which if it doesn't schedule a task within the

            // the timeout will abort the taskSet if we were unable to schedule any task from the

            // taskSet.

            // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per

            // task basis.

            // Note 2: The taskSet can still be aborted when there are more than one idle

            // blacklisted executors and dynamic allocation is on. This can happen when a killed

            // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on

            // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort

            // timer to expire and abort the taskSet.

            executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {

              case Some ((executorId, _)) =>

                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {

                  blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

                  val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000

                  unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout

                  logInfo(s"Waiting for $timeout ms for completely "

                    + s"blacklisted task to be schedulable again before aborting $taskSet.")

                  abortTimer.schedule(

                    createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)

                }

              case None => // Abort Immediately

                logInfo("Cannot schedule any task because of complete blacklisting. No idle" +

                  s" executors can be found to kill. Aborting $taskSet." )

                taskSet.abortSinceCompletelyBlacklisted(taskIndex)

            }

        }

      } else {

        // We want to defer killing any taskSets as long as we have a non blacklisted executor

        // which can be used to schedule a task from any active taskSets. This ensures that the

        // job can make progress.

        // Note: It is theoretically possible that a taskSet never gets scheduled on a

        // non-blacklisted executor and the abort timer doesn't kick in because of a constant

        // submission of new TaskSets. See the PR for more details.

        if (unschedulableTaskSetToExpiryTime.nonEmpty) {

          logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +

            "recently scheduled.")

          unschedulableTaskSetToExpiryTime.clear()

        }

      }

      if (launchedAnyTask && taskSet.isBarrier) {

        // Check whether the barrier tasks are partially launched.

        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality

        // requirements are not fulfilled, and we should revert the launched tasks).

        require(addressesWithDescs.size == taskSet.numTasks,

          s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +

            s"because only ${addressesWithDescs.size} out of a total number of " +

            s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +

            "been blacklisted or cannot fulfill task locality requirements.")

        // materialize the barrier coordinator.

        maybeInitBarrierCoordinator()

        // Update the taskInfos into all the barrier task properties.

        val addressesStr = addressesWithDescs

          // Addresses ordered by partitionId

          .sortBy(_._2.partitionId)

          .map(_._1)

          .mkString(",")

        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +

          s"stage ${taskSet.stageId}.")

      }

    }

  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get

  // launched within a configured time.

  if (tasks.size > 0) {

    hasLaunchedTask = true

  }

  return tasks

}
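Note the slot arithmetic above: each offer contributes o.cores / CPUS_PER_TASK slots, where CPUS_PER_TASK comes from spark.task.cpus (default 1). For example, an executor offering 8 free cores with spark.task.cpus=2 contributes 4 slots, so at most 4 tasks can land on it in this scheduling round.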

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {

  for (task <- tasks.flatten) {

    val serializedTask = TaskDescription.encode(task)

    if (serializedTask.limit() >= maxRpcMessageSize) {

      Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>

        try {

          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +

            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +

            "spark.rpc.message.maxSize or using broadcast variables for large values."

          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)

          taskSetMgr.abort(msg)

        } catch {

          case e: Exception => logError("Exception in error callback", e)

        }

      }

    }

    else {

      val executorData = executorDataMap(task.executorId)

      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +

        s"${executorData.executorHost}.")

      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

    }

  }

}
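The size guard in launchTasks compares the encoded TaskDescription against spark.rpc.message.maxSize (default 128 MB). When a job trips it, raising the limit works, but the usual fix is to stop capturing large objects in task closures and broadcast them instead. A sketch; loadLookupTable is a hypothetical helper returning a large Map:

val conf = new SparkConf().set("spark.rpc.message.maxSize", "256")  // value is in MB

// preferred fix: ship large reference data once per executor via a broadcast variable
val lookup = sc.broadcast(loadLookupTable())  // hypothetical: Map[Int, String]
rdd.map(x => lookup.value.getOrElse(x, "missing"))  // assumes rdd: RDD[Int]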
