0%

【Kubernetes】Client Go

源码结构

Client

client-go里面有许许多多的库,先从RESTClient说起,client-go一共支持4种客户端逻辑:

  • DiscoveryClient:发现版本,请求访问k8s集群的API信息,如kubectl api-versions
  • ClientSet:客户端集合,仅限使用k8s内置资源,如Pods,Service等。
  • DynamicClient:动态客户端,用于无类型资源, 如CRD
  • RESTClient: 实现rest.Interface接口,自由度高,有需要时可以进行封装。

其中前三者都是基于RESTClient实现,可以在下图或源码中看到restClient踪影(DynamicClientclient命名)。

Client关系图

DiscoveryClient

实现方法:

  • ServerGroups: 返回metav1.ApiGroup列表。
  • ServerResourcesForGroupVersion: 根据GV参数获取metav1.APIResource列表。
  • ServerResources: 获取metav1.APIResource列表。
  • ServerGroupsAndResources: 返回metav1.APIGroup数组和metav1.APIResource列表。
  • ServerPreferredResources: 返回首选的资源列表。
  • ServerPreferredNamespacedResources: 返回首选的命名空间层级资源列表。
  • ServerVersion: 返回服务器版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()

config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}

// DiscoveryClient示例
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err)
}

version, err := discoveryClient.ServerVersion()
if err != nil {
panic(err)
}

pretty.Println(version)
}

ClientSet

使用ClientSet比较简单,只需要配置好RESTClient就可以操作系统资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func main() {
var kubeconfig *string
if home := homeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()

// use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}

// ClientSet示例
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
for {
// 调用链:客户端->GV->Resources->OP
pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))

namespace := "default"
pod := "example-xxxxx"
_, err = clientset.CoreV1().Pods(namespace).Get(context.TODO(), pod, metav1.GetOptions{})
if errors.IsNotFound(err) {
fmt.Printf("Pod %s in namespace %s not found\n", pod, namespace)
} else if statusError, isStatus := err.(*errors.StatusError); isStatus {
fmt.Printf("Error getting pod %s in namespace %s: %v\n",
pod, namespace, statusError.ErrStatus.Message)
} else if err != nil {
panic(err.Error())
} else {
fmt.Printf("Found pod %s in namespace %s\n", pod, namespace)
}

time.Sleep(10 * time.Second)
}
}

DynamicClient

对自定义资源类型(CRD),采用DynamicClient进行CRUD操作,创建或返回数据的是Unstructured类型,本质是map[string]interface。目前有很多关于CRD库已经从自定义资源生成代码,关联相关类型,所以基本上没有这种接近原生数据的开发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()

config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}

// DynamicClient示例
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}

// 记得先创建CRD
gvr := schema.GroupVersionResource{
Group: "stable.example.com",
Version: "v1",
Resource: "crontabs",
}

crontabList, err := dynamicClient.Resource(gvr).List(context.TODO(), v1.ListOptions{})
if err != nil {
panic(err)
}

var crontab *unstructured.Unstructured
if len(crontabList.Items) < 1 {
crontab, err = dynamicClient.Resource(gvr).
Namespace("default").
Create(context.TODO(),
// Unstructed类型
&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "stable.example.com/v1",
"kind": "CronTab",
"metadata": map[string]interface{}{
"name": "demo-crontab",
},
"spec": map[string]interface{}{
"cronSpec": "* * * * 1",
"replicas": 5,
"image": "nginx/nginx",
},
},
},
v1.CreateOptions{})

if err != nil {
panic(err)
}
} else {
crontab = &crontabList.Items[0]
}

pretty.Println(crontab)
}

RESTClient

如果用RESTClientCRUD某个资源,之前先要配置好rest.Config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

func main() {
var kubeconfig *string
if home := homeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()

config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}

// 这里开始配置
config.APIPath = "/api"
// 资源配置,就像ClientSet配置每个资源一样
config.GroupVersion = &corev1.SchemeGroupVersion
// 序列化器(编解码器)
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
// 限速器
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(rest.DefaultQPS, rest.DefaultBurst)
// RESTClient示例
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err)
}

var (
options = &metav1.ListOptions{}
pods = &corev1.PodList{}
ctx = context.Background()
)

if err := restClient.Get().
Resource("pods").
VersionedParams(options, scheme.ParameterCodec).
Namespace("").
Do(ctx).
Into(pods); err != nil {
panic(err)
}

for _, item := range pods.Items {
fmt.Println(item.Namespace, "/", item.Name)
}
}

主要是以Resquest请求处理为主,封装了http.Client对象,但是http.Client对象是存放在RESTClient,当调用RESTClient时就返回Resquest对象,这是就可以调用Resources() VersionedParams() Do()等方法。最后在Do()方法返回Result类型,是用来进行对返回数据的处理,如Into(&pods)就是对数据转换成目录类型。

kubeconfig配置管理

下图是其中一个解析kubeconfig过程,主要涉及三个对象分别是DeferredLoadingClientConfig,DeferredLoadingClientConfig,DirectClientConfig

img

DeferredLoadingClientConfig

对象里有个loader成员,用来调用加载配置的规则,对象里每个可导出的方法里首先执行createClientConfig()方法(除ConfigAccess()外),通过这个方调用config.loader.Load()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type DeferredLoadingClientConfig struct {
// 加载配置规则
loader ClientConfigLoader
...
}

func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
if config.clientConfig == nil {
...
if config.clientConfig == nil {
mergedConfig, err := config.loader.Load()
....
}
}

return config.clientConfig, nil
}

ClientCofnigLoadingRules

作用是当有多个配置的时候会合成一个clientcmdapi.Config对象,返回给DeferredLoadingClientConfig对象,如上述代码。

  • LoadFromFile()加载文件,读取字节码。
  • Load() 调用Codec.Decoder.Decode()编解码器解析文件内容。
1
2
3
4
5
6
7
type Config struct {
...
Clusters map[string]*Cluster `json:"clusters"`
AuthInfos map[string]*AuthInfo `json:"users"`
Contexts map[string]*Context `json:"contexts"`
...
}

DirectClientConfig

最后通过DirectClientConfig.ClientConfig()获取rest.Config对象返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
if config.clientConfig == nil {
if config.clientConfig == nil {
mergedConfig, err := config.loader.Load()

if config.fallbackReader != nil {
mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
} else {
// 运行client-go的example/out-of-cluster-client-configuration会执行这段
mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
}

config.clientConfig = mergedClientConfig
}
}

return config.clientConfig, nil
}

func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) {
mergedClientConfig, err := config.createClientConfig()
if err != nil {
return nil, err
}
...
// 最后调用
mergedConfig, err := mergedClientConfig.ClientConfig()
...
}

最后附上对象图:

img

Informer 机制

Informer 机制架构设计

Reflector

DeltaFIFO

Indexer

WorkQueue

EventBroadcaster 事件管理器

代码生成器

client-gen 代码生成器

lister-gen 代码生成器

informer-gen 代码生成器