kube-scheduler的调度上下文
阅读原文时间:2023年07月10日阅读:1

前一章节了解到了kube-scheduler中的概念,该章节则对调度上下文的源码进行分析

Scheduler 是整个 kube-scheduler 的一个 structure,提供了 kube-scheduler 运行所需的组件。

type Scheduler struct {
    // Cache是一个抽象,会缓存pod的信息,作为scheduler进行查找,操作是基于Pod进行增加
    Cache internalcache.Cache
    // Extenders 算是调度框架中提供的调度插件,会影响kubernetes中的调度策略
    Extenders []framework.Extender

    // NextPod 作为一个函数提供,会阻塞获取下一个ke'diao'du
    NextPod func() *framework.QueuedPodInfo

    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*framework.QueuedPodInfo, error)

    // SchedulePod 尝试将给出的pod调度到Node。
    SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

    // 关闭scheduler的信号
    StopEverything <-chan struct{}

    // SchedulingQueue保存要调度的Pod
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles中是多个调度框架
    Profiles profile.Map
    client clientset.Interface
    nodeInfoSnapshot *internalcache.Snapshot
    percentageOfNodesToScore int32
    nextStartNodeIndex int
}

作为实际执行的两个核心,SchedulingQueue ,与 scheduleOne 将会分析到这两个

在知道 kube-scheduler 初始化过程后,需要对 kube-scheduler 的整个 structureworkflow 进行分析

Run 中,运行的是 一个 SchedulingQueue 与 一个 scheduleOne ,从结构上看是属于 Scheduler

func (sched *Scheduler) Run(ctx context.Context) {
    sched.SchedulingQueue.Run()

    // We need to start scheduleOne loop in a dedicated goroutine,
    // because scheduleOne function hangs on getting the next item
    // from the SchedulingQueue.
    // If there are no new pods to schedule, it will be hanging there
    // and if done in this goroutine it will be blocking closing
    // SchedulingQueue, in effect causing a deadlock on shutdown.
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

    <-ctx.Done()
    sched.SchedulingQueue.Close()
}

SchedulingQueue 是一个队列的抽象,用于存储等待调度的Pod。该接口遵循类似于 cache.FIFO 和 cache.Heap 的模式。

type SchedulingQueue interface {
    framework.PodNominator
    Add(pod *v1.Pod) error
    // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
    // The passed-in pods are originally compiled from plugins that want to activate Pods,
    // by injecting the pods through a reserved CycleState struct (PodsToActivate).
    Activate(pods map[string]*v1.Pod)
    // 将不可调度的Pod重入到队列中
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // SchedulingCycle returns the current number of scheduling cycle which is
    // cached by scheduling queue. Normally, incrementing this number whenever
    // a pod is popped (e.g. called Pop()) is enough.
    SchedulingCycle() int64
    // Pop会弹出一个pod,并从head优先级队列中删除
    Pop() (*framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    // Close closes the SchedulingQueue so that the goroutine which is
    // waiting to pop items can exit gracefully.
    Close()
    // Run starts the goroutines managing the queue.
    Run()
}

PriorityQueueSchedulingQueue 的实现,该部分的核心构成是两个子队列与一个数据结构,即 activeQbackoffQunschedulablePods

  • activeQ:是一个 heap 类型的优先级队列,是 sheduler 从中获得优先级最高的Pod进行调度

  • backoffQ:也是一个 heap 类型的优先级队列,存放的是不可调度的Pod

  • unschedulablePods :保存确定不可被调度的Pod

    type SchedulingQueue interface {
    framework.PodNominator
    Add(pod v1.Pod) error // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ. // The passed-in pods are originally compiled from plugins that want to activate Pods, // by injecting the pods through a reserved CycleState struct (PodsToActivate). Activate(pods map[string]v1.Pod)
    // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
    // The podSchedulingCycle represents the current scheduling cycle number which can be
    // returned by calling SchedulingCycle().
    AddUnschedulableIfNotPresent(pod framework.QueuedPodInfo, podSchedulingCycle int64) error // SchedulingCycle returns the current number of scheduling cycle which is // cached by scheduling queue. Normally, incrementing this number whenever // a pod is popped (e.g. called Pop()) is enough. SchedulingCycle() int64 // Pop removes the head of the queue and returns it. It blocks if the // queue is empty and waits until a new item is added to the queue. Pop() (framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    // Close closes the SchedulingQueue so that the goroutine which is
    // waiting to pop items can exit gracefully.
    Close()
    // Run starts the goroutines managing the queue.
    Run()
    }

在New scheduler 时可以看到会初始化这个queue

podQueue := internalqueue.NewSchedulingQueue(
    // 实现pod对比的一个函数即less
    profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)

NewSchedulingQueue 则是初始化这个 PriorityQueue

// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option) SchedulingQueue {
    return NewPriorityQueue(lessFn, informerFactory, opts...)
}

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option,
) *PriorityQueue {
    options := defaultPriorityQueueOptions
    for _, opt := range opts {
        opt(&options)
    }
    // 这个就是 less函数,作为打分的一部分
    comp := func(podInfo1, podInfo2 interface{}) bool {
        pInfo1 := podInfo1.(*framework.QueuedPodInfo)
        pInfo2 := podInfo2.(*framework.QueuedPodInfo)
        return lessFn(pInfo1, pInfo2)
    }

    if options.podNominator == nil {
        options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
    }

    pq := &PriorityQueue{
        PodNominator:                      options.podNominator,
        clock:                             options.clock,
        stop:                              make(chan struct{}),
        podInitialBackoffDuration:         options.podInitialBackoffDuration,
        podMaxBackoffDuration:             options.podMaxBackoffDuration,
        podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
        activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
        unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
        moveRequestCycle:                  -1,
        clusterEventMap:                   options.clusterEventMap,
    }
    pq.cond.L = &pq.lock
    pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

    return pq
}

了解了Queue的结构,就需要知道 入队列与出队列是在哪里操作的。在初始化时,需要注册一个 addEventHandlerFuncs 这个时候,会注入三个动作函数,也就是controller中的概念;而在AddFunc中可以看到会入队列。

注入是对 Pod 的informer注入的,注入的函数 addPodToSchedulingQueue 就是入栈

Handler: cache.ResourceEventHandlerFuncs{
    AddFunc:    sched.addPodToSchedulingQueue,
    UpdateFunc: sched.updatePodInSchedulingQueue,
    DeleteFunc: sched.deletePodFromSchedulingQueue,
},

func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
    pod := obj.(*v1.Pod)
    klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
    if err := sched.SchedulingQueue.Add(pod); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
    }
}

而这个 SchedulingQueue 的实现就是 PriorityQueue ,而Add中则对 activeQ进行的操作

func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    // 格式化入栈数据,包含podinfo,里会包含v1.Pod
    // 初始化的时间,创建的时间,以及不能被调度时的记录其plugin的名称
    pInfo := p.newQueuedPodInfo(pod)
    // 入栈
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
        return err
    }
    if p.unschedulablePods.get(pod) != nil {
        klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
        p.unschedulablePods.delete(pod)
    }
    // Delete pod from backoffQ if it is backing off
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
    }
    metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
    p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    p.cond.Broadcast()

    return nil
}

在上面看 scheduler 结构时,可以看到有一个 nextPod的,nextPod就是从队列中弹出一个pod,这个在scheduler 时会传入 MakeNextPodFunc 就是这个 nextpod

func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
    return func() *framework.QueuedPodInfo {
        podInfo, err := queue.Pop()
        if err == nil {
            klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
            for plugin := range podInfo.UnschedulablePlugins {
                metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
            }
            return podInfo
        }
        klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
        return nil
    }
}

而这个 queue.Pop() 对应的就是 PriorityQueuePop() ,在这里会将作为 activeQ 的消费端

func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
   p.lock.Lock()
   defer p.lock.Unlock()
   for p.activeQ.Len() == 0 {
      // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
      // When Close() is called, the p.closed is set and the condition is broadcast,
      // which causes this loop to continue and return from the Pop().
      if p.closed {
         return nil, fmt.Errorf(queueClosed)
      }
      p.cond.Wait()
   }
   obj, err := p.activeQ.Pop()
   if err != nil {
      return nil, err
   }
   pInfo := obj.(*framework.QueuedPodInfo)
   pInfo.Attempts++
   p.schedulingCycle++
   return pInfo, nil
}

在上面入口部分也看到了,scheduleOne 和 scheduler,scheduleOne 就是去消费一个Pod,他会调用 NextPod,NextPod就是在初始化传入的 MakeNextPodFunc ,至此回到对应的 Pop来做消费。

schedulerOne是为一个Pod做调度的流程。

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }
...

当了解了scheduler结构后,下面分析下调度上下文的过程。看看扩展点是怎么工作的。这个时候又需要提到官网的调度上下文的图。

图1:Pod的调度上下文
Source:https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework

scheduler 对于调度上下文来就是这个 scheduleOne ,下面就是看这个调度上下文

Sort

Sort 插件提供了排序功能,用于对在调度队列中待处理 Pod 进行排序。一次只能启用一个队列排序。

在进入 scheduleOne 后,NextPodactiveQ 中队列中得到一个Pod,然后的 frameworkForPod 会做打分的动作就是调度上下文的第一个扩展点 sort

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
...

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
    // 获取指定的profile
    fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
    if !ok {
        return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
    }
    return fwk, nil
}

回顾,因为在New scheduler时会初始化这个 sort 函数

podQueue := internalqueue.NewSchedulingQueue(
    profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)

preFilter

preFilter作为第一个扩展点,是用于在过滤之前预处理或检查 Pod 或集群的相关信息。这里会终止调度

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }

    klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    state := framework.NewCycleState()
    state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
    // Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
    podsToActivate := framework.NewPodsToActivate()
    state.Write(framework.PodsToActivateKey, podsToActivate)

    schedulingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    // 这里将进入prefilter
    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)

schedulePod 尝试将给定的 pod 调度到节点列表中的节点之一。如果成功,它将返回节点的名称。

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)
    // 用于将cache更新为当前内容
    if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if sched.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }
    // 找到一个合适的pod时,会执行扩展点
    feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)

    ...

findNodesThatFitPod 会执行对应的过滤插件来找到最适合的Node,包括备注,以及方法名都可以看到,这里运行的插件,后面会分析算法内容,只对workflow学习。

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    diagnosis := framework.Diagnosis{
        NodeToStatusMap:      make(framework.NodeToStatusMap),
        UnschedulablePlugins: sets.NewString(),
    }

    // Run "prefilter" plugins.
    preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
    allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
    if err != nil {
        return nil, diagnosis, err
    }
    if !s.IsSuccess() {
        if !s.IsUnschedulable() {
            return nil, diagnosis, s.AsError()
        }
        // All nodes will have the same status. Some non trivial refactoring is
        // needed to avoid this copy.
        for _, n := range allNodes {
            diagnosis.NodeToStatusMap[n.Node().Name] = s
        }
        // Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
        if s.FailedPlugin() != "" {
            diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
        }
        return nil, diagnosis, nil
    }

    // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
    // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
    if len(pod.Status.NominatedNodeName) > 0 {
        feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
        if err != nil {
            klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
        }
        // Nominated node passes all the filters, scheduler is good to assign this node to the pod.
        if len(feasibleNodes) != 0 {
            return feasibleNodes, diagnosis, nil
        }
    }

    nodes := allNodes
    if !preRes.AllNodes() {
        nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
        for n := range preRes.NodeNames {
            nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
            if err != nil {
                return nil, diagnosis, err
            }
            nodes = append(nodes, nInfo)
        }
    }
    feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    if err != nil {
        return nil, diagnosis, err
    }

    feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    return feasibleNodes, diagnosis, nil
}

filter

filter插件相当于调度上下文中的 Predicates,用于排除不能运行 Pod 的节点。Filter 会按配置的顺序进行调用。如果有一个filter将节点标记位不可用,则将 Pod 标记为不可调度(即不会向下执行)。

对于代码中来讲,filter还是处于 findNodesThatFitPod 函数中,findNodesThatPassFilters 就是获取到 FN,即可行节点,而这个过程就是 filter 扩展点

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    ...

    feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    if err != nil {
        return nil, diagnosis, err
    }

    feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    return feasibleNodes, diagnosis, nil
}

Postfilter

当没有为 pod 找到FN时,该插件会按照配置的顺序进行调用。如果任何postFilter插件将 Pod 标记为schedulable,则不会调用其余插件。即 filter 成功后不会进行这步骤,那我们来验证下这里把

还是在 scheduleOne 中,当我们运行的 SchedulePod 完成后(成功或失败),这时会返回一个err,而 postfilter 会根据这个 err进行选择执行或不执行,符合官方给出的说法。

scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    if err != nil {
        // SchedulePod() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        var nominatingInfo *framework.NominatingInfo
        if fitError, ok := err.(*framework.FitError); ok {
            if !fwk.HasPostFilterPlugins() {
                klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            } else {
                // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
                result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
                if status.Code() == framework.Error {
                    klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                } else {
                    fitError.Diagnosis.PostFilterMsg = status.Message()
                    klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                }
                if result != nil {
                    nominatingInfo = result.NominatingInfo
                }
            }
            // Pod did not fit anywhere, so it is counted as a failure. If preemption
            // succeeds, the pod should get counted as a success the next time we try to
            // schedule it. (hopefully)
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else if err == ErrNoNodesAvailable {
            nominatingInfo = clearNominatedNode
            // No nodes available is counted as unschedulable rather than an error.
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else {
            nominatingInfo = clearNominatedNode
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        }
        sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
        return
    }

PreScore,Score

可用于进行预Score工作,作为通知性的扩展点,会在在filter完之后直接会关联 preScore 插件进行继续工作,而不是返回,如果配置的这些插件有任何一个返回失败,则Pod将被拒绝。

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if sched.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }

    feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
    if err != nil {
        return result, err
    }
    trace.Step("Computing predicates done")

    if len(feasibleNodes) == 0 {
        return result, &framework.FitError{
            Pod:         pod,
            NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
            Diagnosis:   diagnosis,
        }
    }

    // When only one node after predicate, just use it.
    if len(feasibleNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  feasibleNodes[0].Name,
            EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
            FeasibleNodes:  1,
        }, nil
    }
    // 这里会完成prescore,score
    priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
    if err != nil {
        return result, err
    }

    host, err := selectHost(priorityList)
    trace.Step("Prioritizing done")

    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
        FeasibleNodes:  len(feasibleNodes),
    }, err
}

priorityNodes 会通过配置的插件给Node打分,并返回每个Node的分数,将每个插件打分结果计算总和获得Node的分数,最后获得节点的加权总分数。

func prioritizeNodes(
    ctx context.Context,
    extenders []framework.Extender,
    fwk framework.Framework,
    state *framework.CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) (framework.NodeScoreList, error) {
    // If no priority configs are provided, then all nodes will have a score of one.
    // This is required to generate the priority list in the required format
    if len(extenders) == 0 && !fwk.HasScorePlugins() {
        result := make(framework.NodeScoreList, 0, len(nodes))
        for i := range nodes {
            result = append(result, framework.NodeScore{
                Name:  nodes[i].Name,
                Score: 1,
            })
        }
        return result, nil
    }

    // Run PreScore plugins.
    preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
    if !preScoreStatus.IsSuccess() {
        return nil, preScoreStatus.AsError()
    }

    // Run the Score plugins.
    scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return nil, scoreStatus.AsError()
    }

    // Additional details logged at level 10 if enabled.
    klogV := klog.V(10)
    if klogV.Enabled() {
        for plugin, nodeScoreList := range scoresMap {
            for _, nodeScore := range nodeScoreList {
                klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
            }
        }
    }

    // Summarize all scores.
    result := make(framework.NodeScoreList, 0, len(nodes))

    for i := range nodes {
        result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }

    if len(extenders) != 0 && nodes != nil {
        var mu sync.Mutex
        var wg sync.WaitGroup
        combinedScores := make(map[string]int64, len(nodes))
        for i := range extenders {
            if !extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
                defer func() {
                    metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
                    wg.Done()
                }()
                prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klogV.Enabled() {
                        klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
            // therefore we need to scale the score returned by extenders to the score range used by the scheduler.
            result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
        }
    }

    if klogV.Enabled() {
        for i := range result {
            klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
        }
    }
    return result, nil
}

Reserve

Reserve 因为绑定事件时异步发生的,该插件是为了避免Pod在绑定到节点前时,调度到新的Pod,使节点使用资源超过可用资源情况。如果后续阶段发生错误或失败,将触发 UnReserve 回滚(通知性扩展点)。这也是作为调度周期中最后一个状态,要么成功到 postBind ,要么失败触发 UnReserve

// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { // 当处理不成功时
    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    // 触发 un-reserve 来清理相关Pod的状态
    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
    }
    sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
    return
}

permit

Permit 插件可以阻止或延迟 Pod 的绑定

    // Run "permit" plugins.
    runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
        var reason string
        if runPermitStatus.IsUnschedulable() {
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = v1.PodReasonUnschedulable
        } else {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            reason = SchedulerError
        }
        // 只要其中一个插件返回的状态不是 success 或者 wait
        fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        // 从cache中忘掉pod
        if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
            klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
        }
        sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
        return
    }

Binding Cycle

在选择好 FN 后则做一个假设绑定,并更新到cache中,接下来回去执行真正的bind操作,也就是 binding cycle

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    ...
    // binding cycle 是一个异步的操作,这里表现就是go协程
    go func() {
        bindingCycleCtx, cancel := context.WithCancel(ctx)
        defer cancel()
        metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
        defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()
        // 运行WaitOnPermit插件,如果失败则,unReserve回滚
        waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
        if !waitOnPermitStatus.IsSuccess() {
            var reason string
            if waitOnPermitStatus.IsUnschedulable() {
                metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
                reason = v1.PodReasonUnschedulable
            } else {
                metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
                reason = SchedulerError
            }
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
                klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                // Avoid moving the assumed Pod itself as it's always Unschedulable.
                // It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
                // update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
                defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool {
                    return assumedPod.UID != pod.UID
                })
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
            return
        }

    // 运行Prebind 插件
        preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if !preBindStatus.IsSuccess() {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
                klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
            return
        }
        // bind是真正的绑定操作
        err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
        if err != nil {
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
            // 如果失败了就触发 un-reserve plugins
            fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
            if err := sched.Cache.ForgetPod(assumedPod); err != nil {
                klog.ErrorS(err, "scheduler cache ForgetPod failed")
            } else {
                // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
                // as the assumed Pod had occupied a certain amount of resources in scheduler cache.
                // TODO(#103853): de-duplicate the logic.
                sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
            }
            sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
            return
        }
        // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
        klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
        metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
        metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
        metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

        // 运行 "postbind" 插件
        // 是通知性的扩展点,该插件在绑定 Pod 后调用,可用于清理相关资源()。
        fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

        // At the end of a successful binding cycle, move up Pods if needed.
        if len(podsToActivate.Map) != 0 {
            sched.SchedulingQueue.Activate(podsToActivate.Map)
            // Unlike the logic in scheduling cycle, we don't bother deleting the entries
            // as `podsToActivate.Map` is no longer consumed.
        }
    }()
}

上面说到的都是正常的请求,下面会对失败的请求是如何重试的进行分析,而 scheduler 中关于失败处理方面相关的属性会涉及到上面 scheduler 结构中的 backoffQunschedulablePods

  • backoffQ:也是一个 heap 类型的优先级队列,存放的是不可调度的Pod
  • unschedulablePods :保存确定不可被调度的Pod,一个map类型

backoffQ 与 unschedulablePods 会在初始化 scheduler 时初始化,

func NewPriorityQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option,
) *PriorityQueue {
    options := defaultPriorityQueueOptions
    for _, opt := range opts {
        opt(&options)
    }

    comp := func(podInfo1, podInfo2 interface{}) bool {
        pInfo1 := podInfo1.(*framework.QueuedPodInfo)
        pInfo2 := podInfo2.(*framework.QueuedPodInfo)
        return lessFn(pInfo1, pInfo2)
    }

    if options.podNominator == nil {
        options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
    }

    pq := &PriorityQueue{
        PodNominator:                      options.podNominator,
        clock:                             options.clock,
        stop:                              make(chan struct{}),
        podInitialBackoffDuration:         options.podInitialBackoffDuration,
        podMaxBackoffDuration:             options.podMaxBackoffDuration,
        podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
        activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
        unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
        moveRequestCycle:                  -1,
        clusterEventMap:                   options.clusterEventMap,
    }
    pq.cond.L = &pq.lock
    // 初始化backoffQ
    // NewWithRecorder作为一个可选的 metricRecorder 的 Heap 对象。
    // podInfoKeyFunc是一个函数,返回错误与字符串
    // pq.podsCompareBackoffCompleted 比较两个pod的回退时间,如果第一个在第二个之前为true,
    // 反之 false
    pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

    return pq
}

对于初始化 backoffQ 会产生的两个函数,getBackoffTimecalculateBackoffDuration

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    duration := p.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    duration := p.podInitialBackoffDuration
    for i := 1; i < podInfo.Attempts; i++ {
        // Use subtraction instead of addition or multiplication to avoid overflow.
        if duration > p.podMaxBackoffDuration-duration {
            return p.podMaxBackoffDuration
        }
        duration += duration
    }
    return duration
}

对于整个故障错误会按照如下流程进行,在初始化 scheduler 会注册一个 Error 函数,这个函数用作对不可调度Pod进行处理,实际上被注册的函数是 MakeDefaultErrorFunc。这个函数将作为 Error 函数被调用。

sched := newScheduler(
    schedulerCache,
    extenders,
    internalqueue.MakeNextPodFunc(podQueue),
    MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
    stopEverything,
    podQueue,
    profiles,
    client,
    snapshot,
    options.percentageOfNodesToScore,
)

而在 调度周期中,也就是 scheduleOne 可以看到,每个扩展点操作失败后都会调用 handleSchedulingFailure 而该函数,使用了注册的 Error 函数来处理Pod

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    defer cancel()
    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    if err != nil {

        var nominatingInfo *framework.NominatingInfo
        if fitError, ok := err.(*framework.FitError); ok {
            if !fwk.HasPostFilterPlugins() {
                klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            } else {

                result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
                if status.Code() == framework.Error {
                    klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                } else {
                    fitError.Diagnosis.PostFilterMsg = status.Message()
                    klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                }
                if result != nil {
                    nominatingInfo = result.NominatingInfo
                }
            }

            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else if err == ErrNoNodesAvailable {
            nominatingInfo = clearNominatedNode
            // No nodes available is counted as unschedulable rather than an error.
            metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        } else {
            nominatingInfo = clearNominatedNode
            klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
            metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        }
        // 处理不可调度Pod
        sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
        return
    }

来到了注册的 Error 函数 MakeDefaultErrorFunc

func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
    return func(podInfo *framework.QueuedPodInfo, err error) {
        pod := podInfo.Pod
        if err == ErrNoNodesAvailable {
            klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
        } else if fitError, ok := err.(*framework.FitError); ok {
            // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
            podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
            klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
        } else if apierrors.IsNotFound(err) {
            klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
            if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
                nodeName := errStatus.Status().Details.Name
                // when node is not found, We do not remove the node right away. Trying again to get
                // the node and if the node is still not found, then remove it from the scheduler cache.
                _, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
                if err != nil && apierrors.IsNotFound(err) {
                    node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
                    if err := schedulerCache.RemoveNode(&node); err != nil {
                        klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
                    }
                }
            }
        } else {
            klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
        }

        // Check if the Pod exists in informer cache.
        cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
        if err != nil {
            klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
            return
        }

        // In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
        // It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
        if len(cachedPod.Spec.NodeName) != 0 {
            klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
            return
        }

        // As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
        podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
        // 添加到unschedulable队列中
        if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
            klog.ErrorS(err, "Error occurred")
        }
    }
}

下面来到 AddUnschedulableIfNotPresent ,这个也是操作 backoffQunschedulablePods 的真正的动作

AddUnschedulableIfNotPresent 函数会吧无法调度的 pod 插入队列,除非它已经在队列中。通常情况下,PriorityQueue 将不可调度的 Pod 放在 unschedulablePods 中。但如果最近有 move request,则将 pod 放入 podBackoffQ 中。

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pod := pInfo.Pod
    // 如果已经存在则不添加
    if p.unschedulablePods.get(pod) != nil {
        return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
    }
    // 检查是否在activeQ中
    if _, exists, _ := p.activeQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
    }
    // 检查是否在podBackoffQ中
    if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
    }

    // 在重新添加时,会刷新 Pod时间为最新操作的时间
    pInfo.Timestamp = p.clock.Now()

    for plugin := range pInfo.UnschedulablePlugins {
        metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
    }
    // 如果接受到move request那么则放入BackoffQ
    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
        metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
    } else {
        // 否则将放入到 unschedulablePods
        p.unschedulablePods.addOrUpdate(pInfo)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()

    }

    p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    return nil
}

在启动 scheduler 时,会将这两个队列异步启用两个loop来操作队列。表现在 Run()

func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

可以看到 flushBackoffQCompleted 作为 BackoffQ 实现;而 flushUnschedulablePodsLeftover 作为 UnschedulablePods 实现。

flushBackoffQCompleted 是用于将所有已完成回退的 pod 从 backoffQ 移到 activeQ

func (p *PriorityQueue) flushBackoffQCompleted() {
    p.lock.Lock()
    defer p.lock.Unlock()
    broadcast := false
    for { // 这就是heap实现的方法,窥视下,但不弹出
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            break
        }
        pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
        boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
        if boTime.After(p.clock.Now()) {
            break
        }
        _, err := p.podBackoffQ.Pop() // 弹出一个
        if err != nil {
            klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
            break
        }
        p.activeQ.Add(rawPodInfo) // 放入到活动队列中
        metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
        broadcast = true
    }

    if broadcast {
        p.cond.Broadcast()
    }
}

flushUnschedulablePodsLeftover 函数用于将在 unschedulablePods 中的存放时间超过 podMaxInUnschedulablePodsDuration 值的 pod 移动到 backoffQactiveQ 中。

podMaxInUnschedulablePodsDuration 会根据配置传入,当没有传入,也就是使用了 Deprecated 那么会为5分钟。

func NewOptions() *Options {
    o := &Options{
        SecureServing:  apiserveroptions.NewSecureServingOptions().WithLoopback(),
        Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
        Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),
        Deprecated: &DeprecatedOptions{
            PodMaxInUnschedulablePodsDuration: 5 * time.Minute,
        },

对于 flushUnschedulablePodsLeftover 就是做一个时间对比,然后添加到对应的队列中

func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
    p.lock.Lock()
    defer p.lock.Unlock()

    var podsToMove []*framework.QueuedPodInfo
    currentTime := p.clock.Now()
    for _, pInfo := range p.unschedulablePods.podInfoMap {
        lastScheduleTime := pInfo.Timestamp
        if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
            podsToMove = append(podsToMove, pInfo)
        }
    }

    if len(podsToMove) > 0 {
        p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
    }
}
  • 在构建一个 scheduler 时经历如下步骤:

    • 准备cache,informer,queue,错误处理函数等
    • 添加事件函数,会监听资源(如Pod),当有变动则触发对应事件函数,这是入站 activeQ
  • 构建完成后会 run,run时会run一个 SchedulingQueue,这个是作为不可调度队列

    • BackoffQ
    • UnschedulablePods
    • 不可调度队列会根据注册时定期消费队列中Pod将其添加到 activeQ
  • 启动一个 scheduleOne 的loop,这个是调度上下文中所有的扩展点的执行,也是 activeQ 的消费端

    • scheduleOne 获取 pod
    • 执行各个扩展点,如果出错则 Error 函数 MakeDefaultErrorFunc 将其添加到不可调度队列中
    • 回到不可调度队列中消费部分

Reference

[1] kubernetes scheduler extender