概述ControllerController 的初始化Controller 的启动processLoopHandleDeltas()SharedIndexInformersharedIndexerInformersharedProcessorprocessorListenersharedProcessor.addListener()sharedProcessor.distribute()sharedProcessor.run()sharedIndexInformer.Run()SharedInformerFactoryNewSharedInformerFactorysharedInformerFactory.Start()小结
源码版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)
Informer 这个词的出镜率很高,我们在很多文章里都可以看到 Informer 的身影,但是我们在源码里真的去找一个叫做 Informer 的对象,却又发现找不到一个单纯的 Informer,但是有很多结构体或者接口里包含了 Informer 这个词……
和 Reflector、Workqueue 等组件不同,Informer 相对来说更加模糊,让人初读源码时感觉迷惑。今天我们一起来揭开 Informer 等面纱,看下到底什么是 Informer。
在《Kubernetes client-go 源码分析 - 开篇》中我们提到过 Informer 从 DeltaFIFO 中 pop 相应对象,然后通过 Indexer 将对象和索引丢到本地 cache 中,再触发相应的事件处理函数(Resource Event Handlers)运行。Informer 在整个自定义控制器工作流程中的位置如下图所示,今天我们具体分析下 Informer 的源码实现。
Informer 通过一个 controller 对象来定义,本身很简单,长这样:
client-go/tools/cache/controller.go:89
1type controller struct {2 config Config3 reflector *Reflector4 reflectorMutex sync.RWMutex5 clock clock.Clock6}
这里有我们熟悉的 Reflector,可以猜到 Informer 启动的时候会去运行 Reflector,从而通过 Reflector 实现 list-watch apiserver,更新“事件”到 DeltaFIFO 中用于进一步处理。Config 对象等会再看,我们继续看下 controller 对应的接口:
client-go/tools/cache/controller.go:98
1type Controller interface {2 Run(stopCh <-chan struct{})3 HasSynced() bool4 LastSyncResourceVersion() string5}
这里的核心明显是 Run(stopCh <-chan struct{})
方法,Run 负责两件事情:
Controller 的 New 方法很简单:
client-go/tools/cache/controller.go:116
1func New(c Config) Controller {2 ctlr := &controller{3 config: c,4 clock: &clock.RealClock{},5 }6 return ctlr7}
这里没有太多的逻辑,主要是传递了一个 Config 进来,可以猜到核心逻辑是 Config 从何而来以及后面如何使用。我们先向上跟一下 Config 从哪里来,New() 的调用有几个地方,我们不去看 newInformer()
分支的代码,因为实际开发中主要是使用 SharedIndexInformer,两个入口初始化 Controller 的逻辑类似,我们直接跟更实用的一个分支,看 func (s *sharedIndexInformer) Run(stopCh <-chan struct{})
方法中如何调用的 New()
:
client-go/tools/cache/shared_informer.go:368
1func (s sharedIndexInformer) Run(stopCh <-chan struct{}) { 2 // …… 3 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 4 KnownObjects: s.indexer, 5 EmitDeltaTypeReplaced: true, 6 }) 7 8 cfg := &Config{ 9 Queue: fifo,10 ListerWatcher: s.listerWatcher,11 ObjectType: s.objectType,12 FullResyncPeriod: s.resyncCheckPeriod,13 RetryOnError: false,14 ShouldResync: s.processor.shouldResync,1516 Process: s.HandleDeltas,17 WatchErrorHandler: s.watchErrorHandler,18 }1920 func() {21 s.startedLock.Lock()22 defer s.startedLock.Unlock()2324 s.controller = New(cfg)25 s.controller.(controller).clock = s.clock26 s.started = true27 }()28 // ……29 s.controller.Run(stopCh)30}
上面只保留了主要代码,我们后面会分析 SharedIndexInformer,所以这里先不纠结 SharedIndexInformer 的细节,我们从这里可以看到 SharedIndexInformer 的 Run() 过程里会构造一个 Config,然后创建 Controller,最后调用 Controller 的 Run() 方法。另外这里也可以看到我们前面系列文章里分析过的 DeltaFIFO、ListerWatcher 等,这里还有一个比较重要的是 Process:s.HandleDeltas,
这一行,Process 属性的类型是 ProcessFunc,这里可以看到具体的 ProcessFunc 是 HandleDeltas 方法。
上面提到 Controller 的初始化本身没有太多的逻辑,主要是构造了一个 Config 对象传递进来,所以 Controller 启动的时候肯定会有这个 Config 的使用逻辑,我们具体来看:
client-go/tools/cache/controller.go:127
1func (c *controller) Run(stopCh <-chan struct{}) { 2 defer utilruntime.HandleCrash() 3 go func() { 4 <-stopCh 5 c.config.Queue.Close() 6 }() 7 // 利用 Config 里的配置构造 Reflector 8 r := NewReflector( 9 c.config.ListerWatcher,10 c.config.ObjectType,11 c.config.Queue,12 c.config.FullResyncPeriod,13 )14 r.ShouldResync = c.config.ShouldResync15 r.WatchListPageSize = c.config.WatchListPageSize16 r.clock = c.clock17 if c.config.WatchErrorHandler != nil {18 r.watchErrorHandler = c.config.WatchErrorHandler19 }2021 c.reflectorMutex.Lock()22 c.reflector = r23 c.reflectorMutex.Unlock()2425 var wg wait.Group26 // 启动 Reflector27 wg.StartWithChannel(stopCh, r.Run)28 // 执行 Controller 的 processLoop29 wait.Until(c.processLoop, time.Second, stopCh)30 wg.Wait()31}
这里的逻辑很简单,构造 Reflector 后运行起来,然后执行 c.processLoop
,所以很明显,Controller 的业务逻辑肯定隐藏在 processLoop 方法里,我们继续来看。
client-go/tools/cache/controller.go:181
1func (c *controller) processLoop() { 2 for { 3 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) 4 if err != nil { 5 if err == ErrFIFOClosed { 6 return 7 } 8 if c.config.RetryOnError { 9 c.config.Queue.AddIfNotPresent(obj)10 }11 }12 }13}
这里的逻辑是从 DeltaFIFO 中 Pop 出一个对象丢给 PopProcessFunc 处理,如果失败了就 re-enqueue 到 DeltaFIFO 中。我们前面提到过这里的 PopProcessFunc 实现是 HandleDeltas()
方法,所以这里的主要逻辑就转到了 HandleDeltas()
是如何实现的了。
这里我们先回顾下 DeltaFIFO 的存储结构,看下这个图:
然后再看源码,这里的逻辑主要是遍历一个 Deltas 里的所有 Delta,然后根据 Delta 的类型来决定如何操作 Indexer,也就是更新本地 cache,同时分发相应的通知。
client-go/tools/cache/shared_informer.go:537
1func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { 2 s.blockDeltas.Lock() 3 defer s.blockDeltas.Unlock() 4 // 对于每个 Deltas 来说,里面存了很多的 Delta,也就是对应不同 Type 的多个 Object,这里的遍历会从旧往新走 5 for _, d := range obj.(Deltas) { 6 switch d.Type { 7 // 除了 Deleted 外所有情况 8 case Sync, Replaced, Added, Updated: 9 // 记录变更,没有太多实际作用10 s.cacheMutationDetector.AddObject(d.Object)11 // 通过 indexer 从 cache 里查询当前 Object,如果存在12 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {13 // 更新 indexer 里的对象14 if err := s.indexer.Update(d.Object); err != nil {15 return err16 }1718 isSync := false19 switch {20 case d.Type == Sync:21 isSync = true22 case d.Type == Replaced:23 if accessor, err := meta.Accessor(d.Object); err == nil {24 if oldAccessor, err := meta.Accessor(old); err == nil {25 isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()26 }27 }28 }29 // 分发一个更新通知30 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)31 // 如果本地 cache 里没有这个 Object,则添加32 } else {33 if err := s.indexer.Add(d.Object); err != nil {34 return err35 }36 // 分发一个新增通知37 s.processor.distribute(addNotification{newObj: d.Object}, false)38 }39 // 如果是删除操作,则从 indexer 里删除这个 Object,然后分发一个删除通知40 case Deleted:41 if err := s.indexer.Delete(d.Object); err != nil {42 return err43 }44 s.processor.distribute(deleteNotification{oldObj: d.Object}, false)45 }46 }47 return nil48}
这里涉及到一个知识点:s.processor.distribute(addNotification{newObj: d.Object}, false)
中 processor 是什么?如何分发通知的?谁来接收通知?
我们回到 ProcessFunc 的实现上,除了 sharedIndexInformer 的 HandleDeltas()
方法外,还有一个基础版本:
client-go/tools/cache/controller.go:446
1 Process: func(obj interface{}) error { 2 for _, d := range obj.(Deltas) { 3 obj := d.Object 4 if transformer != nil { 5 var err error 6 obj, err = transformer(obj) 7 if err != nil { 8 return err 9 }10 }1112 switch d.Type {13 case Sync, Replaced, Added, Updated:14 if old, exists, err := clientState.Get(obj); err == nil && exists {15 if err := clientState.Update(obj); err != nil {16 return err17 }18 h.OnUpdate(old, obj)19 } else {20 if err := clientState.Add(obj); err != nil {21 return err22 }23 h.OnAdd(obj)24 }25 case Deleted:26 if err := clientState.Delete(obj); err != nil {27 return err28 }29 h.OnDelete(obj)30 }31 }32 return nil33 },
这里可以看到逻辑简单很多,除了更新 cache 外,调用了 h.OnAdd(obj)/h.OnUpdate(old, obj)/h.OnDelete(obj)
等方法,这里的 h 是 ResourceEventHandler 类型的,也就是 Process 过程直接调用了 ResourceEventHandler 的相应方法,这样就已经逻辑闭环了,ResourceEventHandler 的这几个方法里做一些简单的过滤后,会将这些对象的 key 丢到 workqueue,进而就触发了自定义调谐函数的运行。
换言之,sharedIndexInformer 中实现的 ProcessFunc 是一个进阶版本,不满足于简单调用 ResourceEventHandler 对应方法来完成 Process 逻辑,所以到这里基础的 Informer 逻辑已经闭环了,我们后面继续来看 sharedIndexInformer 中又对 Informer 做了哪些“增强”
我们在 Operator 开发中,如果不使用 controller-runtime 库,也就是不通过 Kubebuilder 等工具来生成脚手架时,经常会用到 SharedInformerFactory,比如典型的 sample-controller 中的 main() 函数:
sample-controller/main.go:40
1func main() { 2 klog.InitFlags(nil) 3 flag.Parse() 4 5 stopCh := signals.SetupSignalHandler() 6 7 cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) 8 if err != nil { 9 klog.Fatalf("Error building kubeconfig: %s", err.Error())10 }1112 kubeClient, err := kubernetes.NewForConfig(cfg)13 if err != nil {14 klog.Fatalf("Error building kubernetes clientset: %s", err.Error())15 }1617 exampleClient, err := clientset.NewForConfig(cfg)18 if err != nil {19 klog.Fatalf("Error building example clientset: %s", err.Error())20 }2122 kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second30)23 exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second30)2425 controller := NewController(kubeClient, exampleClient,26 kubeInformerFactory.Apps().V1().Deployments(),27 exampleInformerFactory.Samplecontroller().V1alpha1().Foos())2829 kubeInformerFactory.Start(stopCh)30 exampleInformerFactory.Start(stopCh)3132 if err = controller.Run(2, stopCh); err != nil {33 klog.Fatalf("Error running controller: %s", err.Error())34 }35}
这里可以看到我们依赖于 kubeInformerFactory.Apps().V1().Deployments()
提供一个 Informer,这里的 Deployments()
方法返回的是一个 DeploymentInformer 类型,DeploymentInformer 是什么呢?如下
client-go/informers/apps/v1/deployment.go:37
1type DeploymentInformer interface {2 Informer() cache.SharedIndexInformer3 Lister() v1.DeploymentLister4}
可以看到所谓的 DeploymentInformer 由 “Informer” 和 “Lister” 组成,也就是说我们编码时用到的 Informer 本质就是一个 SharedIndexInformer
client-go/tools/cache/shared_informer.go:186
1type SharedIndexInformer interface {2 SharedInformer3 AddIndexers(indexers Indexers) error4 GetIndexer() Indexer5}
这里的 Indexer 就很熟悉了,SharedInformer 又是啥呢?
client-go/tools/cache/shared_informer.go:133
1type SharedInformer interface { 2 // 可以添加自定义的 ResourceEventHandler 3 AddEventHandler(handler ResourceEventHandler) 4 // 附带 resync 间隔配置,设置为 0 表示不关心 resync 5 AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) 6 // 这里的 Store 指的是 Indexer 7 GetStore() Store 8 // 过时了,没有用 9 GetController() Controller10 // 通过 Run 来启动11 Run(stopCh <-chan struct{})12 // 这里和 resync 逻辑没有关系,表示 Indexer 至少更新过一次全量的对象13 HasSynced() bool14 // 最后一次拿到的 RV15 LastSyncResourceVersion() string16 // 用于每次 ListAndWatch 连接断开时回调,主要就是日志记录的作用17 SetWatchErrorHandler(handler WatchErrorHandler) error18}
接下来就该看下 SharedIndexInformer 接口的实现了,sharedIndexerInformer 定义如下:
client-go/tools/cache/shared_informer.go:287
1type sharedIndexInformer struct { 2 indexer Indexer 3 controller Controller 4 processor *sharedProcessor 5 cacheMutationDetector MutationDetector 6 listerWatcher ListerWatcher 7 // 表示当前 Informer 期望关注的类型,主要是 GVK 信息 8 objectType runtime.Object 9 // reflector 的 resync 计时器计时间隔,通知所有的 listener 执行 resync10 resyncCheckPeriod time.Duration11 defaultEventHandlerResyncPeriod time.Duration12 clock clock.Clock13 started, stopped bool14 startedLock sync.Mutex15 blockDeltas sync.Mutex16 watchErrorHandler WatchErrorHandler17}
这里的 Indexer、Controller、ListerWatcher 等都是我们熟悉的组件,sharedProcessor 我们在前面遇到了,需要重点关注一下。
sharedProcessor 中维护了 processorListener 集合,然后分发通知对象到这些 listeners,先看下结构定义:
client-go/tools/cache/shared_informer.go:588
1type sharedProcessor struct {2 listenersStarted bool3 listenersLock sync.RWMutex4 listeners []processorListener5 syncingListeners []processorListener6 clock clock.Clock7 wg wait.Group8}
马上就会有一个疑问了,processorListener 是什么?
client-go/tools/cache/shared_informer.go:690
1type processorListener struct { 2 nextCh chan interface{} 3 addCh chan interface{} 4 // 核心属性 5 handler ResourceEventHandler 6 pendingNotifications buffer.RingGrowing 7 requestedResyncPeriod time.Duration 8 resyncPeriod time.Duration 9 nextResync time.Time10 resyncLock sync.Mutex11}
可以看到 processorListener 里有一个 ResourceEventHandler,这是我们认识的组件。processorListener 有三个主要方法:
add(notification interface{})
pop()
run()
一个个来看吧。
run()
client-go/tools/cache/shared_informer.go:775
1func (p processorListener) run() { 2 stopCh := make(chan struct{}) 3 wait.Until(func() { 4 for next := range p.nextCh { 5 switch notification := next.(type) { 6 case updateNotification: 7 p.handler.OnUpdate(notification.oldObj, notification.newObj) 8 case addNotification: 9 p.handler.OnAdd(notification.newObj)10 case deleteNotification:11 p.handler.OnDelete(notification.oldObj)12 default:13 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))14 }15 }16 close(stopCh)17 }, 1time.Second, stopCh)18}
这里的逻辑很清晰,从 nextCh 里拿通知,然后根据其类型去调用 ResourceEventHandler 相应的 OnAdd/OnUpdate/OnDelete
方法。
add() 和 pop()
client-go/tools/cache/shared_informer.go:741
1func (p processorListener) add(notification interface{}) { 2 // 将通知放到 addCh 中,所以下面 pop() 方法里先执行到的 case 是第二个 3 p.addCh <- notification 4} 5 6func (p processorListener) pop() { 7 defer utilruntime.HandleCrash() 8 defer close(p.nextCh) // Tell .run() to stop 910 var nextCh chan<- interface{}11 var notification interface{}12 for {13 select {14 // 下面获取到的通知,添加到 nextCh 里,供 run() 方法中消费15 case nextCh <- notification:16 var ok bool17 // 从 pendingNotifications 里消费通知,生产者在下面 case 里18 notification, ok = p.pendingNotifications.ReadOne()19 if !ok {20 nextCh = nil21 }22 // 逻辑从这里开始,从 addCh 里提取通知23 case notificationToAdd, ok := <-p.addCh:24 if !ok {25 return26 }27 if notification == nil { 28 notification = notificationToAdd29 nextCh = p.nextCh30 } else {31 // 新添加的通知丢到 pendingNotifications32 p.pendingNotifications.WriteOne(notificationToAdd)33 }34 }35 }36}
也就是说 processorListener 提供了一定的缓冲机制来接收 notification,然后去消费这些 notification 调用 ResourceEventHandler 相关方法。
然后接着继续看 sharedProcessor 的几个主要方法。
addListener 会直接调用 listener 的 run()
和 pop()
方法,这两个方法的逻辑我们上面已经分析过
client-go/tools/cache/shared_informer.go:597
1func (p sharedProcessor) addListener(listener processorListener) { 2 p.listenersLock.Lock() 3 defer p.listenersLock.Unlock() 4 5 p.addListenerLocked(listener) 6 if p.listenersStarted { 7 p.wg.Start(listener.run) 8 p.wg.Start(listener.pop) 9 }10}
distribute 的逻辑就是调用 sharedProcessor 内部维护的所有 listner 的 add()
方法
client-go/tools/cache/shared_informer.go:613
1func (p *sharedProcessor) distribute(obj interface{}, sync bool) { 2 p.listenersLock.RLock() 3 defer p.listenersLock.RUnlock() 4 5 if sync { 6 for , listener := range p.syncingListeners { 7 listener.add(obj) 8 } 9 } else {10 for , listener := range p.listeners {11 listener.add(obj)12 }13 }14}
run()
的逻辑和前面的 addListener() 类似,也就是调用 listener 的 run()
和 pop()
方法
client-go/tools/cache/shared_informer.go:628
1func (p *sharedProcessor) run(stopCh <-chan struct{}) { 2 func() { 3 p.listenersLock.RLock() 4 defer p.listenersLock.RUnlock() 5 for , listener := range p.listeners { 6 p.wg.Start(listener.run) 7 p.wg.Start(listener.pop) 8 } 9 p.listenersStarted = true10 }()11 <-stopCh12 p.listenersLock.RLock()13 defer p.listenersLock.RUnlock()14 for , listener := range p.listeners {15 close(listener.addCh)16 }17 p.wg.Wait()18}
到这里基本就知道 sharedProcessor 的能力了,继续往下看。
继续来看 sharedIndexInformer 的 Run()
方法,这里面已经几乎没有陌生的内容了。
client-go/tools/cache/shared_informer.go:368
1func (s sharedIndexInformer) Run(stopCh <-chan struct{}) { 2 defer utilruntime.HandleCrash() 3 4 if s.HasStarted() { 5 klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed") 6 return 7 } 8 // DeltaFIFO 就很熟悉了 9 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{10 KnownObjects: s.indexer,11 EmitDeltaTypeReplaced: true,12 })13 // Config 的逻辑也在上面遇到过了14 cfg := &Config{15 Queue: fifo,16 ListerWatcher: s.listerWatcher,17 ObjectType: s.objectType,18 FullResyncPeriod: s.resyncCheckPeriod,19 RetryOnError: false,20 ShouldResync: s.processor.shouldResync,2122 Process: s.HandleDeltas,23 WatchErrorHandler: s.watchErrorHandler,24 }2526 func() {27 s.startedLock.Lock()28 defer s.startedLock.Unlock()29 // 前文分析过这里的 New() 函数逻辑了30 s.controller = New(cfg)31 s.controller.(controller).clock = s.clock32 s.started = true33 }()3435 processorStopCh := make(chan struct{})36 var wg wait.Group37 defer wg.Wait() 38 defer close(processorStopCh) 39 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)40 // processor 的 run 方法41 wg.StartWithChannel(processorStopCh, s.processor.run)4243 defer func() {44 s.startedLock.Lock()45 defer s.startedLock.Unlock()46 s.stopped = true // Don't want any new listeners47 }()48 // controller 的 Run()49 s.controller.Run(stopCh)50}
到这里也就基本知道了 sharedIndexInformer 的逻辑了,再往上层走就剩下一个 SharedInformerFactory 了,继续看吧~
我们前面提到过 SharedInformerFactory,现在具体来看一下 SharedInformerFactory 是怎么实现的。先看接口定义:
client-go/informers/factory.go:187
1type SharedInformerFactory interface { 2 internalinterfaces.SharedInformerFactory 3 ForResource(resource schema.GroupVersionResource) (GenericInformer, error) 4 WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool 5 6 Admissionregistration() admissionregistration.Interface 7 Internal() apiserverinternal.Interface 8 Apps() apps.Interface 9 Autoscaling() autoscaling.Interface10 Batch() batch.Interface11 Certificates() certificates.Interface12 Coordination() coordination.Interface13 Core() core.Interface14 Discovery() discovery.Interface15 Events() events.Interface16 Extensions() extensions.Interface17 Flowcontrol() flowcontrol.Interface18 Networking() networking.Interface19 Node() node.Interface20 Policy() policy.Interface21 Rbac() rbac.Interface22 Scheduling() scheduling.Interface23 Storage() storage.Interface24}
这里涉及到几个点:
这也是一个接口,比较简短:
1type SharedInformerFactory interface {2 Start(stopCh <-chan struct{})3 InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer4}
可以看到熟悉的 SharedIndexInformer
这里接收一个 GVR,返回了一个 GenericInformer,看下什么是 GenericInformer:
1type GenericInformer interface {2 Informer() cache.SharedIndexInformer3 Lister() cache.GenericLister4}
也很简短。
后面一堆方法是类似的,我们以 Apps() 为例来看下怎么回事。这里的 Interface 定义如下:
client-go/informers/apps/interface.go:29
1type Interface interface {2 // V1 provides access to shared informers for resources in V1.3 V1() v1.Interface4 // V1beta1 provides access to shared informers for resources in V1beta1.5 V1beta1() v1beta1.Interface6 // V1beta2 provides access to shared informers for resources in V1beta2.7 V1beta2() v1beta2.Interface8}
显然应该继续看下 v1.Interface 是个啥。
client-go/informers/apps/v1/interface.go:26
1type Interface interface { 2 // ControllerRevisions returns a ControllerRevisionInformer. 3 ControllerRevisions() ControllerRevisionInformer 4 // DaemonSets returns a DaemonSetInformer. 5 DaemonSets() DaemonSetInformer 6 // Deployments returns a DeploymentInformer. 7 Deployments() DeploymentInformer 8 // ReplicaSets returns a ReplicaSetInformer. 9 ReplicaSets() ReplicaSetInformer10 // StatefulSets returns a StatefulSetInformer.11 StatefulSets() StatefulSetInformer12}
到这里已经有看着很眼熟的 Deployments() DeploymentInformer
之类的代码了,DeploymentInformer 我们刚才看过内部结构,长这样:
1type DeploymentInformer interface {2 Informer() cache.SharedIndexInformer3 Lister() v1.DeploymentLister4}
到这里也就不难理解 SharedInformerFactory 的作用了,它提供了所有 API group-version 的资源对应的 SharedIndexInformer,也就不难理解开头我们引用的 sample-controller 中的这行代码:
1kubeInformerFactory.Apps().V1().Deployments()
通过其可以拿到一个 Deployment 资源对应的 SharedIndexInformer。
继续看下 SharedInformerFactory 是如何创建的
client-go/informers/factory.go:96
1func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {2 return NewSharedInformerFactoryWithOptions(client, defaultResync)3}
可以看到参数非常简单,主要是需要一个 Clientset,毕竟 ListerWatcher 的能力本质还是 client 提供的。
client-go/informers/factory.go:109
1func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options …SharedInformerOption) SharedInformerFactory { 2 factory := &sharedInformerFactory{ 3 client: client, 4 namespace: v1.NamespaceAll, // 空字符串 "" 5 defaultResync: defaultResync, 6 informers: make(map[reflect.Type]cache.SharedIndexInformer), // 可以存放不同类型的 SharedIndexInformer 7 startedInformers: make(map[reflect.Type]bool), 8 customResync: make(map[reflect.Type]time.Duration), 9 }1011 for _, opt := range options {12 factory = opt(factory)13 }1415 return factory16}
接着是如何启动
client-go/informers/factory.go:128
1func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { 2 f.lock.Lock() 3 defer f.lock.Unlock() 4 5 for informerType, informer := range f.informers { 6 // 同类型只会调用一次,Run() 的逻辑我们前面介绍过了 7 if !f.startedInformers[informerType] { 8 go informer.Run(stopCh) 9 f.startedInformers[informerType] = true10 }11 }12}
今天我们一个基础 Informer - Controller 开始介绍,先分析了 Controller 的能力,也就是其通过构造 Reflector 并启动从而能够获取指定类型资源的“更新”事件,然后通过事件构造 Delta 放到 DeltaFIFO 中,进而在 processLoop 中从 DeltaFIFO 里 pop Deltas 来处理,一方面将对象通过 Indexer 同步到本地 cache,也就是一个 ThreadSafeStore,一方面调用 ProcessFunc 来处理这些 Delta。
然后 SharedIndexInformer 提供了构造 Controller 的能力,通过 HandleDeltas() 方法实现上面提到的 ProcessFunc,同时还引入了 sharedProcessor 在 HandleDeltas() 中用于事件通知的处理。sharedProcessor 分发事件通知的时候,接收方是内部继续抽象出来的 processorListener,在 processorListener 中完成了 ResourceEventHandler 具体回调函数的调用。
最后 SharedInformerFactory 又进一步封装了提供所有 api 资源对应的 SharedIndexInformer 的能力。也就是说一个 SharedIndexInformer 可以处理一种类型的资源,比如 Deployment 或者 Pod 等,而通过 SharedInformerFactory 可以轻松构造任意已知类型的 SharedIndexInformer。另外这里用到了 Clientset 提供的访问所有 api 资源的能力,通过其也就能够完整实现整套 Informer 逻辑了。
此前我们已经陆续分析了:
各种“组件”分工明确,最终汇聚在 “Informer” 里,实现了一套复杂而优雅的资源处理能力,至此自定义控制器涉及逻辑中 client-go 部分就基本分析完了!
【完整目录参见>>> 《深入理解 K8S 原理与实现》系列目录 <<<】
(转载请保留本文原始链接 https://www.danielhu.cn)
手机扫一扫
移动阅读更方便
你可能感兴趣的文章