源码结构
Client
client-go里面有许许多多的库,先从RESTClient
说起,client-go一共支持4种客户端逻辑:
DiscoveryClient
:发现版本,请求访问k8s集群的API信息,如kubectl api-versions
。
ClientSet
:客户端集合,仅限使用k8s内置资源,如Pods
,Service
等。
DynamicClient
:动态客户端,用于无类型资源, 如CRD
。
RESTClient
: 实现rest.Interface
接口,自由度高,有需要时可以进行封装。
其中前三者都是基于RESTClient
实现,可以在下图或源码中看到restClient踪影(DynamicClient
是client命名)。
![Client关系图]()
DiscoveryClient
实现方法:
ServerGroups
: 返回metav1.ApiGroup
列表。
ServerResourcesForGroupVersion
: 根据GV参数获取metav1.APIResource
列表。
ServerResources
: 获取metav1.APIResource
列表。
ServerGroupsAndResources
: 返回metav1.APIGroup
数组和metav1.APIResource
列表。
ServerPreferredResources
: 返回首选的资源列表。
ServerPreferredNamespacedResources
: 返回首选的命名空间层级资源列表。
ServerVersion
: 返回服务器版本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func main() { var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) }
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { panic(err) }
version, err := discoveryClient.ServerVersion() if err != nil { panic(err) }
pretty.Println(version) }
|
ClientSet
使用ClientSet
比较简单,只需要配置好RESTClient
就可以操作系统资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| func main() { var kubeconfig *string if home := homeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err.Error()) }
clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } for { pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{}) if err != nil { panic(err.Error()) } fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
namespace := "default" pod := "example-xxxxx" _, err = clientset.CoreV1().Pods(namespace).Get(context.TODO(), pod, metav1.GetOptions{}) if errors.IsNotFound(err) { fmt.Printf("Pod %s in namespace %s not found\n", pod, namespace) } else if statusError, isStatus := err.(*errors.StatusError); isStatus { fmt.Printf("Error getting pod %s in namespace %s: %v\n", pod, namespace, statusError.ErrStatus.Message) } else if err != nil { panic(err.Error()) } else { fmt.Printf("Found pod %s in namespace %s\n", pod, namespace) }
time.Sleep(10 * time.Second) } }
|
DynamicClient
对自定义资源类型(CRD),采用DynamicClient
进行CRUD操作,创建或返回数据的是Unstructured
类型,本质是map[string]interface
。目前有很多关于CRD库已经从自定义资源生成代码,关联相关类型,所以基本上没有这种接近原生数据的开发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| func main() { var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) }
dynamicClient, err := dynamic.NewForConfig(config) if err != nil { panic(err) }
gvr := schema.GroupVersionResource{ Group: "stable.example.com", Version: "v1", Resource: "crontabs", }
crontabList, err := dynamicClient.Resource(gvr).List(context.TODO(), v1.ListOptions{}) if err != nil { panic(err) }
var crontab *unstructured.Unstructured if len(crontabList.Items) < 1 { crontab, err = dynamicClient.Resource(gvr). Namespace("default"). Create(context.TODO(), &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "stable.example.com/v1", "kind": "CronTab", "metadata": map[string]interface{}{ "name": "demo-crontab", }, "spec": map[string]interface{}{ "cronSpec": "* * * * 1", "replicas": 5, "image": "nginx/nginx", }, }, }, v1.CreateOptions{})
if err != nil { panic(err) } } else { crontab = &crontabList.Items[0] }
pretty.Println(crontab) }
|
RESTClient
如果用RESTClient
CRUD某个资源,之前先要配置好rest.Config
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| func main() { var kubeconfig *string if home := homeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) }
config.APIPath = "/api" config.GroupVersion = &corev1.SchemeGroupVersion config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(rest.DefaultQPS, rest.DefaultBurst) restClient, err := rest.RESTClientFor(config) if err != nil { panic(err) }
var ( options = &metav1.ListOptions{} pods = &corev1.PodList{} ctx = context.Background() )
if err := restClient.Get(). Resource("pods"). VersionedParams(options, scheme.ParameterCodec). Namespace(""). Do(ctx). Into(pods); err != nil { panic(err) }
for _, item := range pods.Items { fmt.Println(item.Namespace, "/", item.Name) } }
|
主要是以Resquest
请求处理为主,封装了http.Client
对象,但是http.Client
对象是存放在RESTClient
,当调用RESTClient
时就返回Resquest
对象,这是就可以调用Resources()
VersionedParams()
Do()
等方法。最后在Do()
方法返回Result
类型,是用来进行对返回数据的处理,如Into(&pods)
就是对数据转换成目录类型。
kubeconfig配置管理
下图是其中一个解析kubeconfig
过程,主要涉及三个对象分别是DeferredLoadingClientConfig
,DeferredLoadingClientConfig
,DirectClientConfig
。
![img]()
DeferredLoadingClientConfig
对象里有个loader
成员,用来调用加载配置的规则,对象里每个可导出的方法里首先执行createClientConfig()
方法(除ConfigAccess()
外),通过这个方调用config.loader.Load()
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| type DeferredLoadingClientConfig struct { loader ClientConfigLoader ... }
func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) { if config.clientConfig == nil { ... if config.clientConfig == nil { mergedConfig, err := config.loader.Load() .... } }
return config.clientConfig, nil }
|
ClientCofnigLoadingRules
作用是当有多个配置的时候会合成一个clientcmdapi.Config
对象,返回给DeferredLoadingClientConfig
对象,如上述代码。
LoadFromFile()
加载文件,读取字节码。
Load()
调用Codec.Decoder.Decode()
编解码器解析文件内容。
1 2 3 4 5 6 7
| type Config struct { ... Clusters map[string]*Cluster `json:"clusters"` AuthInfos map[string]*AuthInfo `json:"users"` Contexts map[string]*Context `json:"contexts"` ... }
|
DirectClientConfig
最后通过DirectClientConfig.ClientConfig()
获取rest.Config
对象返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) { if config.clientConfig == nil { if config.clientConfig == nil { mergedConfig, err := config.loader.Load() if config.fallbackReader != nil { mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader) } else { mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader) }
config.clientConfig = mergedClientConfig } }
return config.clientConfig, nil }
func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) { mergedClientConfig, err := config.createClientConfig() if err != nil { return nil, err } ... mergedConfig, err := mergedClientConfig.ClientConfig() ... }
|
最后附上对象图:
![img]()
Reflector
DeltaFIFO
Indexer
WorkQueue
EventBroadcaster 事件管理器
代码生成器
client-gen 代码生成器
lister-gen 代码生成器