浅析kubernetes中client-go structure01
阅读原文时间:2022年05月23日阅读:3

Introduction

从2016年8月起,Kubernetes官方提取了与Kubernetes相关的核心源代码,形成了一个独立的项目,即client-go,作为官方提供的go客户端。Kubernetes的部分代码也是基于这个项目的。

client-go 是kubernetes中广义的客户端基础库,在Kubernetes各个组件中或多或少都有使用其功能。。也就是说,client-go可以在kubernetes集群中添加、删除和查询资源对象(包括deployment、service、pod、ns等)。

在了解client-go前,还需要掌握一些概念

  • 在客户端验证 API
  • 使用证书和使用令牌,来验证客户端
  • kubernetes集群的访问模式

使用证书和令牌来验证客户端

在访问apiserver时,会对访问者进行鉴权,因为是https请求,在请求时是需要ca的,也可以使用 -k 使用insecure模式

$ curl --cacert /etc/kubernetes/pki/ca.crt https://10.0.0.4:6443/version
\{
  "major": "1",
  "minor": "18+",
  "gitVersion": "v1.18.20-dirty",
  "gitCommit": "1f3e19b7beb1cc0110255668c4238ed63dadb7ad",
  "gitTreeState": "dirty",
  "buildDate": "2022-05-17T12:45:14Z",
  "goVersion": "go1.16.15",
  "compiler": "gc",
  "platform": "linux/amd64"
}

$ curl -k https://10.0.0.4:6443/api/v1/namespace/default/pods/netbox
{
  "kind": "Status",
  "apiVersion": "v1",
  "metadata": {

  },
  "status": "Failure",
  "message": "namespace \"default\" is forbidden: User \"system:anonymous\" cannot get resource \"namespace/pods\" in API group \"\" at the cluster scope",
  "reason": "Forbidden",
  "details": {
    "name": "default",
    "kind": "namespace"
  },
  "code": 403
}

从错误中可以看出,该请求已通过身份验证,用户是 system:anonymous,但该用户未授权列出对应的资源。而上述请求只是忽略 curl 的https请求需要做的验证,而Kubernetes也有对应验证的机制,这个时候需要提供额外的身份信息来获得所需的访问权限。Kubernetes支持多种身份认证机制,ssl证书也是其中一种。

注:在Kubernetes中没有表示用户的资源。即kubernetes集群中,无法添加和创建。但由集群提供的有效证书的用户都视为允许的用户。Kubernetes从证书中的使用者CN和使用者可选名称中获得用户;然后,RBAC 判断用户是否有权限操作资源。从 Kubernetes1.4 开始,支持用户组,即证书中的O

可以使用 curl 的 --cert--key 指定用户的证书

curl --cacert /etc/kubernetes/pki/ca.crt  \
    --cert /etc/kubernetes/pki/apiserver-kubelet-client.crt \
    --key /etc/kubernetes/pki/apiserver-ubelet-client.key \
    https://10.0.0.4:6443/api/v1/namespaces/default/pods/netbox

使用serviceaccount验证客户端身份

使用一个serviceaccount JWT,获取一个SA的方式如下

kubectl get secrets \
$(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}')  -o jsonpath='{.data.token}' \
| base64 --decode

JWT=$(kubectl get secrets \
$(kubectl get serviceaccounts/default -o jsonpath='{.secrets[0].name}')  -o jsonpath='{.data.token}' \
| base64 --decode)

使用secret来访问API

curl --cacert /etc/kubernetes/pki/ca.crt \
    --header "Authorization: Bearer $JWT" \
    https://10.0.0.4:6443/apis/apps/v1/namespaces/default/deployments

Pod内部调用Kubernetes API

kubernete会将Kubernetes API地址通过环境变量提供给 Pod,可以通过命令看到

$ env|grep -i kuber
KUBERNETES_SERVICE_PORT=443
KUBERNETES_PORT=tcp://192.168.0.1:443
KUBERNETES_PORT_443_TCP_ADDR=192.168.0.1
KUBERNETES_PORT_443_TCP_PORT=443
KUBERNETES_PORT_443_TCP_PROTO=tcp
KUBERNETES_PORT_443_TCP=tcp://192.168.0.1:443
KUBERNETES_SERVICE_PORT_HTTPS=443
KUBERNETES_SERVICE_HOST=192.168.0.1

并且还会在将 Kubernetes CA和SA等信息放置在目录 /var/run/secrets/kubernetes.io/serviceaccount/,通过这些就可以从Pod内部访问API

cd /var/run/secrets/kubernetes.io/serviceaccount/

curl --cacert ca.crt --header "Authorization: Bearer $(cat token)" https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT/api/v1/namespaces/default/pods/netbox

Reference

Kubernetes API Reference Docs

关于client-go的模块

k8s.io/api

与Pods、ConfigMaps、Secrets和其他Kubernetes 对象所对应的数据结构都在,k8s.io/api,此包几乎没有算法,仅仅是数据机构,该模块有多达上千个用于描述Kubernetes中资源API的结构;通常被client,server,controller等其他的组件使用。

k8s.io/apimachinery

根据该库的描述文件可知,这个库是Server和Client中使用的Kubernetes API共享依赖库,也是kubernetes中更低一级的通用的数据结构。在我们构建自定义资源时,不需要为自定义结构创建属性,如 Kind, apiVersionname…,这些都是库 apimachinery 所提供的功能。

如,在包 k8s.io/apimachinery/pkg/apis/meta 定义了两个结构 TypeMetaObjectMeta;将这这两个结构嵌入自定义的结构中,可以以通用的方式兼容对象,如Kubernetes中的资源 Deplyment 也是这么完成的

通过图来了解Kubernetes的资源如何实现的

如在 k8s.io/apimachinery/pkg/runtime/interfaces.go 中定义了 interface,这个类为在schema中注册的API都需要实现这个结构

type Object interface {
    GetObjectKind() schema.ObjectKind
    DeepCopyObject() Object
}

非结构化数据

非结构化数据 Unstructured 是指在kubernete中允许将没有注册为Kubernetes API的对象,作为Json对象的方式进行操作,如,使用非结构化 Kubernetes 对象

desired := &unstructured.Unstructured{
        Object: map[string]interface{}{
            "apiVersion": "v1",
            "kind":       "ConfigMap",
            "metadata": map[string]interface{}{
                "namespace":    namespace,
                "generateName": "crud-dynamic-simple-",
            },
            "data": map[string]interface{}{
                "foo": "bar",
            },
        },
    }

非结构化数据的转换

k8s.io/apimachinery/pkg/runtime.UnstructuredConverter 中,也提供了将非结构化数据转换为Kubernetes API注册过的结构,参考如何将非结构化对象转换为Kubernetes Object

Reference

go types

install client-go

如何选择 client-go 的版本

​ 对于不同的kubernetes版本使用标签 v0.x.y 来表示对应的客户端版本。具体对应参考 client-go

​ 例如使用的kubernetes版本为 v1.18.20 则使用对应的标签 v0.x.y 来替换符合当前版本的客户端库。例如:

go get k8s.io/client-go@v0.18.10

官网中给出了client-go的兼容性矩阵,可以很明了的看出如何选择适用于自己kubernetes版本的对应的client-go

  • 表示 该版本的 client-go 与对应的 kubernetes版本功能完全一致
  • + client-go 具有 kubernetes apiserver中不具备的功能。
  • - Kubernetes apiserver 具有client-go 无法使用的功。

一般情况下,除了对应的版本号完全一致外,其他都存在 功能的+-

client-go 目录介绍

client-go的每一个目录都是一个go package

  • kubernetes 包含与Kubernetes API所通信的客户端集
  • discovery 用于发现kube-apiserver所支持的api
  • dynamic 包含了一个动态客户端,该客户端能够对kube-apiserver任意的API进行操作。
  • transport 提供了用于设置认证和启动链接的功能
  • tools/cache: 一些 low-level controller与一些数据结构如fifo,reflector等

structure of client-go

  • RestClient:是最基础的基础架构,其作用是将是使用了http包进行封装成RESTClient。位于rest 目录,RESTClient封装了资源URL的通用格式,例如Get()Put()Post() Delete()。是与Kubernetes API的访问行为提供的基于RESTful方法进行交互基础架构。

    • 同时支持Json 与 protobuf
    • 支持所有的原生资源和CRD
  • ClientSet:Clientset基于RestClient进行封装对 Resource 与 version 管理集合;如何创建

  • DiscoverySet:RestClient进行封装,可动态发现 kube-apiserver 所支持的 GVR(Group Version Resource);如何创建,这种类型是一种非映射至clientset的客户端

  • DynamicClient:基于RestClient,包含动态的客户端,可以对Kubernetes所支持的 API对象进行操作,包括CRD;如何创建

  • 仅支持json

  • fakeClientclient-go 实现的mock对象,主要用于单元测试。

以上client-go所提供的客户端,仅可使用kubeconfig进行连接。

什么是clientset

clientset代表了kubernetes中所有的资源类型,这里不包含CRD的资源,如:

  • core
  • extensions
  • batch

client-go使用

DynamicClient客户端

  • 与 ClientSet 的区别是,可以对任意 Kubernetes 资源进行 RESTful 操作。同样提供管理的方法

  • 最大的不同,ClientSet 需要预先实现每种 Resource 和 Version 的操作,内部的数据都是结构化数据(已知数据结构);DynamicClient 内部实现了 Unstructured,用于处理非结构化的数据(无法提前预知的数据结构),这是其可以处理 CRD 自定义资源的关键。

dynamicClient 实现流程

  • 通过 NewForConfig 实例化 conf 为 DynamicInterface客户端

  • DynamicInterface 客户端中,实现了一个Resource 方法即为实现了Interface接口

  • dynamicClient 实现了非结构化数据类型与rest client,可以通过其方法将Resource 由rest从apiserver中获得api对象,runtime.DeafultUnstructuredConverter.FromUnstructrued 转为对应的类型。


注意:GVR 中资源类型 resource为复数。kind:Pod 即为 Pods

package main

import (
    "context"
    "flag"
    "fmt"
    "os"

    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    var (
        k8sconfig  *string //使用kubeconfig配置文件进行集群权限认证
        restConfig *rest.Config
        err        error
    )

    if home := homedir.HomeDir(); home != "" {
        k8sconfig = flag.String("kubeconfig", fmt.Sprintf("%s/.kube/config", home), "kubernetes auth config")
    }
    k8sconfig = k8sconfig
    flag.Parse()
    if _, err := os.Stat(*k8sconfig); err != nil {
        panic(err)
    }

    if restConfig, err = rest.InClusterConfig(); err != nil {
        // 这里是从masterUrl 或者 kubeconfig传入集群的信息,两者选一
        restConfig, err = clientcmd.BuildConfigFromFlags("", *k8sconfig)
        if err != nil {
            panic(err)
        }
    }
    // 创建客户端类型
    // NewForConfig creates a new dynamic client or returns an error.
    // dynamic.NewForConfig(restConfig)
    // NewForConfig creates a new Clientset for the given config
    // kubernetes.NewForConfig(restConfig)
    // NewDiscoveryClientForConfig creates a new DiscoveryClient for the given config.
    //clientset, err := discovery.NewDiscoveryClientForConfig(restConfig)
    dynamicset, err := dynamic.NewForConfig(restConfig)

    // 这里遵循的是 kubernetes Rest API,如Pod是
    // /api/v1/namespaces/{namespace}/pods
    // /apis/apps/v1/namespaces/{namespace}/deployments
    // 遵循GVR格式填写
    podList, err := dynamicset.Resource(schema.GroupVersionResource{
        Group:    "",
        Version:  "v1",
        Resource: "pods",
    }).Namespace("default").List(context.TODO(), v1.ListOptions{})
    if err != nil {
        panic(err)
    }

    daemonsetList, err := dynamicset.Resource(schema.GroupVersionResource{
        Group:    "apps",
        Version:  "v1",
        Resource: "daemonsets",
    }).Namespace("kube-system").List(context.TODO(), v1.ListOptions{})

    if err != nil {
        panic(err)
    }

    for _, row := range podList.Items {
        fmt.Println(row.GetName())
    }

    for _, row := range daemonsetList.Items {
        fmt.Println(row.GetName())
    }

    // clientset mode

    clientset, err := kubernetes.NewForConfig(restConfig)
    podIns, err := clientset.CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{})
    for _, row := range podIns.Items {
        fmt.Println(row.GetName())
    }
}

Extension

一些client-go使用

informer是client-go提供的 Listwatcher 接口,主要作为 Controller构成的组件,在Kubernetes中, Controller的一个重要作用是观察对象的期望状态 spec 和实际状态 statue为了观察对象的状态,Controller需要向 Apiserver发送请求;但是通常情况下,频繁向Apiserver发出请求的会增加etcd的压力,为了解决这类问题,client-go 一个缓存,通过缓存,控制器可以不必发出大量请求,并且只关心对象的事件。也就是 informer。

从本质上来讲,informer是使用kubernetes API观察其变化,来维护状态的缓存,称为 indexer;并通过对应事件函数通知客户端信息的变化,informer为一系列组件,通过这些组件来实现的这些功能。

  • Reflector:与 apiserver交互的组件
  • Delta FIFO:一个特殊的队列,Reflector将状态的变化存储在里面
  • indexer:本地存储,与etcd保持一致,减轻API Server与etcd的压力
  • Processor:监听处理器,通过将监听到的事件发送给对应的监听函数
  • Controller:从队列中对整个数据的编排处理的过程

informer的工作模式

首先通过List从Kubernetes API中获取资源所有对象并同时缓存,然后通过Watch机制监控资源。这样,通过informer与缓存,就可以直接和informer交互,而不用每次都和Kubernetes API交互。

另外,informer 还提供了事件的处理机制,以便 Controller 或其他应用程序根据回调钩子函数等处理特定的业务逻辑。因为Informer可以通过List/Watch机制监控所有资源的所有事件,只要在Informer中添加ResourceEventHandler实例的回调函数,如:onadd(obj interface {}), onupdate (oldobj, newobj interface {})OnDelete( obj interface {}) 可以实现处理资源的创建、更新和删除。 在Kubernetes中,各种控制器都使用了Informer。

分析informer的流程

通过代码 k8s.io/client-go/informers/apps/v1/deployment.go 可以看出,在每个控制器下,都实现了一个 InformerLister ,Lister就是indexer;

type SharedInformer interface {
    // 添加一个事件处理函数,使用informer默认的resync period
    AddEventHandler(handler ResourceEventHandler)
    // 将事件处理函数注册到 share informer,将resyncPeriod作为参数传入
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // 从本地缓存获取的信息作为infomer的返回
    GetStore() Store
    // 已弃用
    GetController() Controller
    // 运行一个informer,当stopCh停止时,informer也被关闭
    Run(stopCh <-chan struct{})
    // HasSynced returns true if the shared informer's store has been
    // informed by at least one full LIST of the authoritative state
    // of the informer's object collection.  This is unrelated to "resync".
    HasSynced() bool
    // LastSyncResourceVersion is the resource version observed when last synced with the underlying store. The value returned is not synchronized with access to the underlying store and is not thread-safe.
    LastSyncResourceVersion() string
}

而 Shared Informer 对所有的API组提供一个shared informer

// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

    Admissionregistration() admissionregistration.Interface
    Apps() apps.Interface
    Auditregistration() auditregistration.Interface
    Autoscaling() autoscaling.Interface
    Batch() batch.Interface
    Certificates() certificates.Interface
    Coordination() coordination.Interface
    Core() core.Interface
    Discovery() discovery.Interface
    Events() events.Interface
    Extensions() extensions.Interface
    Flowcontrol() flowcontrol.Interface
    Networking() networking.Interface
    Node() node.Interface
    Policy() policy.Interface
    Rbac() rbac.Interface
    Scheduling() scheduling.Interface
    Settings() settings.Interface
    Storage() storage.Interface
}

可以看到在 k8s.io/client-go/informers/apps/v1/deployment.go 实现了这个interface

type DeploymentInformer interface {
   Informer() cache.SharedIndexInformer
   Lister() v1.DeploymentLister
}

而在对应的 deployment controller中会调用这个Informer 实现对状态的监听;``

// NewDeploymentController creates a new DeploymentController.
//  appsinformers.DeploymentInformer就是client-go 中的 /apps/v1/deployment实现的informer
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dc := &DeploymentController{
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    dc.dLister = dInformer.Lister()
    dc.rsLister = rsInformer.Lister()
    dc.podLister = podInformer.Lister()
    dc.dListerSynced = dInformer.Informer().HasSynced
    dc.rsListerSynced = rsInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced
    return dc, nil
}

reflector是client-go中负责监听 Kubernetes API 的组件,也是整个机制中的生产者,负责将 watch到的数据将其放入 watchHandler 中的delta FIFO队列中。也就是吧etcd的数据反射为 delta fifo的数据

在代码 k8s.io/client-go/tools/cache/reflector.go 中定义了 Reflector 对象

type Reflector struct {
    // reflector的名称,默认为一个 file:line的格式
    name string
    // 期待的类型名称,这里只做展示用,
    // 如果提供,是一个expectedGVK字符串类型,否则是expectedType字符串类型
    expectedTypeName string
    // 期待放置在存储中的类型,如果是一个非格式化数据,那么其 APIVersion与Kind也必须为正确的格式
    expectedType reflect.Type
    // GVK 存储中的对象,是GVK格式
    expectedGVK *schema.GroupVersionKind
    // 同步数据的存储
    store Store
    // 这个是reflector的一个核心,提供了 List和Watch功能
    listerWatcher ListerWatcher

    // backoff manages backoff of ListWatch
    backoffManager wait.BackoffManager

    resyncPeriod time.Duration

    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock

    paginatedResult bool
    // 最后资源的版本号
    lastSyncResourceVersion string
    // 当 lastSyncResourceVersion 过期或者版本太大,这个值将为 true
    isLastSyncResourceVersionUnavailable bool
    // 读写锁,对lastSyncResourceVersion的读写操作的保护
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    // scalability problems.
    // 是初始化时,或者重新同步时的块大小。如果没有设置,将为任意的旧数据
    // 因为是提供了分页功能,RV=0则为默认的页面大小
    //
    WatchListPageSize int64
}

而 方法 NewReflector() 给用户提供了一个初始化 Reflector的接口

在 cotroller.go 中会初始化一个 relector

func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )

Reflector下有三个可对用户提供的方法,Run(), ListAndWatch() , LastSyncResourceVersion()

Run() 是对Reflector的运行,也就是对 ListAndWatch()

func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

ListAndWatch() 则是实际上真实的对Reflector业务的执行

// 前面一些都是对信息的初始化与日志输出
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
    // 分页功能
    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            ....
    // 清理和重新同步的一些
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        ...
    }()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // 为了避免watch的挂起设置一个超时
            // 仅在工作窗口期,处理任何时间
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }

        start := r.clock.Now()
        // 开始监听
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch {
            case isExpiredError(err):
                // 没有设置 LastSyncResourceVersionExpired 也就是过期,会保持与返回数据相同的
                // 首次会先将RV列出
                klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
            case err == io.EOF:
                // 通常为watch关闭
            case err == io.ErrUnexpectedEOF:
                klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
            }
            // 如果出现 connection refuse,通常与apisserver通讯失败,这个时候会重新发送请求
            if utilnet.IsConnectionRefused(err) {
                time.Sleep(time.Second)
                continue
            }
            return nil
        }

        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    // 同上步骤的功能
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}

那么在实现时,如 deploymentinformer,会实现 Listfunc和 watchfunc,这其实就是clientset中的操作方法,也是就list与watch

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
            },
        },
        &appsv1.Deployment{},
        resyncPeriod,
        indexers,
    )
}

tools/cache/controller.go 是存储controller的配置及实现。

type Config struct {
    Queue // 对象的队列,必须为DeltaFIFO
    ListerWatcher // 这里能够监视并列出对象的一些信息,这个对象接受process函数的弹出
    // Something that can process a popped Deltas.
    Process ProcessFunc // 处理Delta的弹出
    // 对象类型,这个controller期待的处理类型,其apiServer与kind必须正确,即,GVR必须正确
    ObjectType runtime.Object
        // FullResyncPeriod是每次重新同步的时间间隔
    FullResyncPeriod time.Duration
        // type ShouldResyncFunc func() bool
    // 返回值nil或true,则表示reflector继续同步
    ShouldResync ShouldResyncFunc
    RetryOnError bool // 标志位,true时,在process()返回错误时重新排列对象
    // Called whenever the ListAndWatch drops the connection with an error.
    // 断开连接是出现错误调用这个函数处理
    WatchErrorHandler WatchErrorHandler
    // WatchListPageSize is the requested chunk size of initial and relist watch lists.
    WatchListPageSize int64
}

实现这个接口

type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}

New() 为给定controller 配置的设置,即为上面的config struct,用来初始化controller对象

NewInformer() :返回一个store(保存数据的最终接口)和一个用于store的controller,同时提供事件的通知(crud)等

NewIndexerInformer():返回一个索引与一个用于索引填充的控制器

控制器的run()的功能实现

func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash() // 延迟销毁
    go func() {  // 信号处理,用于线程管理
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(  // 初始化Reflector
        c.config.ListerWatcher, // ls
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync // 配置是否应该继续同步
    r.WatchListPageSize = c.config.WatchListPageSize
    r.clock = c.clock
    if c.config.WatchErrorHandler != nil { // 断开连接错误处理
        r.watchErrorHandler = c.config.WatchErrorHandler
    }

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group

    wg.StartWithChannel(stopCh, r.Run) // 这里是真正的运行。
    // processLoop() 是DeltaFIFO的消费者方法
    wait.Until(c.processLoop, time.Second, stopCh) // 消费队列的数据
    wg.Wait()
}

总结

在controller的初始化时就初始化了Reflector, controller.Run里面Reflector是结构体初始化时的Reflector,主要作用是watch指定的资源,并且将变化同步到本地的store中。

Reflector接着执行ListAndWatch函数,ListAndWatch第一次会列出所有的对象,并获取资源对象的版本号,然后watch资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为0,list()可能会导致本地的缓存相对于etcd里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地缓存数据与etcd的数据保持一致。

controller.Run函数还会调用processLoop函数,processLoop通过调用HandleDeltas,再调用distribute,processorListener.add最终将不同更新类型的对象加入processorListener的channel中,供processorListener.Run使用。

通过下图可以看出,Delta FIFO 是位于Reflector中的一个FIFO队列,那么 Delta FIFO 究竟是什么,让我们来进一步深剖。

图源于:https://miro.medium.com/max/700/1*iI8uFsPRBY5m_g_WW4huMQ.png

在代码中的注释可以看到一些信息,根据信息可以总结出

  • Delta FIFO 是一个生产者-消费者的队列,生产者是 Reflector,消费者是 Pop()
  • 与传统的FIFO有两点不同
    • Delta FIFO

Delta FIFO也是实现了 Queue以及一些其他 interface 的类,

type DeltaFIFO struct {
    lock sync.RWMutex  // 一个读写锁,保证线程安全
    cond sync.Cond
    items map[string]Deltas // 存放的类型是一个key[string] =》 value[Delta] 类型的数据
    queue []string  // 用于存储item的key,是一个fifo
    populated bool // populated 是用来标记首次被加入的数据是否被变动
    initialPopulationCount int // 首次调用 replace() 的数量
    keyFunc KeyFunc
    knownObjects KeyListerGetter // 这里为indexer
    closed     bool       // 代表已关闭
    closedLock sync.Mutex
    emitDeltaTypeReplaced bool // 表示事件的类型,true为 replace(), false 为 sync()
}

那么delta的类型是,也就是说通常情况下,Delta为一个 string[runtime.object] 的对象

type Delta struct {
    Type   DeltaType // 这就是一个string
    Object interface{} // 之前API部分有了解到,API的类型大致为两类,runtime.Object和非结构化数据
}

apimachinery/pkg/runtime/interfaces.go

那么此时,已经明白了Delta FIFO的结构,为一个Delta的队列,整个结构如下

第一步创建一个Delta FIFO

现在版本中,对创建Delta FIFO是通过函数 NewDeltaFIFOWithOptions()

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
    if opts.KeyFunction == nil {
        opts.KeyFunction = MetaNamespaceKeyFunc // 默认的计算key的方法
    }
    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      opts.KeyFunction,
        knownObjects: opts.KnownObjects,

        emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
    }
    f.cond.L = &f.lock
    return f
}

queueActionLocked,Delta FIFO添加操作

这里说下之前说道的,在追加时的操作 queueActionLocked ,如add update delete实际上走的都是这里

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj) // 计算key
    if err != nil {
        return KeyError{obj, err}
    }
    // 把新数据添加到DeltaFIFO中,Detal就是 动作为key,对象为值
    // item是DeltaFIFO中维护的一个 map[string]Deltas
    newDeltas := append(f.items[id], Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas) // 去重,去重我们前面讨论过了

    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        } // 不存在则添加
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else {
        delete(f.items, id) // 这里走不到,因为添加更新等操作用newDelta是1
        // 源码中也说要忽略这里
    }
    return nil
}

在FIFO继承的Stroe的方法中,如,Add, Update等都是需要去重的,去重的操作是通过对比最后一个和倒数第二个值

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    newDeltas := append(f.items[id], Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)
...

在函数 dedupDeltas() 中实现的这个

// re-listing and watching can deliver the same update multiple times in any
order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {
    n := len(deltas)
    if n < 2 {
        return deltas
    }
    a := &deltas[n-1] // 如 [1,2,3,4] a=4
    b := &deltas[n-2] // b=3,这里两个值其实为事件
    if out := isDup(a, b); out != nil {
        d := append(Deltas{}, deltas[:n-2]...)
        return append(d, *out)
    }
    return deltas
}

如果b对象的类型是 DeletedFinalStateUnknown 也会认为是一个旧对象被删除,这里在去重时也只是对删除的操作进行去重。

// tools/cache/delta_fifo.go
func isDup(a, b *Delta) *Delta {
    if out := isDeletionDup(a, b); out != nil {
        return out
    }
    // TODO: Detect other duplicate situations? Are there any?
    return nil
}
// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {
    if b.Type != Deleted || a.Type != Deleted {
        return nil
    }
    // Do more sophisticated checks, or is this sufficient?
    if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
        return a
    }
    return b
}

为什么需要去重?什么情况下需合并

代码中开发者给我们留了一个TODO

TODO: is there anything other than deletions that need deduping?

  • 取决于Detal FIFO 生产-消费延迟

    • 当在一个资源的创建时,其状态会频繁的更新,如 Creating,Runinng等,这个时候会出现大量写入FIFO中的数据,但是在消费端可能之前的并未消费完。
    • 在上面那种情况下,以及Kubernetes 声明式 API 的设计,其实多余的根本不关注,只需要最后一个动作如Running,这种情况下,多个内容可以合并为一个步骤
  • 然而在代码中,去重仅仅是在Delete状态生效,显然这不可用;那么结合这些得到:

    • 在一个工作时间窗口内,如果对于删除操作来说发生多次,与发生一次实际上没什么区别,可以去重
    • 但在更新于新增操作时,实际上在对于声明式 API 的设计个人感觉是完全可以做到去重操作。
      • 同一个时间窗口内多次操作,如更新,实际上Kubernetes应该只关注最终状态而不是命令式?

Compute Key

上面大概对一些Detal FIFO的逻辑进行了分析,那么对于Detal FIFO如何去计算,也就是说 MetaNamespaceKeyFunc ,这个是默认的KeyFunc,作用是计算Detals中的唯一key。

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    if key, ok := obj.(ExplicitKey); ok {  // 显示声明的则为这个值
        return string(key), nil
    }
    meta, err := meta.Accessor(obj) // 那么使用Accessor,每一个资源都会实现这个Accessor
    if err != nil {
        return "", fmt.Errorf("object has no meta: %v", err)
    }
    if len(meta.GetNamespace()) > 0 {
        return meta.GetNamespace() + "/" + meta.GetName(), nil
    }
    return meta.GetName(), nil
}

ObjectMetaAccessor 每个Kubernetes资源都会实现这个对象,如Deployment

// accessor interface
type ObjectMetaAccessor interface {
    GetObjectMeta() Object
}

// 会被ObjectMeta所实现
func (obj *ObjectMeta) GetObjectMeta() Object { return obj }
// 而每一个资源都会继承这个 ObjectMeta,如 ClusterRole

type ClusterRole struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"protobuf:"bytes,1,opt,name=metadata"`

那么这个Deltas的key则为集群类型的是资源本身的名字,namespace范围的则为 meta.GetNamespace() + "/" + meta.GetName(),可以在上面代码中看到,这样就可以给Detal生成了一个唯一的key

keyof,用于计算对象的key

func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
    if d, ok := obj.(Deltas); ok {
        if len(d) == 0 { // 长度为0的时候是一个初始的类型
            return "", KeyError{obj, ErrZeroLengthDeltasObject}
        }
        obj = d.Newest().Object // 用最新的一个对象,如果为空则是nil
    }
    if d, ok := obj.(DeletedFinalStateUnknown); ok {
        return d.Key, nil // 到了这里,之前提到过,是一个过期的值将会被删除
    }
    return f.keyFunc(obj) // 调用具体的key计算函数
}

indexer 在整个 client-go 架构中提供了一个具有线程安全的数据存储的对象存储功能;对于Indexer这里会分析下对应的架构及使用方法。

client-go/tools/cache/index.go 中可以看到 indexer是一个实现了Store 的一个interface

type Indexer interface {
    // 继承了store,拥有store的所有方法
    Store
    // 返回indexname的obj的交集
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // 通过对 indexName,indexedValue与之相匹配的集合
    IndexKeys(indexName, indexedValue string) ([]string, error)
    // 给定一个indexName 返回所有的indexed
    ListIndexFuncValues(indexName string) []string
    // 通过indexname,返回与indexedvalue相关的 obj
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    // 返回所有的indexer
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
}

实际上对他的实现是一个 cache,cache是一个KeyFunc与ThreadSafeStore实现的indexer,有名称可知具有线程安全的功能

type cache struct {
    cacheStorage ThreadSafeStore
    keyFunc KeyFunc
}

既然index继承了Store那么,也就是 ThreadSafeStore 必然实现了Store,这是一个基础保证

type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
    Resync() error // Resync is a no-op and is deprecated
}
// KeyFunc是一个生成key的函数,给一个对象,返回一个key值
type KeyFunc func(obj interface{}) (string, error)

那么这个indexer structure可以通过图来很直观的看出来

cache的结构

cache中会出现三种数据结构,也可以成为三种名词,为 Index , Indexers , Indices

type Index map[string]sets.String
type Indexers map[string]IndexFunc
type Indices map[string]Index

可以看出:

  • Index 映射到对象,sets.String 也是在API中定义的数据类型 [string]Struct{}
  • Indexers 是这个 IndexIndexFunc , 是一个如何计算Index的keyname的函数
  • Indices 通过Index 名词拿到对应的对象

这个名词的概念如下,通过图来了解会更加清晰

从创建开始

创建一个cache有两种方式,一种是指定indexer,一种是默认indexer

// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
    return &cache{
        cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
        keyFunc:      keyFunc,
    }
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
    return &cache{
        cacheStorage: NewThreadSafeStore(indexers, Indices{}),
        keyFunc:      keyFunc,
    }
}

更新操作

在indexer中的更新操作(诸如 add , update ),实际上操作的是 updateIndices, 通过在代码可以看出

tools/cache/thread_safe_store.go 的 77行起,那么就来看下 updateIndices() 具体做了什么

func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    // 在操作时,如果有旧对象,需要先删除
    if oldObj != nil {
        c.deleteFromIndices(oldObj, key)
    }
    // 先对整个indexer遍历,拿到index name与 index function
    for name, indexFunc := range c.indexers {
        // 通过index function,计算出对象的indexed name
        indexValues, err := indexFunc(newObj)
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
        }
        // 接下来通过遍历的index name 拿到这个index的对象
        index := c.indices[name]
        if index == nil { // 确认这个index是否存在,
            index = Index{} // 如果不存在将一个Index{}初始化
            c.indices[name] = index
        }
        // 通过计算出的indexed name来拿到对应的 set of object
        for _, indexValue := range indexValues {
            set := index[indexValue]
            if set == nil {
                // 如果这个set不存在,则初始化这个set
                set = sets.String{}
                index[indexValue] = set
            }
            set.Insert(key) // 然后将key插入set中
        }
    }
}

那么通过上面可以了解到了 updateIndices 的逻辑,那么通过对更新函数分析来看看他具体做了什么?这里是add函数,通过一段代码模拟操作来熟悉结构

testIndexer := "testIndexer"
testIndex := "testIndex"

indexers := cache.Indexers{
    testIndexer: func(obj interface{}) (strings []string, e error) {
        indexes := []string{testIndex} // index的名词
        return indexes, nil
    },
}

indices := cache.Indices{}
store := cache.NewThreadSafeStore(indexers, indices)

fmt.Printf("%#v\n", store.GetIndexers())

store.Add("retain", "pod--1")
store.Add("delete", "pod--2")
store.Update("retain", "pod-3")
//lists := store.Update("retain", "pod-3")
lists := store.List()
for _, item := range lists {
    fmt.Println(item)
}

这里是对add操作以及对updateIndices() 进行操作

// threadSafe.go
func (c *threadSafeMap) Add(key string, obj interface{}) {
    c.lock.Lock()
    defer c.lock.Unlock()
    oldObject := c.items[key] // 这个item就是存储object的地方, 为空
    c.items[key] = obj // 这里已经添加了新的值
    c.updateIndices(oldObject, obj, key) // 转至updateIndices
}

// updateIndices
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    // 就当是新创建的,这里是空的忽略
    if oldObj != nil {
        c.deleteFromIndices(oldObj, key)
    }
    // 这个时候拿到的就是 name=testKey function=testIndexer
    for name, indexFunc := range c.indexers {
        // 通过testIndexer对testKey计算出的结果是 []string{testIndexer}
        indexValues, err := indexFunc(newObj)
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
        }
        index := c.indices[name]
        if index == nil {
            index = Index{}
            // 因为假设为空了,故到这里c.indices[testIndexer]= Index{}
            c.indices[name] = index
        }
        for _, indexValue := range indexValues {
            // indexValue=testIndexer
            // set := c.index[name] = c.indices[testIndexer]Index{}
            set := index[indexValue]
            if set == nil {
                set = sets.String{}
                index[indexValue] = set
            }
            set.Insert(key) // 到这里就为set=indices[testIndexer]Index{}
        }
    }
}

总结一下,到这里,可以很明显的看出来,indexer中的三个概念是什么了,前面如果没有看明白话

  • Index:通过indexer计算出key的名称,值为对应obj的一个集合,可以理解为索引的数据结构

    • 比如说 Pod:{"nginx-pod1": v1.Pod{Name:Nginx}}
  • Indexers :这个很简单,就是,对于Index中如何计算每个key的名称;可以理解为分词器,索引的过程

  • Indices 通过Index 名词拿到对应的对象,是Index的集合;是将原始数据Item做了一个索引,可以理解为做索引的具体字段

    • 比如说 Indices["Pod"]{"nginx-pod1": v1.Pod{Name:Nginx}, "nginx-pod2": v1.Pod{Name:Nginx}}
  • Items:实际上存储的在Indices中的set.String{key:value} ,中的 key=value

    • 例如:Item:{"nginx-pod1": v1.Pod{Name:Nginx}, "coredns-depoyment": App.Deployment{Name:coredns}}

删除操作

对于删除操作,在最新版本中是使用了 updateIndices 就是 add update delete全都是相同的方法操作,对于旧版包含1.19- 是单独的一个操作

// v1.2+
func (c *threadSafeMap) Delete(key string) {
    c.lock.Lock()
    defer c.lock.Unlock()
    if obj, exists := c.items[key]; exists {
        c.updateIndices(obj, nil, key)
        delete(c.items, key)
    }
}
// v1.19-
func (c *threadSafeMap) Delete(key string) {
    c.lock.Lock()
    defer c.lock.Unlock()
    if obj, exists := c.items[key]; exists {
        c.deleteFromIndices(obj, key)
        delete(c.items, key)
    }
}

indexer使用

上面了解了indexer概念,可以通过写代码来尝试使用一些indexer

package main

import (
    "fmt"

    appsV1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {

    indexers := cache.Indexers{
        "getDeplyment": func(obj interface{}) (strings []string, e error) {
            d, ok := obj.(*appsV1.Deployment)
            if !ok {
                return []string{}, nil
            }
            return []string{d.Name}, nil
        },
        "getDaemonset": func(obj interface{}) (strings []string, e error) {
            d, ok := obj.(*appsV1.DaemonSet)
            if !ok {
                return []string{}, nil
            }
            return []string{d.Name}, nil
        },
    }

    // 第一个参数是计算set内的key的名称 就是map[string]sets.String的这个strings的名称/namespace/resorcename
    // 第二个参数是计算index即外部的key的名称
    indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)

    deployment := &appsV1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "nginx-deplyment",
            Namespace: "test",
        },
    }

    daemonset := &appsV1.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "firewall-daemonset",
            Namespace: "test",
        },
    }

    daemonset2 := &appsV1.DaemonSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "etcd-daemonset",
            Namespace: "default",
        },
    }

    indexer.Add(deployment)
    indexer.Add(daemonset)
    indexer.Add(daemonset2)

    // 第一个参数是索引器
    // 第二个参数是所引起做索引的字段
    lists, _ := indexer.ByIndex("getDaemonset", "etcd-daemonset")
    for _, item := range lists {
        switch item.(type) {
        case *appsV1.Deployment:
            fmt.Println(item.(*appsV1.Deployment).Name)
        case *appsV1.DaemonSet:
            fmt.Println(item.(*appsV1.DaemonSet).Name)
        }
    }
}