Kubernetes CRD 开发实践

2020.03.08

背景

Kubernetes的最大亮点之一必定是它的声明式API设计,所谓的声明式就是告诉kubernetes你要什么,而不是告诉它怎么做命令。我们日常使用kubernetes做编排工作的时候,经常会接触 DeploymentServicePod 等资源对象,我们可以很灵活地创建其定义配置,然后执行 kubectl apply 命令,kubernetes总能为我们创建相关资源对象并完成资源的注册,进而执行资源所负责的功能。

但是我们有没想过,日常业务开发过程中,虽然常规的资源基本满足需求,但是这些常规的资源大多仅仅是代表相对底层、通用的概念的对象, 某些情况下我们总是想根据业务定制我们的资源类型,并且利用kubernetes的声明式API,对资源的增删改查进行监听并作出具体的业务功能。随着Kubernetes生态系统的持续发展,越来越多高层次的对象将会不断涌现,比起目前使用的对象,新对象将更加专业化。

有了自定义资源定义API,开发者将不需要逐一进行 Deployment、Service、ConfigMap 等步骤,而是创建并关联一些用于表述整个应用程序或者软件服务的对象。除此,我们能使用自定义的高阶对象,并在这些高阶对象的基础上创建底层对象。例如:我们想要一个Backup资源,我们创建它的对象时,就希望通过spec的定义进行日常的备份操作声明,当提交给k8s集群的时候,相关的Deployment、Service资源会被自动创建,很大程度让业务扩展性加大。

Kubernetes1.7 之前,要实现类似的自定义资源,需要通过 TPR(ThirdPartyResource ) 对象方式定义自定义资源,但因为这种方式十分复杂,一度并不被人重视。到了 Kubernetes1.8 版本,TPR开始被停用,被官方推荐的 CRD(CustomResourceDefinitions)所取代

CRD,称之为自定义资源定义,本质上,它的表现形式是一段声明,用于定义用户定义的资源对象罢了。单单通过它还不能产生任何收益,因为开发者还要针对CRD定义提供关联的CRD对象CRD控制器(CRD Controller)。CRD控制器通常可以通过Golang进行开发,只需要遵循Kubernetes的控制器开发规范,并基于client-go进行调用,并实现 Informer、ResourceEventHandler、Workqueue等组件逻辑即可。听起来感觉很复杂的样子,不过其实真正开发的时候,并不困难,因为大部分繁琐的代码逻辑都能通过k8s的code generator代码生成出来。关于如何进行CRD控制器的开发,下面我们会通过一个例子慢慢地深入,希望通过实践来理解CRD的原理。

CRD定义范例

与其他资源对象一样,对CRD的定义也使用YAML配置进行声明。下面先来看下本文的Demo例子的CRD定义

本文的 crddemo 源码为:https://github.com/domac/crddemo

mydemo.yaml

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: mydemos.crddemo.k8s.io
spec:
  group: crddemo.k8s.io
  version: v1
  names:
    kind: Mydemo
    plural: mydemos
  scope: Namespaced

CRD定义中的关键字段如下:

api url

  • group:设置API所属的组,将其映射为API URL中的 “/apis/” 下一级目录。它是逻辑上相关的Kinds集合

  • scope:该API的生效范围,可选项为Namespaced和Cluster。

  • version:每个 Group 可以存在多个版本。例如,v1alpha1,然后升为 v1beta1,最后稳定为 v1 版本。

  • names:CRD的名称,包括单数、复数、kind、所属组等名称定义

在这个 CRD 中,我指定了 group: crddemo.k8s.io , version: v1 这样的 API 信息,也指定了这个 CR 的资源类型叫作 Mydemo,复数(plural)是 mydemos。

我们先别着急使用kubectl create创建资源定义,我们接下来要做的是再基于这个CRD的定义创建相应的具体自定义对象

example-mydemo.yaml

apiVersion: crddemo.k8s.io/v1
kind: Mydemo
metadata:
  name: example-mydemo
spec:
  ip: "127.0.0.1"
  port: 8080

这个资源对象跟定义pod差不多,它的主要信息都是来源上面的定义,Kind是Mydemo,版本是v1,资源组是crddemo.k8s.io

除了这些设置,还需要在spec端设置相应的参数,一般是开发者自定义定制的,在这里,我定制了两个属性:ipport。所以整个对象我要告诉k8s,我期待监听处理一个叫example-mydemo的程序,它的地址是127.0.0.1,端口是8080。 当然,这里只是一个demo,没有什么严格的属性约束,开发者还是根据自己的业务需要自行定义吧。为了不影响本文的介绍,姑且认为这两个属性是非常重要的。

到这里为止,相对轻松的工作已经完成,我们已经完成CRD的“设计图”工作,下面我们开始动手构建这个CRD控制器的编码工作了。

CRD 控制器原理

在正式编码之前,我们先理解一下自定义控制器的工作原理,如下图

crd arch

CRD控制器的工作流,可分为监听、同步、触发三个步骤:

一、Controller 首先会通过Informer (所谓的 Informer,就是一个自带缓存和索引机制),从K8ss的API Server中获取它所关心的对象,举个例子,也就是我编写的Controller获取的应该是Mydemo对象。值得注意的是Informer在构建之前,会使用我们生成的client(下面编码阶段会提到),再透过Reflector的ListAndWatch机制跟API Server建立连接,不断地监听 Mydemo对象实例的变化。在 ListAndWatch 机制下,一旦 APIServer 端有新的 Mydemo 实例被创建、删除或者更新,Reflector 都会收到“事件通知”。该事件及它对应的 API 对象会被放进一个 Delta FIFO Queue中。

二、Local Store 此时完成同步缓存操作

三、Informer 根据这些事件的类型,触发我们编写并注册号的ResourceEventHandler,完成业务动作的触发。

上面图中的 Control Loop 实际上可以通过code-generator生成,下面也会提到。总之Control Loop中我们只关心如何拿到“实际状态”,并与“期待状态”对比,从而具体的差异处理逻辑,只需要开发者自行编写即可。

CRD 开发过程

下面会通过一个简单的例子,开始我们的CRD代码的编写, 完整代码: https://github.com/domac/crddemo

自定义资源代码编写

首先,kubernetes涉及的代码生成对项目目录结构是有要求的,所以我们先创建一个结构如下的项目

├── controller.go
├── crd
│   └── mydemo.yaml
├── example
│   └── example-mydemo.yaml
├── main.go
└── pkg
    └── apis
        └── crddemo
            ├── register.go
            └── v1
                ├── doc.go
                ├── register.go
                ├── types.go

可见关键部分的pkg目录就是API组的URL结构, 如下图

path2

v1 下面的 types.go 文件里,则定义了 Mydemo 对象的完整描述。

1、我们首先开看 pkg/apis/crddemo/register.go,这个文件主要用来存放全局变量,如下:

package crddemo

const (
    GroupName = "crddemo.k8s.io"
    Version   = "v1"
)

2、pkg/apis/crddemo/v1下的doc.go 也是比较简单的:

// +k8s:deepcopy-gen=package
// +groupName=crddemo.k8s.io
package v1

在这个文件中,你会看到 +k8s:deepcopy-gen=package 和 +groupName=crddemo.k8s.io,这就是 Kubernetes 进行代码生成要用的 Annotation 风格的注释。

  • +k8s:deepcopy-gen=package 意思是,请为整个 v1 包里的所有类型定义自动生成 DeepCopy 方法;
  • +groupName=crddemo.k8s.io,则定义了这个包对应的crddemo API 组的名字。

可以看到,这些定义在 doc.go 文件的注释,起到的是全局的代码生成控制的作用,所以也被称为 Global Tags。

3、pkg/apis/crddemo/types.go 的作用就是定义一个 Mydemo 类型到底有哪些字段(比如,spec 字段里的内容)。这个文件的主要内容如下所示:

package v1


import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)


// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object


// Mydemo 描述一个 Mydemo的资源字段
type Mydemo struct {
    metav1.TypeMeta `json:",inline"`

    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec MydemoSpec `json:"spec"`
}

//MydemoSpec 为 Mydemo的资源的spec属性的字段
type MydemoSpec struct {
    Ip   string `json:"ip"`
    Port int    `json:"port"`
}


// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

//复数形式
type MydemoList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata"`

    Items []Mydemo `json:"items"`
}

上面的代码,可以开的我们的Mydemo可续定义方法跟k8s对象一样,都包含了TypeMeta和ObjectMeta字段,而其中比较重要的是 Spec 字段,就是需要我们自己定义的部分:定义了 Ip 和 Port 两个字段。

此外,除了定义 Mydemo 类型,你还需要定义一个 MydemoList 类型,用来描述一组 Mydemo 对象应该包括哪些字段。之所以需要这样一个类型,是因为在 Kubernetes 中,获取所有某对象的 List() 方法,返回值都是List 类型,而不是某类型的数组。所以代码上一定要做区分

关于上面代码的几个重要注解,下面说明一下:

  • +genclient 这段注解的意思是:请为下面资源类型生成对应的 Client 代码。

  • +genclient:noStatus 的意思是:这个 API 资源类型定义里,没有 Status 字段,因为Mydemo才是主类型,所以 +genclient 要写在Mydemo之上,不用写在MydemoList之上,这时要细心注意的。

  • +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object 的意思是,请在生成 DeepCopy 的时候,实现 Kubernetes 提供的 runtime.Object 接口。否则,在某些版本的 Kubernetes 里,你的这个类型定义会出现编译错误。

4、pkg/apis/crddemo/register.go 作用就是注册一个类型(Type)给 APIServer。

package v1


import (
    "github.com/domac/crddemo/pkg/apis/crddemo"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
)


var SchemeGroupVersion = schema.GroupVersion{
    Group:   crddemo.GroupName,
    Version: crddemo.Version,
}


var (
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    AddToScheme   = SchemeBuilder.AddToScheme
)


func Resource(resource string) schema.GroupResource {
    return SchemeGroupVersion.WithResource(resource).GroupResource()
}


func Kind(kind string) schema.GroupKind {
    return SchemeGroupVersion.WithKind(kind).GroupKind()
}


//Mydemo 资源类型在服务器端的注册的工作,APIServer 会自动帮我们完成
//但与之对应的,我们还需要让客户端也能知道 Mydemo 资源类型的定义
func addKnownTypes(scheme *runtime.Scheme) error {
    scheme.AddKnownTypes(
        SchemeGroupVersion,
        &Mydemo{},
        &MydemoList{},
    )

    // register the type in the scheme
    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
    return nil
}

有了 addKnownTypes 这个方法,Kubernetes 就能够在后面生成客户端的时候,知道Mydemo 以及MydemoList 类型的定义了。

好了,到这里为止,我们有关定义的代码已经写好了,正如controller原理图所示,接下来我们需要通过kubernetes提供的代码生成工具,为上面的Mydemo资源类型生成clientset、informer 和 lister。

关于如何使用代码生成,这里我已经编写了一个脚步,只需只需本脚步即可

代码生成

具体可以调用我提供的shll脚本:code-gen.sh

#!/bin/bash

set -x

ROOT_PACKAGE="github.com/domac/crddemo"
CUSTOM_RESOURCE_NAME="crddemo"
CUSTOM_RESOURCE_VERSION="v1"
GO111MODULE=off

# 安装k8s.io/code-generator
[[ -d $GOPATH/src/k8s.io/code-generator ]] || go get -u k8s.io/code-generator/...

# 执行代码自动生成,其中pkg/client是生成目标目录,pkg/apis是类型定义目录
cd $GOPATH/src/k8s.io/code-generator && ./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION"

当只需代码生成脚步后,可以发现我们的代码工作目录也发送了变化,多出了一个client目录

client
├── clientset
│   └── versioned
│       ├── clientset.go
│       ├── doc.go
│       ├── fake
│       │   ├── clientset_generated.go
│       │   ├── doc.go
│       │   └── register.go
│       ├── scheme
│       │   ├── doc.go
│       │   └── register.go
│       └── typed
│           └── crddemo
│               └── v1
│                   ├── crddemo_client.go
│                   ├── doc.go
│                   ├── fake
│                   │   ├── doc.go
│                   │   ├── fake_crddemo_client.go
│                   │   └── fake_mydemo.go
│                   ├── generated_expansion.go
│                   └── mydemo.go
├── informers
│   └── externalversions
│       ├── crddemo
│       │   ├── interface.go
│       │   └── v1
│       │       ├── interface.go
│       │       └── mydemo.go
│       ├── factory.go
│       ├── generic.go
│       └── internalinterfaces
│           └── factory_interfaces.go
└── listers
    └── crddemo
        └── v1
            ├── expansion_generated.go
            └── mydemo.go

其中,pkg/apis/crddemo/v1 下面的 zz_generated.deepcopy.go 文件,就是自动生成的 DeepCopy 代码文件。下面的三个包(clientset、informers、 listers),都是 Kubernetes 为 Mydemo 类型生成的client库,这些库会在后面编写自定义控制器的时候用到。

自定义控制器代码编写

k8s的声明式 API并不像“命令式 API”那样有着明显的执行逻辑。这就使得基于声明式 API 的业务功能实现,往往需要通过控制器模式来“监视”API 对象的变化,然后以此来决定实际要执行的具体工作。

main.go

package main


import (
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"
    "github.com/golang/glog"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    clientset "github.com/domac/crddemo/pkg/client/clientset/versioned"
    informers "github.com/domac/crddemo/pkg/client/informers/externalversions"
)


//程序启动参数
var (
    flagSet              = flag.NewFlagSet("crddemo", flag.ExitOnError)
    master               = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
    kubeconfig           = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
    onlyOneSignalHandler = make(chan struct{})
    shutdownSignals      = []os.Signal{os.Interrupt, syscall.SIGTERM}
)


//设置信号处理
func setupSignalHandler() (stopCh <-chan struct{}) {
    close(onlyOneSignalHandler)


    stop := make(chan struct{})
    c := make(chan os.Signal, 2)
    signal.Notify(c, shutdownSignals...)
    go func() {
        <-c
        close(stop)
        <-c
        os.Exit(1)
    }()


    return stop
}


func main() {
    flag.Parse()

    //设置一个信号处理,应用于优雅关闭
    stopCh := setupSignalHandler()


    cfg, err := clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
    if err != nil {
        glog.Fatalf("Error building kubeconfig: %s", err.Error())
    }


    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }


    mydemoClient, err := clientset.NewForConfig(cfg)
    if err != nil {
        glog.Fatalf("Error building example clientset: %s", err.Error())
    }


    //informerFactory工厂类, 这里注入我们通过代码生成的client
    //clent主要用于和API Server 进行通信,实现ListAndWatch
    mydemoInformerFactory := informers.NewSharedInformerFactory(mydemoClient, time.Second*30)

    //生成一个crddemo组的Mydemo对象传递给自定义控制器
    controller := NewController(kubeClient, mydemoClient,
        mydemoInformerFactory().V1().Mydemos())


    go mydemoInformerFactory(stopCh)


    if err = controller.Run(2, stopCh); err != nil {
        glog.Fatalf("Error running controller: %s", err.Error())
    }
}

接下来,我们来看跟业务最紧密的控制器Controller的编写

controller.go 部分重要代码:

  

func NewController(
    kubeclientset kubernetes.Interface,
    mydemoslientset clientset.Interface,
    mydemoInformer informers.MydemoInformer) *Controller {


    // Create event broadcaster
    // Add sample-controller types to the default Kubernetes Scheme so Events can be
    // logged for sample-controller types.
    utilruntime.Must(mydemoscheme.AddToScheme(scheme.Scheme))
    glog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})


    //使用client 和前面创建的 Informer,初始化了自定义控制器
    controller := &Controller{
        kubeclientset:   kubeclientset,
        mydemoslientset: mydemoslientset,
        demoInformer:    mydemoInformer.Lister(),
        mydemosSynced:   mydemoInformer.Informer().HasSynced,
        workqueue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Mydemos”), //WorkQueue的实现,负责同步 Informer 和控制循环之间的数据
        recorder:        recorder,
    }


    glog.Info("Setting up mydemo event handlers)

    //mydemoInformer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc)
    // 分别对应 API 对象的“添加”“更新”和“删除”事件。而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中
    mydemoInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueMydemo,
        UpdateFunc: func(old, new interface{}) {
            oldMydemo := old.(*samplecrdv1.Mydemo)
            newMydemo := new.(*samplecrdv1.Mydemo)
            if oldMydemo.ResourceVersion == newMydemo.ResourceVersion {
                return
            }
            controller.enqueueMydemo(new)
        },
        DeleteFunc: controller.enqueueMydemoForDelete,
    })


    return controller
}

  

通过上面Controller的代码实现,我们基本实现了控制器ListAndWatch的事件注册逻辑:通过 APIServer 的 LIST API获取所有最新版本的 API 对象;然后,再通过 WATCH-API 来监听所有这些API对象的变化。通过监听到的事件变化,Informer 就可以实时地更新本地缓存,并且调用这些事件对应的 EventHandler了

下面,我们再来看原理图中的Control Loop的部分


func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    defer runtime.HandleCrash()
    defer c.workqueue.ShutDown()


    // 记录开始日志
    glog.Info("Starting Mydemo control loop")
  
    glog.Info("Waiting for informer caches to sync")

    if ok := cache.WaitForCacheSync(stopCh, c.mydemosSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    glog.Info("Starting workers")
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    glog.Info("Started workers")
    <-stopCh
    glog.Info("Shutting down workers")


    return nil
}

可以看到,启动控制循环的逻辑非常简单,就是同步+循环监听任务。而这个循环监听任务就是我们真正的业务实现部分了


//runWorker是一个不断运行的方法,并且一直会调用 c.processNextWorkItem 从workqueue读取和读取消息
func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}

//从workqueue读取和读取消息
func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    err := func(obj interface{}) error {
        defer c.workqueue.Done(obj)
        var key string
        var ok bool
        if key, ok = obj.(string); !ok {
            c.workqueue.Forget(obj)
            runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }
        if err := c.syncHandler(key); err != nil {
            return fmt.Errorf("error syncing '%s': %s", key, err.Error())
        }
        c.workqueue.Forget(obj)
        glog.Infof("Successfully synced '%s'", key)
        return nil
    }(obj)

    if err != nil {
        runtime.HandleError(err)
        return true
    }
    return true
}

//尝试从 Informer 维护的缓存中拿到了它所对应的 Mydemo 对象
func (c *Controller) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    mydemo, err := c.demoInformer.Mydemos(namespace).Get(name)
 
    //从缓存中拿不到这个对象,那就意味着这个 Mydemo 对象的 Key 是通过前面的“删除”事件添加进工作队列的。
    if err != nil {
        if errors.IsNotFound(err) {
            //对应的 Mydemo 对象已经被删除了
            glog.Warningf("DemoCRD: %s/%s does not exist in local cache, will delete it from Mydemo ...",
                namespace, name)
            glog.Infof("[DemoCRD] Deleting mydemo: %s/%s ...", namespace, name)
            return nil
        }
        runtime.HandleError(fmt.Errorf("failed to list mydemo by: %s/%s", namespace, name))
        return err
    }
    glog.Infof("[DemoCRD] Try to process mydemo: %#v ...", mydemo)
    c.recorder.Event(mydemo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    return nil
}

代码中的 demoInformer,从namespace中通过key获取Mydemo对象这个操作,其实就是在访问本地缓存的索引,实际上,在 Kubernetes 的源码中,你会经常看到控制器从各种 Lister 里获取对象,比如:podLister、nodeLister 等等,它们使用的都是 Informer 和缓存机制。

而如果控制循环从缓存中拿不到这个对象(demoInformer 返回了 IsNotFound 错误),那就意味着这个 Mydemo 对象的 Key 是通过前面的“删除”事件添加进工作队列的。所以,尽管队列里有这个 Key,但是对应的 Mydemo 对象已经被删除了。而如果能够获取到对应的 Mydemo 对象,就可以执行控制器模式里的对比“期望状态(用户通过 YAML 文件提交到 APIServer 里的信息)”和“实际状态(我们的控制循环需要通过查询实际的Mydemo资源情况” 的功能逻辑了。不过在本例子中,就不做过多的业务假设了。

至此,一个完整的自定义 API 对象和它所对应的自定义控制器,就编写完毕了。

部署测试

代码编码后,我们准备开始代码的发布,可以使用提供 Makefile 进行编译

$ make

... …

gofmt -w .
go test -v . 
?       github.com/domac/crddemo        [no test files]
mkdir -p releases
GOOS=linux GOARCH=amd64 go build -mod=vendor -ldflags "-s -w" -v -o releases/crddemo *.go
github.com/golang/groupcache/lru
k8s.io/apimachinery/third_party/forked/golang/json
k8s.io/apimachinery/pkg/util/mergepatch
k8s.io/kube-openapi/pkg/util/proto
k8s.io/client-go/tools/record/util
k8s.io/apimachinery/pkg/util/strategicpatch
k8s.io/client-go/tools/record
command-line-arguments
go clean -i

... ...

编译完成后,会生成 crddemo 的二进制文件,我们要做把crddemo放到kubernetes集群中,或者本地也行,只要能访问到 apiserver 和具备kubeconfig

$  ./crddemo --kubeconfig=/data/svr/projects/kubernetes/config/kubectl.kubeconfig --master=http://127.0.0.1:8080  -alsologtostderr=true 

I0308 12:23:18.494507   27426 controller.go:79] Setting up mydemo event handlers
I0308 12:23:18.494829   27426 controller.go:105] Starting Mydemo control loop
I0308 12:23:18.494840   27426 controller.go:108] Waiting for informer caches to sync
E0308 12:23:18.496902   27426 reflector.go:178] github.com/domac/crddemo/pkg/client/informers/externalversions/factory.go:117: Failed to list *v1.Mydemo: the server could not find the requested resource (get mydemos.crddemo.k8s.io)
E0308 12:23:18.497477   27426 reflector.go:178] github.com/domac/crddemo/pkg/client/informers/externalversions/factory.go:117: Failed to list *v1.Mydemo: the server could not find the requested resource (get mydemos.crddemo.k8s.io)
E0308 12:23:21.604508   27426 reflector.go:178] github.com/domac/crddemo/pkg/client/informers/externalversions/factory.go:117: Failed to list *v1.Mydemo: the server could not find the requested resource (get mydemos.crddemo.k8s.io)
E0308 12:23:26.932293   27426 reflector.go:178] github.com/domac/crddemo/pkg/client/informers/externalversions/factory.go:117: Failed to list *v1.Mydemo: the server could not find the requested resource (get mydemos.crddemo.k8s.io)


... ...

可以看到,程序运行的时候,一开始会报错。这是因为,此时 Mydemo 对象的 CRD 还没有被创建出来,所以 Informer 去 APIServer 里获取 Mydemos 对象时,并不能找到 Mydemo 这个 API 资源类型的定义

接下来,我们执行我们自定义资源的定义文件:

$  kubectl apply -f crd/mydemo.yaml 
customresourcedefinition.apiextensions.k8s.io/mydemos.crddemo.k8s.io created

此时,观察crddemo的日志输出,可以看到Controller的日志恢复了正常,控制循环启动成功

I0308 12:30:29.956263   28282 controller.go:113] Starting workers
I0308 12:30:29.956307   28282 controller.go:118] Started workers

然后,我们可以对我们的Mydemo对象进行增删改查操作了。

提交我们的自定义资源对象

$  kubectl apply -f example-mydemo.yaml 
mydemo.crddemo.k8s.io/example-mydemo created

创建成功够,看k8s集群是否成功存储起来

$  kubectl get Mydemo                   
NAME             AGE
example-mydemo   2s

这时候,查看一下控制器的输出:

I0308 12:31:24.983663   28282 controller.go:216] [DemoCRD] Try to process mydemo: &v1.Mydemo{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, ObjectMeta:v1.ObjectMeta{Name:"example-mydemo", GenerateName:"", Namespace:"default", SelfLink:"/apis/crddemo.k8s.io/v1/namespaces/default/mydemos/example-mydemo", UID:"8a6d17f7-17f3-4a1d-8250-bb092678ae7e", ResourceVersion:"10818363", Generation:1, CreationTimestamp:v1.Time{Time:time.Time{wall:0x0, ext:63719238684, loc:(*time.Location)(0x1e566c0)}}, DeletionTimestamp:(*v1.Time)(nil), DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"crddemo.k8s.io/v1\",\"kind\":\"Mydemo\",\"metadata\":{\"annotations\":{},\"name\":\"example-mydemo\",\"namespace\":\"default\"},\"spec\":{\"ip\":\"127.0.0.1\",\"port\":8080}}\n"}, OwnerReferences:[]v1.OwnerReference(nil), Finalizers:[]string(nil), ClusterName:"", ManagedFields:[]v1.ManagedFieldsEntry(nil)}, Spec:v1.MydemoSpec{Ip:"127.0.0.1", Port:8080}}
I0308 12:31:24.983844   28282 controller.go:174] Successfully synced 'default/example-mydemo’

I0308 12:31:24.983893   28282 event.go:278] Event(v1.ObjectReference{Kind:"Mydemo", Namespace:"default", Name:"example-mydemo", UID:"8a6d17f7-17f3-4a1d-8250-bb092678ae7e", APIVersion:"crddemo.k8s.io/v1", ResourceVersion:"10818363", FieldPath:""}): type: 'Normal' reason: 'Synced' Mydemo synced successfully

可以看到,我们上面创建 example-mydemo.yaml 的操作,触发了 EventHandler 的添加事件,从而被放进了工作队列。紧接着,控制循环就从队列里拿到了这个对象,并且打印出了正在处理这个 Mydemo 对象的日志。

我们这时候,尝试修改资源,对对应的port属性进行修改

apiVersion: crddemo.k8s.io/v1
kind: Mydemo
metadata:
  name: example-mydemo
spec:
  ip: "127.0.0.1"
  port: 9090

手动执行修改:

$  kubectl apply -f example-mydemo.yaml

此时,crddemo新增出来的日志如下:

I0308 12:32:05.663044   28282 controller.go:216] [DemoCRD] Try to process mydemo: &v1.Mydemo{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, ObjectMeta:v1.ObjectMeta{Name:"example-mydemo", GenerateName:"", Namespace:"default", SelfLink:"/apis/crddemo.k8s.io/v1/namespaces/default/mydemos/example-mydemo", UID:"8a6d17f7-17f3-4a1d-8250-bb092678ae7e", ResourceVersion:"10818457", Generation:2, CreationTimestamp:v1.Time{Time:time.Time{wall:0x0, ext:63719238684, loc:(*time.Location)(0x1e566c0)}}, DeletionTimestamp:(*v1.Time)(nil), DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"crddemo.k8s.io/v1\",\"kind\":\"Mydemo\",\"metadata\":{\"annotations\":{},\"name\":\"example-mydemo\",\"namespace\":\"default\"},\"spec\":{\"ip\":\"127.0.0.1\",\"port\":9080}}\n"}, OwnerReferences:[]v1.OwnerReference(nil), Finalizers:[]string(nil), ClusterName:"", ManagedFields:[]v1.ManagedFieldsEntry(nil)}, Spec:v1.MydemoSpec{Ip:"127.0.0.1", Port:9080}}
I0308 12:32:05.663179   28282 controller.go:174] Successfully synced 'default/example-mydemo’

I0308 12:32:05.663208   28282 event.go:278] Event(v1.ObjectReference{Kind:"Mydemo", Namespace:"default", Name:"example-mydemo", UID:"8a6d17f7-17f3-4a1d-8250-bb092678ae7e", APIVersion:"crddemo.k8s.io/v1", ResourceVersion:"10818457", FieldPath:""}): type: 'Normal' reason: 'Synced' Mydemo synced successfully

可以看到,这一次,Informer 注册的更新事件被触发,更新后的Mydemo对象的 Key 被添加到了工作队列之中。

所以,接下来控制循环从工作队列里拿到的Mydemo对象,与前一个对象是不同的:它的ResourceVersion的值从 10818363 变成了 10818457 ;而 Spec 里的Port字段,则变成了 9080。最后,我再把这个对象删除掉:

$  kubectl delete -f example-mydemo.yaml 
mydemo.crddemo.k8s.io "example-mydemo" deleted

这一次,在控制器的输出里,我们就可以看到,Informer 注册的“删除”事件被触发,输出如下:


0308 12:33:08.494755   28282 controller.go:203] DemoCRD: default/example-mydemo does not exist in local cache, will delete it from Mydemo …

I0308 12:33:08.495793   28282 controller.go:206] [DemoCRD] Deleting mydemo: default/example-mydemo …

I0308 12:33:08.495808   28282 controller.go:174] Successfully synced 'default/example-mydemo'

然后,k8s集群的资源也被清除了:

$  kubectl get Mydemo                    
No resources found in default namespace.

以上就是使用自定义控制器的基本开发流程