Spark source code analysis: task submission and serialization

The simplified overall flow is outlined below.

1. org.apache.spark.scheduler.DAGScheduler#submitMissingTasks

2.  => org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks

// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

// Use the scheduling pool, job group, description, etc. from an ActiveJob associated  
// with this Stage  
val properties = jobIdToActiveJob(jobId).properties

runningStages += stage  
// SparkListenerStageSubmitted should be posted before testing whether tasks are  
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event  
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted  
// event.  
stage match {  
  case s: ShuffleMapStage =>  
    outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)  
  case s: ResultStage =>  
    outputCommitCoordinator.stageStart(  
      stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)  
}  
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {  
  stage match {  
    case s: ShuffleMapStage =>  
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap  
    case s: ResultStage =>  
      partitionsToCompute.map { id =>  
        val p = s.partitions(id)  
        (id, getPreferredLocs(stage.rdd, p))  
      }.toMap  
  }  
} // catch clause (which aborts the stage on failure) omitted from this excerpt

// Serialize the RDD
// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  var taskBinaryBytes: Array[Byte] = null
  // taskBinaryBytes and partitions are both affected by the checkpoint status. We need
  // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
  // consistent view of both variables.
  RDDCheckpointData.synchronized {
    taskBinaryBytes = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    partitions = stage.rdd.partitions
  }

  taskBinary = sc.broadcast(taskBinaryBytes)
} // catch clauses that abort the stage on serialization failure are omitted from this excerpt
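
The comment above is worth dwelling on: because each task deserializes the broadcast bytes on its own, every task works on its own copy of the RDD and of any objects captured in its closure. A minimal standalone sketch (plain java.io serialization instead of Spark's closureSerializer, with a hypothetical ToyConf standing in for a non-thread-safe object such as Hadoop's JobConf) shows why deserializing the same byte array twice yields independent copies:

// A toy illustration, not Spark code: deserializing the same bytes twice
// produces two independent objects, which is the per-task isolation the
// DAGScheduler comment refers to.
import java.io._

class ToyConf(var value: String) extends Serializable

object CopyIsolationDemo {
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  def deserialize(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val bytes = serialize(new ToyConf("initial"))         // analogous to taskBinaryBytes
    val copy1 = deserialize(bytes).asInstanceOf[ToyConf]  // "task 1" copy
    val copy2 = deserialize(bytes).asInstanceOf[ToyConf]  // "task 2" copy
    copy1.value = "mutated by task 1"
    println(copy1.value) // mutated by task 1
    println(copy2.value) // initial -- copy2 is unaffected
  }
}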

// Build the TaskSet
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }
  }
} // catch clause that aborts the stage on failure is omitted from this excerpt

// Finally, submit the TaskSet
if (tasks.size > 0) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
    s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
}
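
For context, everything above runs inside the DAGScheduler on the driver whenever an action is executed. A minimal driver program (a sketch, not part of the scheduler source) that would exercise this path twice, once for the ShuffleMapStage and once for the ResultStage:

// A minimal driver-side sketch: the reduceByKey introduces a shuffle dependency,
// so the job is split into a ShuffleMapStage and a ResultStage, each of which is
// submitted through submitMissingTasks as shown above.
import org.apache.spark.{SparkConf, SparkContext}

object SubmitPathDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("submit-path-demo").setMaster("local[2]"))
    val counts = sc.parallelize(Seq("a", "b", "a"), numSlices = 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)            // shuffle dependency => ShuffleMapStage
    println(counts.collect().toSeq)  // the action submits the ResultStage
    sc.stop()
  }
}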

3. => org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers, which sends a message

def reviveOffers() {
  // The message is a CoarseGrainedClusterMessage
  driverEndpoint.send(ReviveOffers)
}
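
For reference, ReviveOffers is one of the messages defined under CoarseGrainedClusterMessages. A simplified, self-contained sketch of the relevant definitions (the real ones live in org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages and may differ slightly across Spark versions):

// Simplified sketch of the message types used in this flow; not Spark's actual file.
import java.nio.ByteBuffer

// Placeholder for org.apache.spark.util.SerializableBuffer, which wraps a ByteBuffer
// so it can travel over the RPC layer.
class SerializableBuffer(val buffer: ByteBuffer) extends Serializable

sealed trait CoarseGrainedClusterMessage extends Serializable

case object ReviveOffers extends CoarseGrainedClusterMessage

case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage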

4. => The driver endpoint handles the message itself: org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receive

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    ...
  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread, reason) =>
    ...
  case KillExecutorsOnHost(host) =>
    ...
  case UpdateDelegationTokens(newDelegationTokens) =>
    ...
  case RemoveExecutor(executorId, reason) =>
    ...
    removeExecutor(executorId, reason)
}

5. => org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers

// Make fake resource offers on all executors
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort))
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}

6. => org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers, which assigns each task to an executor according to the core-allocation algorithm.

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  // Record all the executor IDs assigned barrier tasks on.
  val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    var launchedTaskAtCurrentMaxLocality = false
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
        currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
}

=> org.apache.spark.scheduler.TaskSchedulerImpl#resourceOfferSingleTaskSet

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addressesWithDescs: ArrayBuffer[(String, TaskDescription)]): Boolean = {
  var launchedTask = false

  // Assign tasks to the offers
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        val tid = task.taskId
        taskIdToTaskSetManager.put(tid, taskSet)
        taskIdToExecutorId(tid) = execId
        executorIdToRunningTaskIds(execId).add(tid)
        availableCpus(i) -= CPUS_PER_TASK

        launchedTask = true
      }
    }
  }
  return launchedTask
}
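
To make the interplay between resourceOffers and resourceOfferSingleTaskSet concrete, here is a self-contained toy simulation (toy classes, not Spark's) of the core loop: offers carry free cores, the task set is offered at progressively relaxed locality levels, and every launched task consumes CPUS_PER_TASK cores.

// Toy model of steps 5-6; all names are illustrative, not Spark's.
import scala.collection.mutable.ArrayBuffer

object ToyScheduling {
  val CPUS_PER_TASK = 1

  case class Offer(executorId: String, host: String)
  case class ToyTask(id: Int, preferredHost: Option[String])

  // Mirrors resourceOfferSingleTaskSet: walk the offers once and launch at most one
  // task per offer that still has enough free cores and satisfies `locality`.
  def offerSingleTaskSet(
      pending: ArrayBuffer[ToyTask],
      offers: IndexedSeq[Offer],
      availableCpus: Array[Int],
      locality: String): Boolean = {
    var launched = false
    for (i <- offers.indices if availableCpus(i) >= CPUS_PER_TASK) {
      val matches = (t: ToyTask) => locality match {
        case "NODE_LOCAL" => t.preferredHost.contains(offers(i).host)
        case _            => true // "ANY"
      }
      pending.find(matches).foreach { task =>
        pending -= task
        availableCpus(i) -= CPUS_PER_TASK
        launched = true
        println(s"task ${task.id} -> ${offers(i).executorId} ($locality)")
      }
    }
    launched
  }

  def main(args: Array[String]): Unit = {
    val offers = IndexedSeq(Offer("exec-1", "host-a"), Offer("exec-2", "host-b"))
    val availableCpus = Array(2, 2)
    val pending = ArrayBuffer(
      ToyTask(0, Some("host-a")), ToyTask(1, Some("host-b")), ToyTask(2, None))

    // Mirrors resourceOffers: for each locality level, keep offering until nothing launches.
    for (level <- Seq("NODE_LOCAL", "ANY")) {
      var launchedAtLevel = false
      do {
        launchedAtLevel = offerSingleTaskSet(pending, offers, availableCpus, level)
      } while (launchedAtLevel)
    }
  }
}

Running it launches task 0 and task 1 node-locally on their preferred hosts and only then falls back to ANY for task 2, which mirrors the do/while descent over myLocalityLevels shown above.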

==> org.apache.spark.scheduler.TaskSetManager#resourceOffer
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()

    var allowedLocality = maxLocality

    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }

    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
      // Found a task; do some bookkeeping and return a task description
      // Once a task is found, wrap up its metadata, including serializing it
      val task = tasks(index)
      // Task IDs come from an atomically incremented counter
      val taskId = sched.newTaskId()
      // Do various bookkeeping
      copiesRunning(index) += 1
      val attemptNum = taskAttempts(index).size
      val info = new TaskInfo(taskId, index, attemptNum, curTime,
        execId, host, taskLocality, speculative)
      taskInfos(taskId) = info
      taskAttempts(index) = info :: taskAttempts(index)

      // Serialize and return the task
      val serializedTask: ByteBuffer = try {
        ser.serialize(task)
      } // catch clause that aborts the TaskSet on serialization failure omitted

      // Add the task to the map of running tasks
      addRunningTask(taskId)

      sched.dagScheduler.taskStarted(task, info)
      new TaskDescription(
        taskId,
        attemptNum,
        execId,
        taskName,
        index,
        task.partitionId,
        addedFiles,
        addedJars,
        task.localProperties,
        serializedTask)
    }
  } else {
    None
  }
}
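
The call to getAllowedLocalityLevel above is where delay scheduling happens: the TaskSetManager keeps offering at the most local level and only relaxes it after the configured wait (spark.locality.wait and its per-level variants) has elapsed without a task launch. A simplified sketch of that idea (not TaskSetManager's actual implementation):

// Simplified sketch of delay scheduling: stay at the most local level until the
// configured wait for that level has elapsed since the last launch, then fall back.
object DelaySchedulingSketch {
  // Locality levels in preference order, with an illustrative wait (ms) per level,
  // standing in for spark.locality.wait and its per-level variants.
  val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val localityWaitMs = Map("PROCESS_LOCAL" -> 3000L, "NODE_LOCAL" -> 3000L, "RACK_LOCAL" -> 3000L)

  def allowedLocality(lastLaunchTime: Long, now: Long): String = {
    var idx = 0
    var sinceLastLaunch = now - lastLaunchTime
    // Spend at most localityWaitMs at each level before relaxing to the next one.
    while (idx < levels.size - 1 && sinceLastLaunch >= localityWaitMs(levels(idx))) {
      sinceLastLaunch -= localityWaitMs(levels(idx))
      idx += 1
    }
    levels(idx)
  }

  def main(args: Array[String]): Unit = {
    val t0 = 0L
    println(allowedLocality(t0, 1000L))  // PROCESS_LOCAL: still within the first wait
    println(allowedLocality(t0, 4000L))  // NODE_LOCAL: first wait exhausted
    println(allowedLocality(t0, 10000L)) // ANY: all waits exhausted
  }
}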

7. => org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    // (message-size check and the lookup of executorData in executorDataMap elided here)
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}

8. => org.apache.spark.scheduler.TaskDescription#encode. The encoded TaskDescription is sent to the executor as a message.

def encode(taskDescription: TaskDescription): ByteBuffer = {
  val bytesOut = new ByteBufferOutputStream(4096)
  val dataOut = new DataOutputStream(bytesOut)

  dataOut.writeLong(taskDescription.taskId)
  dataOut.writeInt(taskDescription.attemptNumber)
  dataOut.writeUTF(taskDescription.executorId)
  dataOut.writeUTF(taskDescription.name)
  dataOut.writeInt(taskDescription.index)
  dataOut.writeInt(taskDescription.partitionId)

  // Write files.
  serializeStringLongMap(taskDescription.addedFiles, dataOut)

  // Write jars.
  serializeStringLongMap(taskDescription.addedJars, dataOut)

  // Write properties.
  dataOut.writeInt(taskDescription.properties.size())
  taskDescription.properties.asScala.foreach { case (key, value) =>
    dataOut.writeUTF(key)
    // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    dataOut.writeInt(bytes.length)
    dataOut.write(bytes)
  }

  // Write the task. The task is already serialized, so write it directly to the byte buffer.
  Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)

  dataOut.close()
  bytesOut.close()
  bytesOut.toByteBuffer
}
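
The SPARK-19796 comment exists because DataOutputStream.writeUTF can only encode strings whose modified-UTF-8 form fits in 64KB, so long property values are written as an explicit length followed by raw UTF-8 bytes. A self-contained round-trip sketch of that pattern (the helper names are illustrative; the read side mirrors what TaskDescription.decode has to do):

// Sketch of the length-prefixed string pattern used above; helper names are not Spark's.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object LengthPrefixedStrings {
  def writeLongString(value: String, out: DataOutputStream): Unit = {
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    out.writeInt(bytes.length) // explicit length prefix instead of writeUTF's 2-byte limit
    out.write(bytes)
  }

  def readLongString(in: DataInputStream): String = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val longValue = "x" * 100000 // > 64KB: DataOutputStream.writeUTF would fail on this
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeLongString(longValue, out)
    out.close()

    val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
    assert(readLongString(in) == longValue)
    println("round trip ok")
  }
}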
