k8s daemonset controller源码分析
阅读原文时间:2021年12月23日阅读:3

daemonset controller简介

daemonset controller是kube-controller-manager组件中众多控制器中的一个,是 daemonset 资源对象的控制器,其通过对daemonset、pod、node、ControllerRevision四种资源的监听,当这四种资源发生变化时会触发 daemonset controller 对相应的daemonset资源进行调谐操作,从而完成daemonset在合适node上pod的创建、在不合适node上pod的删除、daemonset的滚动更新、daemonset状态status更新、旧版本daemonset清理等操作。

daemonset controller架构图

daemonset controller的大致组成和处理流程如下图,daemonset controller对daemonset、pod、node、ControllerRevision对象注册了event handler,当有事件时,会watch到然后将对应的daemonset对象放入到queue中,然后syncDaemonset方法为daemonset controller调谐daemonset对象的核心处理逻辑所在,从queue中取出daemonset对象,做调谐处理。

daemonset更新策略

(1)OnDelete:使用 OnDelete 更新策略时,在更新 DaemonSet pod模板后,只有当你手动删除老的 DaemonSet pods 之后,新的 DaemonSet Pod 才会被自动创建。

(2)RollingUpdate:默认的更新策略。使用 RollingUpdate 更新策略时,在更新 DaemonSet pod模板后, 老的 DaemonSet pods 将被删除,并且将根据滚动更新配置自动创建新的 DaemonSet pods。 滚动更新期间,最多只能有 DaemonSet 的一个 Pod 运行于每个节点上。

daemonset controller分析将分为两大块进行,分别是:

(1)daemonset controller初始化与启动分析;

(2)daemonset controller处理逻辑分析。

1.daemonset controller初始化与启动分析

基于tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4

直接看到startDaemonSetController函数,作为daemonset controller初始化与启动分析的入口。

startDaemonSetController

startDaemonSetController主要逻辑:

(1)调用daemon.NewDaemonSetsController新建并初始化DaemonSetsController;

(2)拉起一个goroutine,跑DaemonSetsController的Run方法。

// cmd/kube-controller-manager/app/apps.go
func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
        return nil, false, nil
    }
    dsc, err := daemon.NewDaemonSetsController(
        ctx.InformerFactory.Apps().V1().DaemonSets(),
        ctx.InformerFactory.Apps().V1().ControllerRevisions(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Core().V1().Nodes(),
        ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
        flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
    }
    go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
    return nil, true, nil
}

1.1 daemon.NewDaemonSetsController

daemon.NewDaemonSetsController函数代码中可以看到,daemonset controller注册了daemonset、node、pod与ControllerRevisions对象的EventHandler,也即对这几个对象的event进行监听,把event放入事件队列并做处理。并且将dsc.syncDaemonSet方法赋值给dsc.syncHandler,也即注册为核心处理方法,在dsc.Run方法中会调用该核心处理方法来调谐daemonset对象(核心处理方法后面会进行详细分析)。

// pkg/controller/daemon/daemon_controller.go
func NewDaemonSetsController(
    daemonSetInformer appsinformers.DaemonSetInformer,
    historyInformer appsinformers.ControllerRevisionInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    kubeClient clientset.Interface,
    failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dsc := &DaemonSetsController{
        kubeClient:    kubeClient,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
        },
        crControl: controller.RealControllerRevisionControl{
            KubeClient: kubeClient,
        },
        burstReplicas: BurstReplicas,
        expectations:  controller.NewControllerExpectations(),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
    }

    daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ds := obj.(*apps.DaemonSet)
            klog.V(4).Infof("Adding daemon set %s", ds.Name)
            dsc.enqueueDaemonSet(ds)
        },
        UpdateFunc: func(old, cur interface{}) {
            oldDS := old.(*apps.DaemonSet)
            curDS := cur.(*apps.DaemonSet)
            klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
            dsc.enqueueDaemonSet(curDS)
        },
        DeleteFunc: dsc.deleteDaemonset,
    })
    dsc.dsLister = daemonSetInformer.Lister()
    dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

    historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addHistory,
        UpdateFunc: dsc.updateHistory,
        DeleteFunc: dsc.deleteHistory,
    })
    dsc.historyLister = historyInformer.Lister()
    dsc.historyStoreSynced = historyInformer.Informer().HasSynced

    // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
    // more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addPod,
        UpdateFunc: dsc.updatePod,
        DeleteFunc: dsc.deletePod,
    })
    dsc.podLister = podInformer.Lister()

    // This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
    podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
        "nodeName": indexByPodNodeName,
    })
    dsc.podNodeIndex = podInformer.Informer().GetIndexer()
    dsc.podStoreSynced = podInformer.Informer().HasSynced

    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addNode,
        UpdateFunc: dsc.updateNode,
    },
    )
    dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
    dsc.nodeLister = nodeInformer.Lister()

    dsc.syncHandler = dsc.syncDaemonSet
    dsc.enqueueDaemonSet = dsc.enqueue

    dsc.failedPodsBackoff = failedPodsBackoff

    return dsc, nil
}

1.2 dsc.Run

主要看到for循环处,根据workers的值(默认值为2),启动相应数量的goroutine,跑dsc.runWorker方法,主要是调用前面讲到的daemonset controller核心处理方法dsc.syncDaemonSet

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dsc.queue.ShutDown()

    klog.Infof("Starting daemon sets controller")
    defer klog.Infof("Shutting down daemon sets controller")

    if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(dsc.runWorker, time.Second, stopCh)
    }

    go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

    <-stopCh
}

1.2.1 dsc.runWorker

从queue队列中取出事件key,并调用dsc.syncHandledsc.syncDaemonSet做调谐处理。queue队列里的事件来源前面讲过,是daemonset controller注册的daemonset、node、pod与ControllerRevisions对象的EventHandler,它们的变化event会被监听到然后放入queue中。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) runWorker() {
    for dsc.processNextWorkItem() {
    }
}

func (dsc *DaemonSetsController) processNextWorkItem() bool {
    dsKey, quit := dsc.queue.Get()
    if quit {
        return false
    }
    defer dsc.queue.Done(dsKey)

    err := dsc.syncHandler(dsKey.(string))
    if err == nil {
        dsc.queue.Forget(dsKey)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
    dsc.queue.AddRateLimited(dsKey)

    return true
}

2.daemonset controller核心处理逻辑分析

syncDaemonSet

直接看到daemonset controller核心处理方法syncDaemonSet。

主要逻辑:

(1)获取执行方法时的当前时间,并定义defer函数,用于计算该方法总执行时间,也即统计对一个 daemonset 进行同步调谐操作的耗时;

(2)根据 daemonset 对象的命名空间与名称,获取 daemonset 对象;

(3)获取所有node的对象列表;

(4)判断daemonset对象的DeletionTimestamp是否为空,不为空则直接return,代表该daemonset对象正在被删除,无需再调谐;

(5)调用dsc.constructHistory获取daemonset的历史版本;

(6)调用dsc.expectations.SatisfiedExpectations,判断该daemonset对象是否满足expectations机制(expectations机制与replicaset controller分析中的用途一致,这里不再展开分析),不满足则调用dsc.updateDaemonSetStatus更新daemonset状态后直接return;

(7)调用dsc.manage,dsc.manage方法中不区分新旧daemonset版本的pod,只保证daemonset的pod运行在每一个合适条件的node上,在合适的node上没有daemonset的pod时创建pod,且把不符合条件的node上的daemonset pod删除掉;

(8)再次调用dsc.expectations.SatisfiedExpectations判断是否满足expectations机制,满足则判断daemonset配置的更新策略,如果是滚动更新则调用dsc.rollingUpdate,主要用于处理daemonset对象的滚动更新处理,根据配置的滚动更新配置,删除旧的pod(pod的创建操作在dsc.manage方法中进行);

当daemonset更新策略配置为OnDelete时,这里不做额外处理,因为只有当手动删除老的 DaemonSet pods 之后,新的 DaemonSet Pod 才会被自动创建,手动删除老的pod后,将在dsc.manage方法中创建新版本的pod;

(9)调用dsc.cleanupHistory,根据daemonset的spec.revisionHistoryLimit配置以及版本新旧顺序(优先清理最老旧版本)来清理daemonset的已经不存在pod的历史版本;

(10)最后调用dsc.updateDaemonSetStatus,根据现存daemonset pod的部署情况以及pod的状态、node是否满足pod运行条件等信息,更新daemonset的status。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(3).Infof("daemon set has been deleted %v", key)
        dsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
    }

    nodeList, err := dsc.nodeLister.List(labels.Everything())
    if err != nil {
        return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
    }

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(ds.Spec.Selector, &everything) {
        dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
        return nil
    }

    // Don't process a daemon set until all its creations and deletions have been processed.
    // For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
    // then we do not want to call manage on foo until the daemon pods have been created.
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
    }

    // If the DaemonSet is being deleted (either by foreground deletion or
    // orphan deletion), we cannot be sure if the DaemonSet history objects
    // it owned still exist -- those history objects can either be deleted
    // or orphaned. Garbage collector doesn't guarantee that it will delete
    // DaemonSet pods before deleting DaemonSet history objects, because
    // DaemonSet history doesn't own DaemonSet pods. We cannot reliably
    // calculate the status of a DaemonSet being deleted. Therefore, return
    // here without updating status for the DaemonSet being deleted.
    if ds.DeletionTimestamp != nil {
        return nil
    }

    // Construct histories of the DaemonSet, and get the hash of current history
    cur, old, err := dsc.constructHistory(ds)
    if err != nil {
        return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
    }
    hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

    if !dsc.expectations.SatisfiedExpectations(dsKey) {
        // Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
        return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
    }

    err = dsc.manage(ds, nodeList, hash)
    if err != nil {
        return err
    }

    // Process rolling updates if we're ready.
    if dsc.expectations.SatisfiedExpectations(dsKey) {
        switch ds.Spec.UpdateStrategy.Type {
        case apps.OnDeleteDaemonSetStrategyType:
        case apps.RollingUpdateDaemonSetStrategyType:
            err = dsc.rollingUpdate(ds, nodeList, hash)
        }
        if err != nil {
            return err
        }
    }

    err = dsc.cleanupHistory(ds, old)
    if err != nil {
        return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
    }

    return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}

2.1 dsc.manage

dsc.manage方法中不区分新旧daemonset版本的pod,主要是用于保证daemonset的pod运行在每一个合适条件的node上,在合适的node上没有daemonset的pod时创建pod,且把不符合条件的node上的daemonset pod删除掉。

主要逻辑:

(1)调用dsc.getNodesToDaemonPods,根据daemonset的Selector获取daemonset的所有pod,然后返回pod与node的对应关联关系map;

(2)遍历前面获取到的node列表,执行dsc.podsShouldBeOnNode,根据pod是否指定了nodeName、nodeSelector、ToleratesNodeTaints等,以及node对象的相关信息来做比对,来确定在某个node上是否已经存在daemonset对应的pod,以及是要为该daemonset创建pod还是删除pod;

(3)调用getUnscheduledPodsWithoutNode,将pod的nodeName与前面获取到的node列表比对,将nodeName不存在的pod加入到要被删除的pod列表中;

(4)调用dsc.syncNodes,根据前面获取到的要创建的pod的node列表以及要删除的pod列表,做相应的创建、删除pod的操作。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    // Find out the pods which are created for the nodes by DaemonSet.
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
    // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
    var nodesNeedingDaemonPods, podsToDelete []string
    for _, node := range nodeList {
        nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
            node, nodeToDaemonPods, ds)

        if err != nil {
            continue
        }

        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
        podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
    }

    // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
    // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
    podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

    // Label new pods using the hash label value of the current history when creating them
    if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
        return err
    }

    return nil
}

2.1.1 dsc.podsShouldBeOnNode

dsc.podsShouldBeOnNode方法用于判断一个node上是否需要运行daemonset pod,方法返回nodesNeedingDaemonPods与podsToDelete,分别代表需要运行daemonset pod的node、需要被删除的pod列表。

主要逻辑:

(1)调用dsc.nodeShouldRunDaemonPod,返回shouldSchedule与shouldContinueRunning,分别代表daemonset pod是否应该调度到某node、某node上的daemonset pod是否可以继续运行;

(2)当shouldSchedule为true,即pod应该调度到某node,但现在不存在时,将该node添加到nodesNeedingDaemonPods;

(3)当shouldContinueRunning为true,找出在该node上还在运行没有退出的daemonset pod列表,然后按照pod创建时间排序,只保留最新创建的pod,其余的加入到podsToDelete;

(4)当shouldContinueRunning为false,即daemonset pod不应继续在某node上运行,且现在该node已经存在该daemonset pod时,将node上该daemonset的所有pod都加入到podsToDelete;

(5)返回nodesNeedingDaemonPods与podsToDelete,分别代表需要运行daemonset pod的node、需要被删除的pod列表。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) podsShouldBeOnNode(
    node *v1.Node,
    nodeToDaemonPods map[string][]*v1.Pod,
    ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, err error) {

    _, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
    if err != nil {
        return
    }

    daemonPods, exists := nodeToDaemonPods[node.Name]

    switch {
    case shouldSchedule && !exists:
        // If daemon pod is supposed to be running on node, but isn't, create daemon pod.
        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
    case shouldContinueRunning:
        // If a daemon pod failed, delete it
        // If there's non-daemon pods left on this node, we will create it in the next sync loop
        var daemonPodsRunning []*v1.Pod
        for _, pod := range daemonPods {
            if pod.DeletionTimestamp != nil {
                continue
            }
            if pod.Status.Phase == v1.PodFailed {
                // This is a critical place where DS is often fighting with kubelet that rejects pods.
                // We need to avoid hot looping and backoff.
                backoffKey := failedPodsBackoffKey(ds, node.Name)

                now := dsc.failedPodsBackoff.Clock.Now()
                inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
                if inBackoff {
                    delay := dsc.failedPodsBackoff.Get(backoffKey)
                    klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
                        pod.Namespace, pod.Name, node.Name, delay)
                    dsc.enqueueDaemonSetAfter(ds, delay)
                    continue
                }

                dsc.failedPodsBackoff.Next(backoffKey, now)

                msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
                klog.V(2).Infof(msg)
                // Emit an event so that it's discoverable to users.
                dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
                podsToDelete = append(podsToDelete, pod.Name)
            } else {
                daemonPodsRunning = append(daemonPodsRunning, pod)
            }
        }
        // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
        // Sort the daemon pods by creation time, so the oldest is preserved.
        if len(daemonPodsRunning) > 1 {
            sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
            for i := 1; i < len(daemonPodsRunning); i++ {
                podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
            }
        }
    case !shouldContinueRunning && exists:
        // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
        for _, pod := range daemonPods {
            if pod.DeletionTimestamp != nil {
                continue
            }
            podsToDelete = append(podsToDelete, pod.Name)
        }
    }

    return nodesNeedingDaemonPods, podsToDelete, nil
}
dsc.nodeShouldRunDaemonPod

关于dsc.nodeShouldRunDaemonPod方法,不做展开分析,它主要是调用dsc.simulate执行Predicates预选算法来检查某个node 是否满足pod的运行条件,如果预选失败,则根据失败信息,返回wantToRun、shouldSchedule、shouldContinueRunning,分别代表node与pod的selector、taints 等是否匹配(不考虑node资源是否充足)、daemonset pod是否应该调度到某node、某node上的daemonset pod是否可以继续运行,预选成功则全都返回true。

2.1.2 dsc.syncNodes

dsc.syncNodes是daemonset controller对pod进行创建和删除操作的方法。

该方法也涉及到expectations机制,与replicaset controller中的expectations机制作用一致,使用上也基本一致,忘记的可以回头看下replicaset controller分析中对expectations机制的分析,这里不再对expectations机制展开分析。

主要逻辑:

(1)计算要创建、删除pod的数量,上限为dsc.burstReplicas(250),即每一次对daemonset对象的同步操作,能创建/删除的pod数量上限为250,超出的部分需要在下一次同步操作才能进行;

(2)调用dsc.expectations.SetExpectations,设置expectations;

(3)调用util.CreatePodTemplate,计算并获取要创建的podTemplate;

(4)先进行pod的创建操作:pod的创建与replicaset controller创建pod类似,使用了慢开始算法,分多批次进行创建,第一批创建1个pod,第二批创建2个pod,第三批创建4个pod,以2倍往下依次执行,直到达到期望为止;而每一批次的创建,会拉起与要创建pod数量相等的goroutine,每个goroutine负责创建一个pod,并使用WaitGroup等待该批次的所有创建任务完成,再进行下一批次的创建;

(4)再进行pod的删除操作:对于每个要删除的pod,都拉起一个goroutine来做删除操作,并使用WaitGroup等待所有goroutine完成。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
    // We need to set expectations before creating/deleting pods to avoid race conditions.
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
    }

    createDiff := len(nodesNeedingDaemonPods)
    deleteDiff := len(podsToDelete)

    if createDiff > dsc.burstReplicas {
        createDiff = dsc.burstReplicas
    }
    if deleteDiff > dsc.burstReplicas {
        deleteDiff = dsc.burstReplicas
    }

    dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

    // error channel to communicate back failures.  make the buffer big enough to avoid any blocking
    errCh := make(chan error, createDiff+deleteDiff)

    klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
    createWait := sync.WaitGroup{}
    // If the returned error is not nil we have a parse error.
    // The controller handles this via the hash.
    generation, err := util.GetTemplateGeneration(ds)
    if err != nil {
        generation = nil
    }
    template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
    // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
    // and double with each successful iteration in a kind of "slow start".
    // This handles attempts to start large numbers of pods that would
    // likely all fail with the same error. For example a project with a
    // low quota that attempts to create a large number of pods will be
    // prevented from spamming the API service with the pod create requests
    // after one of its pods fails.  Conveniently, this also prevents the
    // event spam that those failures would generate.
    batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
    for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
        errorCount := len(errCh)
        createWait.Add(batchSize)
        for i := pos; i < pos+batchSize; i++ {
            go func(ix int) {
                defer createWait.Done()

                podTemplate := template.DeepCopy()
                // The pod's NodeAffinity will be updated to make sure the Pod is bound
                // to the target node by default scheduler. It is safe to do so because there
                // should be no conflicting node affinity with the target node.
                podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
                    podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

                err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
                    ds, metav1.NewControllerRef(ds, controllerKind))

                if err != nil {
                    if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                        // If the namespace is being torn down, we can safely ignore
                        // this error since all subsequent creations will fail.
                        return
                    }
                }
                if err != nil {
                    klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
                    dsc.expectations.CreationObserved(dsKey)
                    errCh <- err
                    utilruntime.HandleError(err)
                }
            }(i)
        }
        createWait.Wait()
        // any skipped pods that we never attempted to start shouldn't be expected.
        skippedPods := createDiff - (batchSize + pos)
        if errorCount < len(errCh) && skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
            dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
            // The skipped pods will be retried later. The next controller resync will
            // retry the slow start process.
            break
        }
    }

    klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
    deleteWait := sync.WaitGroup{}
    deleteWait.Add(deleteDiff)
    for i := 0; i < deleteDiff; i++ {
        go func(ix int) {
            defer deleteWait.Done()
            if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
                klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
                dsc.expectations.DeletionObserved(dsKey)
                errCh <- err
                utilruntime.HandleError(err)
            }
        }(i)
    }
    deleteWait.Wait()

    // collect errors if any for proper reporting/retry logic in the controller
    errors := []error{}
    close(errCh)
    for err := range errCh {
        errors = append(errors, err)
    }
    return utilerrors.NewAggregate(errors)
}

2.2 dsc.rollingUpdate

dsc.rollingUpdate方法主要用于处理daemonset对象的滚动更新处理,根据配置的滚动更新配置,删除旧的pod(pod的创建操作在dsc.manage方法中进行)。

主要逻辑:

(1)调用dsc.getNodesToDaemonPods,获取daemonset所属pod与node的对应关联关系map;

(2)调用dsc.getAllDaemonSetPods,获取所有的旧版本daemonset的pod;

(3)调用dsc.getUnavailableNumbers,根据daemonset的滚动更新策略配置获取maxUnavailable值,再获取numUnavailable值,numUnavailable代表在符合条件的node节点中,没有daemonset对应的pod或者pod处于Unavailable状态的node数量;

(4)调用util.SplitByAvailablePods,将旧版本daemonset的所有pod分成oldAvailablePods列表,以及oldUnavailablePods列表;

(5)定义一个字符串数组oldPodsToDelete,用于储存准备要删除的pod;

(6)将全部oldUnavailablePods加入到oldPodsToDelete数组中;

(7)遍历oldAvailablePods列表,当numUnavailable小于maxUnavailable值时,将pod加入到oldPodsToDelete数组中,且numUnavailable值加一;

(8)调用dsc.syncNodes,将oldPodsToDelete数组中的pod删除。

// pkg/controller/daemon/update.go
func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
    maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
    if err != nil {
        return fmt.Errorf("couldn't get unavailable numbers: %v", err)
    }
    oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)

    // for oldPods delete all not running pods
    var oldPodsToDelete []string
    klog.V(4).Infof("Marking all unavailable old pods for deletion")
    for _, pod := range oldUnavailablePods {
        // Skip terminating pods. We won't delete them again
        if pod.DeletionTimestamp != nil {
            continue
        }
        klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
        oldPodsToDelete = append(oldPodsToDelete, pod.Name)
    }

    klog.V(4).Infof("Marking old pods for deletion")
    for _, pod := range oldAvailablePods {
        if numUnavailable >= maxUnavailable {
            klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
            break
        }
        klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
        oldPodsToDelete = append(oldPodsToDelete, pod.Name)
        numUnavailable++
    }
    return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}

2.3 dsc.updateDaemonSetStatus

dsc.updateDaemonSetStatus方法负责根据现存daemonset pod的部署情况以及pod的状态、node是否满足pod运行条件等信息,来更新daemonset的status状态值,这里不对代码展开分析,只分析一下daemonset的status中各个字段的意思。

(1)currentNumberScheduled: 已经调度了daemonset pod的节点数量;

(2)desiredNumberScheduled: 期望调度daemonset pod的节点数量;

(3)numberMisscheduled:不需要调度daemonset pod但已经调度完成了的节点数量;

(4)numberAvailable: pod状态达到Available的数量(pod达到Ready状态MinReadySeconds时间后,就认为达到了Available状态);

(5)numberReady: pod状态达到Ready的数量;

(6)numberUnavailable: desiredNumberScheduled - numberAvailable;

(7)updatedNumberScheduled: 已经调度了最新版本daemonset pod的节点数量。

daemonset controller创建 pod 的流程与 replicaset controller 创建 pod 的流程是相似的,都使用了 expectations 机制并且限制了在一次调谐过程中最多创建或删除的 pod 数量。daemonset的更新方式与 statefulset 一样包含 OnDelete 和 RollingUpdate(滚动更新) 两种,OnDelete 方式需要手动删除对应的 pod,然后daemonset controller才会创建出新的pod,而 RollingUpdate 方式与 statefulset 和 deployment 有所区别, RollingUpdate方式更新时是按照先删除pod再创建pod的顺序进行,不像deployment那样可以先创建出新的pod再删除旧的pod。

daemonset controller架构

daemonset controller的大致组成和处理流程如下图,daemonset controller对daemonset、pod、node、ControllerRevision对象注册了event handler,当有事件时,会watch到然后将对应的daemonset对象放入到queue中,然后syncDaemonset方法为daemonset controller调谐daemonset对象的核心处理逻辑所在,从queue中取出daemonset对象,做调谐处理。

daemonset controller核心处理逻辑

daemonset controller的核心处理逻辑是调谐daomonset对象,使得daemonset在合适node上完成pod的创建、在不合适node上完成pod的删除,触发滚动更新时按照配置的滚动更新策略配置来删除旧的pod、创建新的pod,并根据历史版本限制配置清理daemonset的历史版本,最后更新daemonset对象的status状态。

daemonset controller创建pod算法

daemonset controller创建pod的算法与replicaset controller创建pod的算法几乎相同,按1、2、4、8…的递增趋势分多批次进行(每次调谐中创建pod的数量上限为250个,超过上限的会在下次调谐中再创建),若某批次创建pod有失败的(如apiserver限流,丢弃请求等,注意:超时除外,因为initialization处理有可能超时),则后续批次的pod创建不再进行,需等待该daemonset对象下次调谐时再触发该pod创建算法,进行pod的创建,直至所有满足条件的node上都有该daemonset的pod。

daemonset controller删除pod算法

daemonset controller删除pod的算法是,拉起与要删除的pod数量相同的goroutine来删除pod(每次调谐中删除pod的数量上限为250),并等待所有goroutine执行完成。删除pod有失败的(如apiserver限流,丢弃请求)或超过250上限的部分,需等待该daemonset对象下次调谐时再触发该pod删除算法,进行pod的删除,直至所有期望被删除的pod都被删除。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章