0%

【Kubernetes】CRD

Controller Manager 的系列文章 中我们看到,Kubernetes 内置的各种 Controller 通过ApiServer监控 DeploymentDaemonSetStatefulSet 等内部资源对象,在一个控制循环中通过各种操作将系统维持在我们期望的一个状态中,这即是其经典的 声明式API设计。然而,内置的API资源大多仅代表相对底层和通用概念的对象,已经不能够满足越来越复杂的业务场景需求。随着Kubernetes生态系统的持续发展,我们将需要更多高层次的面向专门的场景的对象。在声明式API的原则下,设计自定义资源API,开发者将不需要逐一进行 Deployment、Service、ConfigMap 等步骤,而是创建并关联一些用于表述整个应用程序或者软件服务的对象。在当前,CoreOS推出的各种 Operator 即是这一思想的广泛利用。为了实现 Operator,你需要了解 CRD(CustomResourceDefinitions)。本文所有实现的代码,可以参考我的 Github

CRD使用方法

Extend the Kubernetes API with CustomResourceDefinitions 中介绍了使用CRD扩展Kubernetes API的详细用法,简单来说可以分为两步:

  • 利用CRD API声明自定义的资源API
  • 根据刚才声明的资源API,创建自定义的资源对象

Create CustomResourceDefinition

首先我们可以使用 CustomResourceDefinition 声明自定义的资源API,这里也可以将我们自定义的资源API理解为 CustomResourceDefinition 这个API的对象,我们可以指定它的 metadata.name。这里没有 metadata.namespace字段,是因为CustomResourceDefinition 适用于所有命名空间。

CRD定义中的关键字段如下:

  • group:设置API所属的组,将其映射为API URL中的 “/apis/” 下一级目录。它是逻辑上相关的Kinds集合
  • scope:该API的生效范围,可选项为Namespaced和Cluster。
  • version:每个 Group 可以存在多个版本。例如,v1alpha1,然后升为 v1beta1,最后稳定为 v1 版本。
  • names:CRD的名称,包括单数、复数、kind、所属组等名称定义

api url

在下面的示例中,我们定义资源的Group是example.houmin.cc,version是v1,kind是Foo。这里的version是一个list,可以指定多个服务的版本,这里只是简单声明了v1这个版本,具体可以参考 Extend the Kubernetes API with CustomResourceDefinitions源代码

crd.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
# name must match the spec fields below, and be in the form: <plural>.<group>
name: foos.example.houmin.cc
spec:
# group name to use for REST API: /apis/<group>/<version>
group: example.houmin.cc
# list of versions supported by this CustomResourceDefinition
version: v1
names:
# kind is normally the CamelCased singular type. Your resource manifests use this.
kind: Foo
# plural name to be used in the URL: /apis/<group>/<version>/<plural>
plural: foos
# singular name to be used as an alias on the CLI and for display
singular: foo
# shortNames allow shorter string to match your resource on the CLI
shortNames:
- fo
# either Namespaced or Cluster
scope: Namespaced

根据上面的manifest文件,即可创建CRD。

1
2
# kubectl create -f crd.yaml
customresourcedefinition.apiextensions.k8s.io/foos.example.houmin.cc created

Create custom objects

在声明了自定义资源后,就可以编辑下面这样的manifest文件,创建用户自定义资源的对象,就像 Pod等原生资源一样。

example-foo.yaml
1
2
3
4
5
6
7
apiVersion: example.houmin.cc/v1
kind: Foo
metadata:
name: example-foo
spec:
deploymentName: example-foo
replicas: 3

这里的 deploymentNamereplicas 都是我们自定义资源API的字段,在后面会详细介绍。使用kubectl创建资源后,我们发现CRD的使用和原生API资源毫无区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# kubectl apply -f example-foo.yaml
foo.example.houmin.cc/example-foo created
# kubectl get foo
NAME AGE
example-foo 8s
# kubectl get foo -o yaml
apiVersion: v1
items:
- apiVersion: example.houmin.cc/v1
kind: Foo
metadata:
creationTimestamp: "2020-10-27T13:32:15Z"
generation: 1
name: example-foo
namespace: default
resourceVersion: "294744500"
selfLink: /apis/example.houmin.cc/v1/namespaces/default/foos/example-foo
uid: ece2daac-5510-4de1-b924-c8b1a5e178fd
spec:
deploymentName: example-foo
replicas: 3
kind: List
metadata:
resourceVersion: ""
selfLink: ""

经过上面的两步操作,我们已经能够自定义资源API并且使用它了。但是,在第一步和第二步之间还需要做一些工作,不然直接创建自定义资源对象是不会成功的。为什么呢?想一想,对于原生的资源API,比如Deployment,我们都有对应的Controller在 ApiServer 监听每一个Deployment 资源的创建,并随之创建对应的Pod和维护其状态。对于我们创建的CRD资源,我们也需要有对应的Controller做类似的工作。

另外,刚才提到我们创建的 Foo资源具有两个字段 deploymentNamereplicas,这个都是我们自定义的,你也可以根据你的需要定义你自己的字段。每次创建 Foo 对象后,我们实现的 Controller 就会根据自己的逻辑去做自己的事情,比如这里就是维护 replicasdeployment,具体的工作流程如下图所示。

crd arch

CRD控制器的工作流,可分为监听、同步、触发三个步骤:

  • Controller 首先会通过 Informer 从 API Server中获取它所关心的对象,这里就是上面的Foo对象。
    • 值得注意的是Informer在构建之前,会使用我们生成的client(下面编码阶段会提到),再透过Reflector的ListAndWatch机制跟API Server建立连接,不断地监听 Foo 对象实例的变化。
    • 在 ListAndWatch 机制下,一旦 APIServer 端有新的 Foo 实例被创建、删除或者更新,Reflector 都会收到 事件通知
    • 该事件及它对应的 API 对象会被放进一个 Delta FIFO Queue中。
  • Local Store 此时完成同步缓存操作
  • Informer 根据这些事件的类型,触发我们编写并注册好的ResourceEventHandler,完成业务动作的触发。

上面图中的 Control Loop 实际上可以通过code-generator生成,下面也会提到。总之Control Loop中我们只关心如何拿到 Current State,并与 Desired State 对比,从而具体的差异处理逻辑,只需要开发者自行编写即可。

定义CRD资源

首先,kubernetes涉及的代码生成对项目目录结构是有要求的,所以我们先创建一个结构如下的项目,可见关键在于pkg目录就是API组的URL结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
├── artifacts
│   └── examples
│   ├── crd.yaml
│   └── example-foo.yaml
├── controller.go
├── go.mod
├── go.sum
├── hack
├── main.go
└── pkg
└── apis
   └── example
   ├── register.go
   └── v1
   ├── doc.go
   ├── register.go
   └── types.go
  1. 我们首先开看 pkg/apis/example/register.go,这个文件主要用来存放全局变量,如下:
pkg/apis/example/register.go
1
2
3
4
5
6
package example

const (
GroupName = "example.houmin.cc"
Version = "v1"
)
  1. pkg/apis/example/v1/doc.go 主要是 global tags,起到的是全局的代码生成控制的作用,详见代码生成解释。
1
2
3
// +k8s:deepcopy-gen=package
// +groupName=example.houmin.cc
package v1
  1. pkg/apis/example/v1/types 的作用就是定义一个 Foo 类型到底有哪些字段(比如,spec 字段里的内容)。这个文件的主要内容如下所示:
pkg/apis/example/v1/types
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package v1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type Foo struct {
metav1.TypeMeta `json:",inline"`

metav1.ObjectMeta `json:"metadata,omitempty"`

Spec FooSpec `json:"spec"`
Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource
type FooSpec struct {
DeploymentName string `json:"deploymentName"`
Replicas *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource
type FooStatus struct {
AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type FooList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`

Items []Foo `json:"items"`
}

上面的代码,可以看到我们的Foo定义方法跟k8s对象一样,都包含了 TypeMetaObjectMeta字段,而其中比较重要的是 Spec 字段和 Status 字段,这个可以根据用户需要自定义。

此外,除了定义 Foo 类型,你还需要定义一个 FooList 类型,用来描述一组 Foo 对象应该包括哪些字段。之所以需要这样一个类型,是因为在 Kubernetes 中,获取所有某对象的 List() 方法,返回值都是List 类型,而不是某类型的数组。所以代码上一定要做区分

除此之外,还有几个作为 local tags 存在的注释,主要用于控制代码生成,详见下一小节。

  1. pkg/apis/example/register.go 作用就是注册一个类型(Type)给 APIServer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package v1

import (
"github.com/SimpCosm/crddemo/pkg/apis/example"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

var SchemeGroupVersion = schema.GroupVersion{
Group: example.GroupName,
Version: example.Version,
}

var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)

func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}

func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}

func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(
SchemeGroupVersion,
&Foo{},
&FooList{},
)

// register the type in the scheme
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}

有了 addKnownTypes 这个方法,Kubernetes 就能够在后面生成客户端的时候,知道 Foo 以及FooList 类型的定义了。

好了,到这里为止,我们有关定义的代码已经写好了,正如controller原理图所示,接下来我们需要通过kubernetes提供的代码生成工具,为上面的Foo资源类型生成clientset、informer 和 lister。

代码生成

kubernetes社区有一个k8s.io/code-generator 仓库,在里面提供了一系列代码生成工具:

  • deepcopy-gen:为每个类型T创建了 func (t *T) DeepCopy() *T方法
  • client-gen:为CustomeResource APIGroups 创建 typed clientsets
  • informer-gen:为CustomResource创建informers,能够监听到服务端CustomResource发生变化的事件
  • lister-gen:为CustomResource创建listers,能够为GET/LIST请求提供一个read only的Caching Layer

其中生成的 informer 和 lister 是创建Controller的基础,通过这四个generator就可以创建一个 full-featured, producation-ready的controller。除此之外,code-generator还提供了其他的生成工具,比如 conversion-gen 提供了API内部版本和外部版本的转换函数,defaulter-gen提供了产生默认的字段的工具。

所有的这些code-generator都是基于k8s.io/gengo实现的,他们有一些共同的命令行参数,比如 --input-dirs 获得input package,--output-package 指定生成的package的目录。但是我们不需要去一个一个指定各个命令行参数, k8s.io/code-generator 提供了一个Shell脚本 generator-group.sh 来便于在CRD开发过程中的代码生成。只需一行代码,通常在 hack/update-codegen.sh中即可调用

1
2
3
4
$ vendor/k8s.io/code-generator/generate-groups.sh all \ # GENS="$1"
github.com/SimpCosm/crddemo/pkg/client \ # OUTPUT_PKG="$2"
github.com/SimpCosm/crddemo/pkg/apis \ # APIS_PKG="$3"
example.houmin.cc:v1 # GROUPS_WITH_VERSIONS="$4"

执行命令后,可以看到 pkg 下代码生成如下,在 pkg/apis目录下,除了原有的代码,生成了 zz_generated_deepcopy.go 的代码, pkg/client 目录则是完全生成的,包括 clientsetinformerslisters等代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
pkg
├── apis
│   └── example
│   ├── register.go
│   └── v1
│   ├── doc.go
│   ├── register.go
│   ├── types.go
│   └── zz_generated.deepcopy.go
└── client
├── clientset
│   └── versioned
│   ├── clientset.go
│   ├── doc.go
│   ├── fake
│   ├── scheme
│   └── typed
├── informers
│   └── externalversions
│   ├── example
│   ├── factory.go
│   ├── generic.go
│   └── internalinterfaces
└── listers
└── example
└── v1

这些生成的代码都是不允许手动修改的,一般你需要修改 pkg/apis下面的源码后,再去通过执行 hack/update-codegen.sh 来生成新的代码。

我们可以通过 code-generator 来控制代码生成的一些参数,但是代码生成更多的属性是通过Go代码中的Tags来控制。这里有两种类型的Tag:

  • Global Tags:全局Tags,在 doc.go 中位于 package之上
  • Local Tags:局部Tags

Tags一般的形式是 // +tag-name 或者 // +tag-name=value,以注释的形式存在。一般来说,Tags存在的位置很重要,有些tag必须直接在type之上,有些tag必须和type间隔一行,具体可以参见pull request #53579 and issue #53893

Global Tags

Global Tags 是定义在 doc.go 文件的注释,起到的是全局的代码生成控制的作用,具体如下所示,在这个文件中,你会看到 +k8s:deepcopy-gen=package+groupName=crddemo.k8s.io,这就是 Kubernetes 进行代码生成要用的 Annotation 风格的注释。

pkg/apis/crddemo/v1/doc.go
1
2
3
// +k8s:deepcopy-gen=package
// +groupName=example.houmin.cc
package v1
  • +k8s:deepcopy-gen=package 意思是,请为整个 v1 包里的所有类型定义自动生成 DeepCopy 方法;
  • +groupName=example.houmin.cc,则定义了这个包对应的crddemo API 组的名字,注意这个注释必须就在package之上 (see Issue #53893).

如果你有一些Type不需要 deepcopy,那么你可以通过一个 Local Tag // +k8s:deepcopy-gen=false 来为这个 Typo 不生成 deep copy。如果没有打开package全局生成 deepcopy的开关,你可以对每个你想要的Type使用Local Tag // +k8s:deepcopy-gen=true 为它生成 deepcopy

Local Tags

Local Tags 是直接写在 API Types 之上的注释,下面是一个例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package v1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type Foo struct {
metav1.TypeMeta `json:",inline"`

metav1.ObjectMeta `json:"metadata,omitempty"`

Spec FooSpec `json:"spec"`
Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource
type FooSpec struct {
DeploymentName string `json:"deploymentName"`
Replicas *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource
type FooStatus struct {
AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type FooList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`

Items []Foo `json:"items"`
}

这里有一个 deepcopy tag解释如下:

  • +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object 的意思是,请在生成 DeepCopy 的时候,实现 Kubernetes 提供的 runtime.Object 接口。否则,在某些版本的 Kubernetes 里,你的这个类型定义会出现编译错误。

在上面的示例中,有部分tag用于控制 client-gen,如下所示:

1
2
// +genclient
// +genclient:noStatus
  • +genclient 这段注解的意思是:请为下面资源类型生成对应的 Client 代码。因为Foo才是主类型,所以 +genclient 要写在Mydemo之上,不用写在 FooList 之上,这是要细心注意的。
  • +genclient:noStatus 的意思是:这个 API 资源类型定义里,没有 Status 字段,这个tag告诉 client-gen 不要生成 UpdateStatus方法,一般用于使用子资源分离的例如/status分离的,用来避免更新到status资源(当然代码的struct中也没有status)

对于 cluster-wide的资源,你需要使用下面的tag

1
// +genclient:nonNamespaced

有时候你想控制client提供的HTTP方法,你可以使用类似于下面的tag

1
2
3
4
5
6
7
// +genclient:noVerbs

// +genclient:onlyVerbs=create,delete

// +genclient:skipVerbs=get,list,create,update,patch,delete,deleteCollection,watch

// +genclient:method=Create,verb=create,result=k8s.io/apimachinery/pkg/apis/meta/v1.Status

前三个很好理解,指定了client可以使用的verb。对于第四个tag,这里对应的Type只能是 create-only,并且不会返回 API Type本身,而是返回了 metav1.Status

Controller开发

在代码生成 informersclientslisters等代码后,我们就可以用这些API编写Controller的代码了。

主函数实现

我们可以像使用原生 kubernetes client一样,使用我们生成的client,如下所示:

main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

package main

import (
"flag"
"time"

kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

clientset "github.com/SimpCosm/crddemo/pkg/client/clientset/versioned"
informers "github.com/SimpCosm/crddemo/pkg/client/informers/externalversions"
"github.com/SimpCosm/crddemo/pkg/signals"
)

var (
masterURL string
kubeconfig string
)

func main() {
klog.InitFlags(nil)
flag.Parse()

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

exampleClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building example clientset: %s", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Example().V1().Foos())

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)

if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}

func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}

Controller定义

这里我们定义了自己的Controller:

  • 分别有原生的kubernetes clientset和自定义API group的clientset。
  • 因为我们会监听原生的 Deployment和自己的 Foo,所以分别加上了各自的lister。
  • 每次资源发生改变时,会将其放入到workqueue等待处理在Control Loop中处理。
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Controller is the controller implementation for Foo resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
// sampleclientset is a clientset for our own API group
sampleclientset clientset.Interface

deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
foosLister listers.FooLister
foosSynced cache.InformerSynced

// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
}

Controller创建

接下来,我们来看跟业务最紧密的控制器Controller的编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// NewController returns a new sample controller
func NewController(
kubeclientset kubernetes.Interface,
sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
fooInformer informers.FooInformer) *Controller {

// Create event broadcaster
// Add example types to the default Kubernetes Scheme so Events can be
// logged for example types.
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
klog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
recorder: recorder,
}

klog.Info("Setting up event handlers")
// Set up an event handler for when Foo resources change
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Deployment will always have different RVs.
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})

return controller
}

通过上面Controller的代码实现,我们基本实现了控制器ListAndWatch的事件注册逻辑:通过 APIServer 的 LIST API获取所有最新版本的 API 对象;然后,再通过 WATCH-API 来监听所有这些API对象的变化。通过监听到的事件变化,Informer 就可以实时地更新本地缓存,并且调用这些事件对应的 EventHandler了。

Control Loop

下面,我们再来看原理图中的Control Loop的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()

// Start the informer factories to begin populating the informer caches
klog.Info("Starting Foo controller")

// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

klog.Info("Starting workers")
// Launch two workers to process Foo resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")

return nil
}

可以看到,启动控制循环的逻辑非常简单,就是同步+循环监听任务。而这个循环监听任务就是我们真正的业务实现部分了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()

if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}

return true
}

代码中的 fooInformer,从namespace中通过key获取 Foo 对象这个操作,其实就是在访问本地缓存的索引,实际上,在 Kubernetes 的源码中,你会经常看到控制器从各种 Lister 里获取对象,比如:podLister、nodeLister 等等,它们使用的都是 Informer 和缓存机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

// Get the Foo resource with this namespace/name
foo, err := c.foosLister.Foos(namespace).Get(name)
if err != nil {
// The Foo resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
return nil
}

return err
}

deploymentName := foo.Spec.DeploymentName
if deploymentName == "" {
// We choose to absorb the error here as the worker would requeue the
// resource otherwise. Instead, the next time the resource is updated
// the resource will be queued again.
utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
return nil
}

// Get the deployment with the name specified in Foo.spec
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
}

// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}

// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and return error msg.
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}

// If this number of the replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
}

// If an error occurs during Update, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}

// Finally, we update the status block of the Foo resource to reflect the
// current state of the world
err = c.updateFooStatus(foo, deployment)
if err != nil {
return err
}

c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}

而如果控制循环从缓存中拿不到这个对象(fooInformer 返回了 IsNotFound 错误),那就意味着这个 Foo 对象的 Key 是通过前面的“删除”事件添加进工作队列的。所以,尽管队列里有这个 Key,但是对应的 Foo 对象已经被删除了。而如果能够获取到对应的 Foo 对象,就可以执行控制器模式里的对比 Desired StateCurrentState 的功能逻辑了。

至此,一个完整的自定义 API 对象和它所对应的自定义控制器,就编写完毕了。

部署测试

编译完成后,会生成 crddemo 的二进制文件,我们要做把crddemo放到kubernetes集群中,或者本地也行,只要能访问到 apiserver 和具备kubeconfig

1
2
3
4
5
6
7
# ./crddemo --kubeconfig=.kube/config
I1106 10:47:59.055510 11946 controller.go:115] Setting up event handlers
I1106 10:47:59.055608 11946 controller.go:156] Starting Foo controller
I1106 10:47:59.055620 11946 controller.go:159] Waiting for informer caches to sync
E1106 10:47:59.079342 11946 reflector.go:138] pkg/client/informers/externalversions/factory.go:116: Failed to watch *v1.Foo: failed to list *v1.Foo: the server could not find the requested resource (get foos.example.houmin.cc)
E1106 10:48:00.369747 11946 reflector.go:138] pkg/client/informers/externalversions/factory.go:116: Failed to watch *v1.Foo: failed to list *v1.Foo: the server could not find the requested resource (get foos.example.houmin.cc)
E1106 10:48:03.038137 11946 reflector.go:138] pkg/client/informers/externalversions/factory.go:116: Failed to watch *v1.Foo: failed to list *v1.Foo: the server could not find the requested resource (get foos.example.houmin.cc)

可以看到,程序运行的时候,一开始会报错。这是因为,此时 Mydemo 对象的 CRD 还没有被创建出来,所以 Informer 去 APIServer 里获取 Mydemos 对象时,并不能找到 Mydemo 这个 API 资源类型的定义

接下来,我们执行我们自定义资源的定义文件:

1
2
$  kubectl apply -f artifacts/examples/crd.yaml 
customresourcedefinition.apiextensions.k8s.io/foos.example.houmin.cc created

此时,观察crddemo的日志输出,可以看到Controller的日志恢复了正常,控制循环启动成功

1
2
I0308 12:30:29.956263   28282 controller.go:113] Starting workers
I0308 12:30:29.956307 28282 controller.go:118] Started workers

然后,我们可以对我们的Mydemo对象进行增删改查操作了。

提交我们的自定义资源对象

1
2
# kubectl apply -f artifacts/examples/example-foo.yaml 
foo.example.houmin.cc/example-foo created

创建成功够,看k8s集群是否成功存储起来

1
2
3
# kubectl get foo
NAME AGE
example-foo 8s

这时候,查看一下控制器的输出:

1
2
3
4
5
6
7
8
9
10
# ./crddemo --kubeconfig=.kube/config
I1106 10:50:36.627136 12768 controller.go:115] Setting up event handlers
I1106 10:50:36.627225 12768 controller.go:156] Starting Foo controller
I1106 10:50:36.627235 12768 controller.go:159] Waiting for informer caches to sync
I1106 10:50:36.727340 12768 controller.go:164] Starting workers
I1106 10:50:36.727364 12768 controller.go:170] Started workers
I1106 10:51:27.596870 12768 controller.go:228] Successfully synced 'default/example-foo'
I1106 10:51:27.597113 12768 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="example.houmin.cc/v1" type="Normal" reason="Synced" message="Foo synced successfully"
I1106 10:51:27.612272 12768 controller.go:228] Successfully synced 'default/example-foo'
I1106 10:51:27.612393 12768 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="example.houmin.cc/v1" type="Normal" reason="Synced" message="Foo synced successfully"

可以看到,我们上面创建 example-mydemo.yaml 的操作,触发了 EventHandler 的添加事件,从而被放进了工作队列。紧接着,控制循环就从队列里拿到了这个对象,并且打印出了正在处理这个 Foo 对象的日志。

同时我们可以看到,与Foo相关的Deployment也同时被创建:

1
2
3
# kubectl get deployment
NAME READY UP-TO-DATE AVAILABLE AGE
example-foo 1/1 1 1 76s

我们这时候,尝试修改资源,对对应的replicas属性进行修改

1
2
3
4
5
6
7
apiVersion: example.houmin.cc/v1
kind: Foo
metadata:
name: example-foo
spec:
deploymentName: example-foo
replicas: 3

手动执行修改:

1
# kubectl apply -f artifacts/examples/example-foo.yaml

同时我们可以看到,与Foo相关的Deployment的副本数也同时增加:

1
2
3
# kubectl get deployment
NAME READY UP-TO-DATE AVAILABLE AGE
example-foo 3/3 3 3 2m34s

我们这时候,尝试修改资源,对对应的replicas属性进行修改

可以看到,这一次,Informer 注册的更新事件被触发,更新后的 Foo 对象的 Key 被添加到了工作队列之中。

所以,接下来控制循环从工作队列里拿到的 Foo 对象,与前一个对象是不同的:它的ResourceVersion的值发生了改变;而 Spec 里的Replicas 字段,则变成了3。最后,我再把这个对象删除掉:

1
2
$  kubectl delete -f artifacts/examples/example-foo.yaml 
foo.example.houmin.cc "example-foo" deleted

然后,k8s集群的资源也被清除了:

1
2
$  kubectl get foo                    
No resources found in default namespace.

以上就是使用自定义控制器的基本开发流程

参考资料