Kubernetes client-go Informer 源码分析
阅读原文时间:2023年07月09日阅读:1

概述ControllerController 的初始化Controller 的启动processLoopHandleDeltas()SharedIndexInformersharedIndexerInformersharedProcessorprocessorListenersharedProcessor.addListener()sharedProcessor.distribute()sharedProcessor.run()sharedIndexInformer.Run()SharedInformerFactoryNewSharedInformerFactorysharedInformerFactory.Start()小结

源码版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)

Informer 这个词的出镜率很高,我们在很多文章里都可以看到 Informer 的身影,但是我们在源码里真的去找一个叫做 Informer 的对象,却又发现找不到一个单纯的 Informer,但是有很多结构体或者接口里包含了 Informer 这个词……

ReflectorWorkqueue 等组件不同,Informer 相对来说更加模糊,让人初读源码时感觉迷惑。今天我们一起来揭开 Informer 等面纱,看下到底什么是 Informer

《Kubernetes client-go 源码分析 - 开篇》中我们提到过 InformerDeltaFIFO 中 pop 相应对象,然后通过 Indexer 将对象和索引丢到本地 cache 中,再触发相应的事件处理函数(Resource Event Handlers)运行。Informer 在整个自定义控制器工作流程中的位置如下图所示,今天我们具体分析下 Informer 的源码实现。

Informer 通过一个 controller 对象来定义,本身很简单,长这样:

  • client-go/tools/cache/controller.go:89

    1type controller struct {2   config         Config3   reflector      *Reflector4   reflectorMutex sync.RWMutex5   clock          clock.Clock6}

这里有我们熟悉的 Reflector,可以猜到 Informer 启动的时候会去运行 Reflector,从而通过 Reflector 实现 list-watch apiserver,更新“事件”到 DeltaFIFO 中用于进一步处理。Config 对象等会再看,我们继续看下 controller 对应的接口:

  • client-go/tools/cache/controller.go:98

    1type Controller interface {2   Run(stopCh <-chan struct{})3   HasSynced() bool4   LastSyncResourceVersion() string5}

这里的核心明显是 Run(stopCh <-chan struct{}) 方法,Run 负责两件事情:

  1. 构造 Reflector 利用 ListerWatcher 的能力将对象事件更新到 DeltaFIFO;
  2. 从 DeltaFIFO 中 Pop 对象然后调用 ProcessFunc 来处理;

Controller 的初始化

Controller 的 New 方法很简单:

  • client-go/tools/cache/controller.go:116

    1func New(c Config) Controller {2   ctlr := &controller{3      config: c,4      clock:  &clock.RealClock{},5   }6   return ctlr7}

这里没有太多的逻辑,主要是传递了一个 Config 进来,可以猜到核心逻辑是 Config 从何而来以及后面如何使用。我们先向上跟一下 Config 从哪里来,New() 的调用有几个地方,我们不去看 newInformer() 分支的代码,因为实际开发中主要是使用 SharedIndexInformer,两个入口初始化 Controller 的逻辑类似,我们直接跟更实用的一个分支,看 func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) 方法中如何调用的 New()

  • client-go/tools/cache/shared_informer.go:368

    1func (s sharedIndexInformer) Run(stopCh <-chan struct{}) { 2    // …… 3    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 4        KnownObjects:          s.indexer, 5        EmitDeltaTypeReplaced: true, 6    }) 7 8    cfg := &Config{ 9        Queue:            fifo,10        ListerWatcher:    s.listerWatcher,11        ObjectType:       s.objectType,12        FullResyncPeriod: s.resyncCheckPeriod,13        RetryOnError:     false,14        ShouldResync:     s.processor.shouldResync,1516        Process:           s.HandleDeltas,17        WatchErrorHandler: s.watchErrorHandler,18    }1920    func() {21        s.startedLock.Lock()22        defer s.startedLock.Unlock()2324        s.controller = New(cfg)25        s.controller.(controller).clock = s.clock26        s.started = true27    }()28  // ……29    s.controller.Run(stopCh)30}

上面只保留了主要代码,我们后面会分析 SharedIndexInformer,所以这里先不纠结 SharedIndexInformer 的细节,我们从这里可以看到 SharedIndexInformer 的 Run() 过程里会构造一个 Config,然后创建 Controller,最后调用 Controller 的 Run() 方法。另外这里也可以看到我们前面系列文章里分析过的 DeltaFIFO、ListerWatcher 等,这里还有一个比较重要的是 Process:s.HandleDeltas, 这一行,Process 属性的类型是 ProcessFunc,这里可以看到具体的 ProcessFunc 是 HandleDeltas 方法。

Controller 的启动

上面提到 Controller 的初始化本身没有太多的逻辑,主要是构造了一个 Config 对象传递进来,所以 Controller 启动的时候肯定会有这个 Config 的使用逻辑,我们具体来看:

  • client-go/tools/cache/controller.go:127

    1func (c *controller) Run(stopCh <-chan struct{}) { 2   defer utilruntime.HandleCrash() 3   go func() { 4      <-stopCh 5      c.config.Queue.Close() 6   }() 7   // 利用 Config 里的配置构造 Reflector 8   r := NewReflector( 9      c.config.ListerWatcher,10      c.config.ObjectType,11      c.config.Queue,12      c.config.FullResyncPeriod,13   )14   r.ShouldResync = c.config.ShouldResync15   r.WatchListPageSize = c.config.WatchListPageSize16   r.clock = c.clock17   if c.config.WatchErrorHandler != nil {18      r.watchErrorHandler = c.config.WatchErrorHandler19   }2021   c.reflectorMutex.Lock()22   c.reflector = r23   c.reflectorMutex.Unlock()2425   var wg wait.Group26   // 启动 Reflector27   wg.StartWithChannel(stopCh, r.Run)28   // 执行 Controller 的 processLoop29   wait.Until(c.processLoop, time.Second, stopCh)30   wg.Wait()31}

这里的逻辑很简单,构造 Reflector 后运行起来,然后执行 c.processLoop,所以很明显,Controller 的业务逻辑肯定隐藏在 processLoop 方法里,我们继续来看。

processLoop

  • client-go/tools/cache/controller.go:181

    1func (c *controller) processLoop() { 2   for { 3      obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) 4      if err != nil { 5         if err == ErrFIFOClosed { 6            return 7         } 8         if c.config.RetryOnError { 9            c.config.Queue.AddIfNotPresent(obj)10         }11      }12   }13}

这里的逻辑是从 DeltaFIFO 中 Pop 出一个对象丢给 PopProcessFunc 处理,如果失败了就 re-enqueue 到 DeltaFIFO 中。我们前面提到过这里的 PopProcessFunc 实现是 HandleDeltas() 方法,所以这里的主要逻辑就转到了 HandleDeltas() 是如何实现的了。

HandleDeltas()

这里我们先回顾下 DeltaFIFO 的存储结构,看下这个图:

然后再看源码,这里的逻辑主要是遍历一个 Deltas 里的所有 Delta,然后根据 Delta 的类型来决定如何操作 Indexer,也就是更新本地 cache,同时分发相应的通知。

  • client-go/tools/cache/shared_informer.go:537

    1func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { 2   s.blockDeltas.Lock() 3   defer s.blockDeltas.Unlock() 4   // 对于每个 Deltas 来说,里面存了很多的 Delta,也就是对应不同 Type 的多个 Object,这里的遍历会从旧往新走 5   for _, d := range obj.(Deltas) { 6      switch d.Type { 7      // 除了 Deleted 外所有情况 8      case Sync, Replaced, Added, Updated: 9         // 记录变更,没有太多实际作用10         s.cacheMutationDetector.AddObject(d.Object)11         // 通过 indexer 从 cache 里查询当前 Object,如果存在12         if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {13            // 更新 indexer 里的对象14            if err := s.indexer.Update(d.Object); err != nil {15               return err16            }1718            isSync := false19            switch {20            case d.Type == Sync:21               isSync = true22            case d.Type == Replaced:23               if accessor, err := meta.Accessor(d.Object); err == nil {24                  if oldAccessor, err := meta.Accessor(old); err == nil {25                     isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()26                  }27               }28            }29            // 分发一个更新通知30            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)31           // 如果本地 cache 里没有这个 Object,则添加32         } else {33            if err := s.indexer.Add(d.Object); err != nil {34               return err35            }36            // 分发一个新增通知37            s.processor.distribute(addNotification{newObj: d.Object}, false)38         }39        // 如果是删除操作,则从 indexer 里删除这个 Object,然后分发一个删除通知40      case Deleted:41         if err := s.indexer.Delete(d.Object); err != nil {42            return err43         }44         s.processor.distribute(deleteNotification{oldObj: d.Object}, false)45      }46   }47   return nil48}

这里涉及到一个知识点:s.processor.distribute(addNotification{newObj: d.Object}, false) 中 processor 是什么?如何分发通知的?谁来接收通知?

我们回到 ProcessFunc 的实现上,除了 sharedIndexInformerHandleDeltas() 方法外,还有一个基础版本:

  • client-go/tools/cache/controller.go:446

    1        Process: func(obj interface{}) error { 2            for _, d := range obj.(Deltas) { 3                obj := d.Object 4                if transformer != nil { 5                    var err error 6                    obj, err = transformer(obj) 7                    if err != nil { 8                        return err 9                    }10                }1112                switch d.Type {13                case Sync, Replaced, Added, Updated:14                    if old, exists, err := clientState.Get(obj); err == nil && exists {15                        if err := clientState.Update(obj); err != nil {16                            return err17                        }18                        h.OnUpdate(old, obj)19                    } else {20                        if err := clientState.Add(obj); err != nil {21                            return err22                        }23                        h.OnAdd(obj)24                    }25                case Deleted:26                    if err := clientState.Delete(obj); err != nil {27                        return err28                    }29                    h.OnDelete(obj)30                }31            }32            return nil33        },

这里可以看到逻辑简单很多,除了更新 cache 外,调用了 h.OnAdd(obj)/h.OnUpdate(old, obj)/h.OnDelete(obj) 等方法,这里的 h 是 ResourceEventHandler 类型的,也就是 Process 过程直接调用了 ResourceEventHandler 的相应方法,这样就已经逻辑闭环了,ResourceEventHandler 的这几个方法里做一些简单的过滤后,会将这些对象的 key 丢到 workqueue,进而就触发了自定义调谐函数的运行。

换言之,sharedIndexInformer 中实现的 ProcessFunc 是一个进阶版本,不满足于简单调用 ResourceEventHandler 对应方法来完成 Process 逻辑,所以到这里基础的 Informer 逻辑已经闭环了,我们后面继续来看 sharedIndexInformer 中又对 Informer 做了哪些“增强”

我们在 Operator 开发中,如果不使用 controller-runtime 库,也就是不通过 Kubebuilder 等工具来生成脚手架时,经常会用到 SharedInformerFactory,比如典型的 sample-controller 中的 main() 函数:

  • sample-controller/main.go:40

    1func main() { 2    klog.InitFlags(nil) 3    flag.Parse() 4 5    stopCh := signals.SetupSignalHandler() 6 7    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) 8    if err != nil { 9        klog.Fatalf("Error building kubeconfig: %s", err.Error())10    }1112    kubeClient, err := kubernetes.NewForConfig(cfg)13    if err != nil {14        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())15    }1617    exampleClient, err := clientset.NewForConfig(cfg)18    if err != nil {19        klog.Fatalf("Error building example clientset: %s", err.Error())20    }2122    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second30)23    exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second30)2425    controller := NewController(kubeClient, exampleClient,26        kubeInformerFactory.Apps().V1().Deployments(),27        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())2829    kubeInformerFactory.Start(stopCh)30    exampleInformerFactory.Start(stopCh)3132    if err = controller.Run(2, stopCh); err != nil {33        klog.Fatalf("Error running controller: %s", err.Error())34    }35}

这里可以看到我们依赖于 kubeInformerFactory.Apps().V1().Deployments() 提供一个 Informer,这里的 Deployments() 方法返回的是一个 DeploymentInformer 类型,DeploymentInformer 是什么呢?如下

  • client-go/informers/apps/v1/deployment.go:37

    1type DeploymentInformer interface {2    Informer() cache.SharedIndexInformer3    Lister() v1.DeploymentLister4}

可以看到所谓的 DeploymentInformer 由 “Informer” 和 “Lister” 组成,也就是说我们编码时用到的 Informer 本质就是一个 SharedIndexInformer

  • client-go/tools/cache/shared_informer.go:186

    1type SharedIndexInformer interface {2   SharedInformer3   AddIndexers(indexers Indexers) error4   GetIndexer() Indexer5}

这里的 Indexer 就很熟悉了,SharedInformer 又是啥呢?

  • client-go/tools/cache/shared_informer.go:133

    1type SharedInformer interface { 2   // 可以添加自定义的 ResourceEventHandler 3   AddEventHandler(handler ResourceEventHandler) 4   // 附带 resync 间隔配置,设置为 0 表示不关心 resync 5   AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) 6   // 这里的 Store 指的是 Indexer 7   GetStore() Store 8   // 过时了,没有用 9   GetController() Controller10   // 通过 Run 来启动11   Run(stopCh <-chan struct{})12   // 这里和 resync 逻辑没有关系,表示 Indexer 至少更新过一次全量的对象13   HasSynced() bool14   // 最后一次拿到的 RV15   LastSyncResourceVersion() string16   // 用于每次 ListAndWatch 连接断开时回调,主要就是日志记录的作用17   SetWatchErrorHandler(handler WatchErrorHandler) error18}

sharedIndexerInformer

接下来就该看下 SharedIndexInformer 接口的实现了,sharedIndexerInformer 定义如下:

  • client-go/tools/cache/shared_informer.go:287

    1type sharedIndexInformer struct { 2   indexer    Indexer 3   controller Controller 4   processor             *sharedProcessor 5   cacheMutationDetector MutationDetector 6   listerWatcher ListerWatcher 7   // 表示当前 Informer 期望关注的类型,主要是 GVK 信息 8   objectType runtime.Object 9   // reflector 的 resync 计时器计时间隔,通知所有的 listener 执行 resync10   resyncCheckPeriod time.Duration11   defaultEventHandlerResyncPeriod time.Duration12   clock clock.Clock13   started, stopped bool14   startedLock      sync.Mutex15   blockDeltas sync.Mutex16   watchErrorHandler WatchErrorHandler17}

这里的 Indexer、Controller、ListerWatcher 等都是我们熟悉的组件,sharedProcessor 我们在前面遇到了,需要重点关注一下。

sharedProcessor

sharedProcessor 中维护了 processorListener 集合,然后分发通知对象到这些 listeners,先看下结构定义:

  • client-go/tools/cache/shared_informer.go:588

    1type sharedProcessor struct {2   listenersStarted bool3   listenersLock    sync.RWMutex4   listeners        []processorListener5   syncingListeners []processorListener6   clock            clock.Clock7   wg               wait.Group8}

马上就会有一个疑问了,processorListener 是什么?

processorListener

  • client-go/tools/cache/shared_informer.go:690

    1type processorListener struct { 2   nextCh chan interface{} 3   addCh  chan interface{} 4   // 核心属性 5   handler ResourceEventHandler 6   pendingNotifications buffer.RingGrowing 7   requestedResyncPeriod time.Duration 8   resyncPeriod time.Duration 9   nextResync time.Time10   resyncLock sync.Mutex11}

可以看到 processorListener 里有一个 ResourceEventHandler,这是我们认识的组件。processorListener 有三个主要方法:

  • add(notification interface{})
  • pop()
  • run()

一个个来看吧。

run()

  • client-go/tools/cache/shared_informer.go:775

    1func (p processorListener) run() { 2   stopCh := make(chan struct{}) 3   wait.Until(func() { 4      for next := range p.nextCh { 5         switch notification := next.(type) { 6         case updateNotification: 7            p.handler.OnUpdate(notification.oldObj, notification.newObj) 8         case addNotification: 9            p.handler.OnAdd(notification.newObj)10         case deleteNotification:11            p.handler.OnDelete(notification.oldObj)12         default:13            utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))14         }15      }16      close(stopCh)17   }, 1time.Second, stopCh)18}

这里的逻辑很清晰,从 nextCh 里拿通知,然后根据其类型去调用 ResourceEventHandler 相应的 OnAdd/OnUpdate/OnDelete 方法。

add() 和 pop()

  • client-go/tools/cache/shared_informer.go:741

    1func (p processorListener) add(notification interface{}) { 2   // 将通知放到 addCh 中,所以下面 pop() 方法里先执行到的 case 是第二个 3   p.addCh <- notification 4} 5 6func (p processorListener) pop() { 7   defer utilruntime.HandleCrash() 8   defer close(p.nextCh) // Tell .run() to stop 910   var nextCh chan<- interface{}11   var notification interface{}12   for {13      select {14        // 下面获取到的通知,添加到 nextCh 里,供 run() 方法中消费15      case nextCh <- notification:16         var ok bool17         // 从 pendingNotifications 里消费通知,生产者在下面 case 里18         notification, ok = p.pendingNotifications.ReadOne()19         if !ok {20            nextCh = nil21         }22        // 逻辑从这里开始,从 addCh 里提取通知23      case notificationToAdd, ok := <-p.addCh:24         if !ok {25            return26         }27         if notification == nil { 28            notification = notificationToAdd29            nextCh = p.nextCh30         } else {31            // 新添加的通知丢到 pendingNotifications32            p.pendingNotifications.WriteOne(notificationToAdd)33         }34      }35   }36}

也就是说 processorListener 提供了一定的缓冲机制来接收 notification,然后去消费这些 notification 调用 ResourceEventHandler 相关方法。

然后接着继续看 sharedProcessor 的几个主要方法。

sharedProcessor.addListener()

addListener 会直接调用 listenerrun()pop() 方法,这两个方法的逻辑我们上面已经分析过

  • client-go/tools/cache/shared_informer.go:597

    1func (p sharedProcessor) addListener(listener processorListener) { 2   p.listenersLock.Lock() 3   defer p.listenersLock.Unlock() 4 5   p.addListenerLocked(listener) 6   if p.listenersStarted { 7      p.wg.Start(listener.run) 8      p.wg.Start(listener.pop) 9   }10}

sharedProcessor.distribute()

distribute 的逻辑就是调用 sharedProcessor 内部维护的所有 listner 的 add() 方法

  • client-go/tools/cache/shared_informer.go:613

    1func (p *sharedProcessor) distribute(obj interface{}, sync bool) { 2   p.listenersLock.RLock() 3   defer p.listenersLock.RUnlock() 4 5   if sync { 6      for , listener := range p.syncingListeners { 7         listener.add(obj) 8      } 9   } else {10      for , listener := range p.listeners {11         listener.add(obj)12      }13   }14}

sharedProcessor.run()

run() 的逻辑和前面的 addListener() 类似,也就是调用 listenerrun()pop() 方法

  • client-go/tools/cache/shared_informer.go:628

    1func (p *sharedProcessor) run(stopCh <-chan struct{}) { 2    func() { 3        p.listenersLock.RLock() 4        defer p.listenersLock.RUnlock() 5        for , listener := range p.listeners { 6            p.wg.Start(listener.run) 7            p.wg.Start(listener.pop) 8        } 9        p.listenersStarted = true10    }()11    <-stopCh12    p.listenersLock.RLock()13    defer p.listenersLock.RUnlock()14    for , listener := range p.listeners {15        close(listener.addCh)16    }17    p.wg.Wait()18}

到这里基本就知道 sharedProcessor 的能力了,继续往下看。

sharedIndexInformer.Run()

继续来看 sharedIndexInformerRun() 方法,这里面已经几乎没有陌生的内容了。

  • client-go/tools/cache/shared_informer.go:368

    1func (s sharedIndexInformer) Run(stopCh <-chan struct{}) { 2   defer utilruntime.HandleCrash() 3 4   if s.HasStarted() { 5      klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed") 6      return 7   } 8   // DeltaFIFO 就很熟悉了 9   fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{10      KnownObjects:          s.indexer,11      EmitDeltaTypeReplaced: true,12   })13   // Config 的逻辑也在上面遇到过了14   cfg := &Config{15      Queue:            fifo,16      ListerWatcher:    s.listerWatcher,17      ObjectType:       s.objectType,18      FullResyncPeriod: s.resyncCheckPeriod,19      RetryOnError:     false,20      ShouldResync:     s.processor.shouldResync,2122      Process:           s.HandleDeltas,23      WatchErrorHandler: s.watchErrorHandler,24   }2526   func() {27      s.startedLock.Lock()28      defer s.startedLock.Unlock()29      // 前文分析过这里的 New() 函数逻辑了30      s.controller = New(cfg)31      s.controller.(controller).clock = s.clock32      s.started = true33   }()3435   processorStopCh := make(chan struct{})36   var wg wait.Group37   defer wg.Wait()              38   defer close(processorStopCh) 39   wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)40   // processor 的 run 方法41   wg.StartWithChannel(processorStopCh, s.processor.run)4243   defer func() {44      s.startedLock.Lock()45      defer s.startedLock.Unlock()46      s.stopped = true // Don't want any new listeners47   }()48   // controller 的 Run()49   s.controller.Run(stopCh)50}

到这里也就基本知道了 sharedIndexInformer 的逻辑了,再往上层走就剩下一个 SharedInformerFactory 了,继续看吧~

我们前面提到过 SharedInformerFactory,现在具体来看一下 SharedInformerFactory 是怎么实现的。先看接口定义:

  • client-go/informers/factory.go:187

    1type SharedInformerFactory interface { 2   internalinterfaces.SharedInformerFactory 3   ForResource(resource schema.GroupVersionResource) (GenericInformer, error) 4   WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool 5 6   Admissionregistration() admissionregistration.Interface 7   Internal() apiserverinternal.Interface 8   Apps() apps.Interface 9   Autoscaling() autoscaling.Interface10   Batch() batch.Interface11   Certificates() certificates.Interface12   Coordination() coordination.Interface13   Core() core.Interface14   Discovery() discovery.Interface15   Events() events.Interface16   Extensions() extensions.Interface17   Flowcontrol() flowcontrol.Interface18   Networking() networking.Interface19   Node() node.Interface20   Policy() policy.Interface21   Rbac() rbac.Interface22   Scheduling() scheduling.Interface23   Storage() storage.Interface24}

这里涉及到几个点:

  1. internalinterfaces.SharedInformerFactory

这也是一个接口,比较简短:

1type&nbsp;SharedInformerFactory&nbsp;interface&nbsp;{2&nbsp;&nbsp;&nbsp;Start(stopCh&nbsp;<-chan&nbsp;struct{})3&nbsp;&nbsp;&nbsp;InformerFor(obj&nbsp;runtime.Object,&nbsp;newFunc&nbsp;NewInformerFunc)&nbsp;cache.SharedIndexInformer4}

可以看到熟悉的 SharedIndexInformer

  1. ForResource(resource schema.GroupVersionResource) (GenericInformer, error)

这里接收一个 GVR,返回了一个 GenericInformer,看下什么是 GenericInformer:

1type&nbsp;GenericInformer&nbsp;interface&nbsp;{2&nbsp;&nbsp;&nbsp;Informer()&nbsp;cache.SharedIndexInformer3&nbsp;&nbsp;&nbsp;Lister()&nbsp;cache.GenericLister4}

也很简短。

  1. Apps() apps.Interface 等

后面一堆方法是类似的,我们以 Apps() 为例来看下怎么回事。这里的 Interface 定义如下:

  • client-go/informers/apps/interface.go:29

    1type Interface interface {2   // V1 provides access to shared informers for resources in V1.3   V1() v1.Interface4   // V1beta1 provides access to shared informers for resources in V1beta1.5   V1beta1() v1beta1.Interface6   // V1beta2 provides access to shared informers for resources in V1beta2.7   V1beta2() v1beta2.Interface8}

显然应该继续看下 v1.Interface 是个啥。

  • client-go/informers/apps/v1/interface.go:26

    1type Interface interface { 2   // ControllerRevisions returns a ControllerRevisionInformer. 3   ControllerRevisions() ControllerRevisionInformer 4   // DaemonSets returns a DaemonSetInformer. 5   DaemonSets() DaemonSetInformer 6   // Deployments returns a DeploymentInformer. 7   Deployments() DeploymentInformer 8   // ReplicaSets returns a ReplicaSetInformer. 9   ReplicaSets() ReplicaSetInformer10   // StatefulSets returns a StatefulSetInformer.11   StatefulSets() StatefulSetInformer12}

到这里已经有看着很眼熟的 Deployments() DeploymentInformer 之类的代码了,DeploymentInformer 我们刚才看过内部结构,长这样:

1type&nbsp;DeploymentInformer&nbsp;interface&nbsp;{2&nbsp;&nbsp;&nbsp;Informer()&nbsp;cache.SharedIndexInformer3&nbsp;&nbsp;&nbsp;Lister()&nbsp;v1.DeploymentLister4}

到这里也就不难理解 SharedInformerFactory 的作用了,它提供了所有 API group-version 的资源对应的 SharedIndexInformer,也就不难理解开头我们引用的 sample-controller 中的这行代码:

1kubeInformerFactory.Apps().V1().Deployments()

通过其可以拿到一个 Deployment 资源对应的 SharedIndexInformer。

NewSharedInformerFactory

继续看下 SharedInformerFactory 是如何创建的

  • client-go/informers/factory.go:96

    1func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {2   return NewSharedInformerFactoryWithOptions(client, defaultResync)3}

可以看到参数非常简单,主要是需要一个 Clientset,毕竟 ListerWatcher 的能力本质还是 client 提供的。

  • client-go/informers/factory.go:109

    1func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options …SharedInformerOption) SharedInformerFactory { 2   factory := &sharedInformerFactory{ 3      client:           client, 4      namespace:        v1.NamespaceAll, // 空字符串 "" 5      defaultResync:    defaultResync, 6      informers:        make(map[reflect.Type]cache.SharedIndexInformer), // 可以存放不同类型的 SharedIndexInformer 7      startedInformers: make(map[reflect.Type]bool), 8      customResync:     make(map[reflect.Type]time.Duration), 9   }1011   for _, opt := range options {12      factory = opt(factory)13   }1415   return factory16}

接着是如何启动

sharedInformerFactory.Start()

  • client-go/informers/factory.go:128

    1func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { 2   f.lock.Lock() 3   defer f.lock.Unlock() 4 5   for informerType, informer := range f.informers { 6      // 同类型只会调用一次,Run() 的逻辑我们前面介绍过了 7      if !f.startedInformers[informerType] { 8         go informer.Run(stopCh) 9         f.startedInformers[informerType] = true10      }11   }12}

今天我们一个基础 Informer - Controller 开始介绍,先分析了 Controller 的能力,也就是其通过构造 Reflector 并启动从而能够获取指定类型资源的“更新”事件,然后通过事件构造 Delta 放到 DeltaFIFO 中,进而在 processLoop 中从 DeltaFIFO 里 pop Deltas 来处理,一方面将对象通过 Indexer 同步到本地 cache,也就是一个 ThreadSafeStore,一方面调用 ProcessFunc 来处理这些 Delta。

然后 SharedIndexInformer 提供了构造 Controller 的能力,通过 HandleDeltas() 方法实现上面提到的 ProcessFunc,同时还引入了 sharedProcessor 在 HandleDeltas() 中用于事件通知的处理。sharedProcessor 分发事件通知的时候,接收方是内部继续抽象出来的 processorListener,在 processorListener 中完成了 ResourceEventHandler 具体回调函数的调用。

最后 SharedInformerFactory 又进一步封装了提供所有 api 资源对应的 SharedIndexInformer 的能力。也就是说一个 SharedIndexInformer 可以处理一种类型的资源,比如 Deployment 或者 Pod 等,而通过 SharedInformerFactory 可以轻松构造任意已知类型的 SharedIndexInformer。另外这里用到了 Clientset 提供的访问所有 api 资源的能力,通过其也就能够完整实现整套 Informer 逻辑了。

此前我们已经陆续分析了:

各种“组件”分工明确,最终汇聚在 “Informer” 里,实现了一套复杂而优雅的资源处理能力,至此自定义控制器涉及逻辑中 client-go 部分就基本分析完了!

【完整目录参见>>> 《深入理解 K8S 原理与实现》系列目录 <<<

(转载请保留本文原始链接 https://www.danielhu.cn)