```yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  # name must match the spec fields below, and be in the form: <plural>.<group>
  name: foos.example.houmin.cc
spec:
  # group name to use for REST API: /apis/<group>/<version>
  group: example.houmin.cc
  # version served by this CustomResourceDefinition
  version: v1
  names:
    # kind is normally the CamelCased singular type. Your resource manifests use this.
    kind: Foo
    # plural name to be used in the URL: /apis/<group>/<version>/<plural>
    plural: foos
    # singular name to be used as an alias on the CLI and for display
    singular: foo
    # shortNames allow a shorter string to match your resource on the CLI
    shortNames:
    - fo
  # either Namespaced or Cluster
  scope: Namespaced
```
With the manifest above, we can create the CRD:
```bash
# kubectl create -f crd.yaml
customresourcedefinition.apiextensions.k8s.io/foos.example.houmin.cc created
```
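Once the CRD is registered, Foo objects can be created like any other resource. The manifest below is only an illustrative sketch: the `deploymentName` and `replicas` field names assume the JSON tags of the `Foo.Spec.DeploymentName` and `Foo.Spec.Replicas` fields that the controller reads later on.

```yaml
apiVersion: example.houmin.cc/v1
kind: Foo
metadata:
  name: example-foo
spec:
  # assumed JSON tags for Spec.DeploymentName and Spec.Replicas
  deploymentName: example-foo
  replicas: 1
```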
`+groupName=example.houmin.cc` defines the name of the API group for this package in crddemo. Note that this comment must sit directly above the `package` clause (see Issue #53893).
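Putting the package-level tags together, a minimal `doc.go` for the API package could look like this (the package name `v1` is an assumption about the project layout):

```go
// +k8s:deepcopy-gen=package
// +groupName=example.houmin.cc

// Package v1 is the v1 version of the example.houmin.cc API group.
// The tags above sit immediately before the package clause: deepcopy-gen
// generates DeepCopy functions for every type in the package, and groupName
// sets the API group used by the generated clientset.
package v1
```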
If some of your types do not need deepcopy, you can turn it off per type with the local tag `// +k8s:deepcopy-gen=false`. Conversely, if the package-wide deepcopy switch is not enabled, you can add the local tag `// +k8s:deepcopy-gen=true` to each type for which you do want deep-copy functions generated.
`+genclient:noStatus` means that this API type definition has no Status field; the tag tells client-gen not to generate an `UpdateStatus` method. It is normally used for types that do not enable a separate `/status` subresource, so that nothing ever tries to update a status resource (and, accordingly, the Go struct has no Status field either).
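For illustration, these tags sit directly above the type definition in `types.go`. The sketch below is a minimal version of such a file; the spec fields mirror the `DeploymentName` and `Replicas` fields used by the controller later, while the JSON tags and the absence of a Status block follow the `noStatus` description above.

```go
package v1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Foo is a specification for a Foo resource.
type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec FooSpec `json:"spec"`
}

// FooSpec is the spec for a Foo resource.
type FooSpec struct {
	// DeploymentName is the name of the Deployment managed for this Foo.
	DeploymentName string `json:"deploymentName"`
	// Replicas is the desired number of replicas of that Deployment.
	Replicas *int32 `json:"replicas"`
}
```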
```go
// notice that there is no need to run Start methods in a separate goroutine
// (i.e. go kubeInformerFactory.Start(stopCh)).
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
```
```go
func init() {
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
```
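For context, a minimal `main()` that ties these pieces together might look like the sketch below: parse the flags, build the clientsets, construct the informer factories, hand their informers to the controller and start everything. The import paths of the generated clientset/informers and the `signals` helper, as well as the `Example().V1().Foos()` accessor, are assumptions based on the usual sample-controller project layout.

```go
import (
	"flag"
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"

	// generated code and helpers; paths are assumptions about this project
	clientset "crddemo/pkg/client/clientset/versioned"
	informers "crddemo/pkg/client/informers/externalversions"
	"crddemo/pkg/signals"
)

// kubeconfig and masterURL are the package-level flag targets registered in init()
var (
	masterURL  string
	kubeconfig string
)

func main() {
	flag.Parse()

	// set up a stop channel wired to SIGTERM/SIGINT
	stopCh := signals.SetupSignalHandler()

	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	exampleClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}

	// resync every 30s so all known objects are periodically reconciled
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Example().V1().Foos())

	// Start is non-blocking; it runs all registered informers in their own goroutines
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}
```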
```go
// Controller is the controller implementation for Foo resources
type Controller struct {
	// kubeclientset is a standard kubernetes clientset
	kubeclientset kubernetes.Interface
	// sampleclientset is a clientset for our own API group
	sampleclientset clientset.Interface

	// listers read objects from the shared informer caches; the *Synced funcs
	// report whether those caches have completed an initial sync
	// (types come from client-go and the generated listers)
	deploymentsLister appslisters.DeploymentLister
	deploymentsSynced cache.InformerSynced
	foosLister        listers.FooLister
	foosSynced        cache.InformerSynced

	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue workqueue.RateLimitingInterface
	// recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	recorder record.EventRecorder
}
```
```go
// NewController returns a new sample controller
func NewController(
	kubeclientset kubernetes.Interface,
	sampleclientset clientset.Interface,
	deploymentInformer appsinformers.DeploymentInformer,
	fooInformer informers.FooInformer) *Controller {

	// Create event broadcaster
	// Add example types to the default Kubernetes Scheme so Events can be
	// logged for example types.
	utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
	klog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
```
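The `controller` value that the handlers and the final return statement refer to is assembled from the clientsets, the informers' listers and HasSynced functions, a rate-limited workqueue and the event recorder. A minimal sketch of that step (the queue name "Foos" is an assumption):

```go
	controller := &Controller{
		kubeclientset:     kubeclientset,
		sampleclientset:   sampleclientset,
		deploymentsLister: deploymentInformer.Lister(),
		deploymentsSynced: deploymentInformer.Informer().HasSynced,
		foosLister:        fooInformer.Lister(),
		foosSynced:        fooInformer.Informer().HasSynced,
		// a default rate-limited queue: failed items are retried with back-off
		workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
		recorder:  recorder,
	}
```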
klog.Info("Setting up event handlers") // Set up an event handler for when Foo resources change fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueFoo, UpdateFunc: func(old, newinterface{}) { controller.enqueueFoo(new) }, }) deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.handleObject, UpdateFunc: func(old, newinterface{}) { newDepl := new.(*appsv1.Deployment) oldDepl := old.(*appsv1.Deployment) if newDepl.ResourceVersion == oldDepl.ResourceVersion { // Periodic resync will send update events for all known Deployments. // Two different versions of the same Deployment will always have different RVs. return } controller.handleObject(new) }, DeleteFunc: controller.handleObject, })
return controller }
With the Controller code above, we have essentially implemented the controller's ListAndWatch event-registration logic: first, fetch the latest version of all relevant API objects via the API server's LIST API; then monitor those objects for changes via the WATCH API. Based on the events it observes, the Informer keeps its local cache up to date and invokes the EventHandler registered for each event.
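Both handlers ultimately funnel work into the workqueue. As a minimal sketch of that step, an `enqueueFoo` along the lines of the standard sample-controller converts the object into a namespace/name key and adds it to the queue:

```go
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}
```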
```go
// Run waits for the informer caches to sync, launches workers and then
// blocks until stopCh is closed.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	// Start the informer factories to begin populating the informer caches
	klog.Info("Starting Foo controller")

	// Wait for the caches to be synced before starting workers
	klog.Info("Waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch two workers to process Foo resources
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	klog.Info("Started workers")
	<-stopCh
	klog.Info("Shutting down workers")

	return nil
}
```
```go
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date than when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// Foo resource to be synced.
		if err := c.syncHandler(key); err != nil {
			// Put the item back on the workqueue to handle any transient errors.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}
```
```go
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get the Foo resource with this namespace/name
	foo, err := c.foosLister.Foos(namespace).Get(name)
	if err != nil {
		// The Foo resource may no longer exist, in which case we stop
		// processing.
		if errors.IsNotFound(err) {
			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
			return nil
		}

		return err
	}

	deploymentName := foo.Spec.DeploymentName
	if deploymentName == "" {
		// We choose to absorb the error here as the worker would requeue the
		// resource otherwise. Instead, the next time the resource is updated
		// the resource will be queued again.
		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
		return nil
	}

	// Get the deployment with the name specified in Foo.spec
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
	// If the resource doesn't exist, we'll create it
	if errors.IsNotFound(err) {
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
	}

	// If an error occurs during Get/Create, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// If the Deployment is not controlled by this Foo resource, we should log
	// a warning to the event recorder and return error msg.
	if !metav1.IsControlledBy(deployment, foo) {
		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
		c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
		return fmt.Errorf(msg)
	}

	// If this number of the replicas on the Foo resource is specified, and the
	// number does not equal the current desired replicas on the Deployment, we
	// should update the Deployment resource.
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
		klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
	}

	// If an error occurs during Update, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// Finally, we update the status block of the Foo resource to reflect the
	// current state of the world
	err = c.updateFooStatus(foo, deployment)
	if err != nil {
		return err
	}

	return nil
}
```
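`newDeployment`, which syncHandler calls when creating or updating the Deployment, is not listed above. The key detail is that the Deployment it builds carries an OwnerReference pointing back at the Foo, which is what the `metav1.IsControlledBy` check relies on and what lets garbage collection delete the Deployment together with its Foo. A minimal sketch (the label keys, container image and the `samplev1` alias for the generated Foo types are assumptions):

```go
// newDeployment creates a new Deployment for a Foo resource. It sets the
// OwnerReferences so that the Foo "owning" the Deployment can be discovered
// and so the Deployment is garbage-collected when the Foo is deleted.
func newDeployment(foo *samplev1.Foo) *appsv1.Deployment {
	labels := map[string]string{
		"app":        "nginx",
		"controller": foo.Name,
	}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      foo.Spec.DeploymentName,
			Namespace: foo.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(foo, samplev1.SchemeGroupVersion.WithKind("Foo")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: foo.Spec.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "nginx",
							Image: "nginx:latest",
						},
					},
				},
			},
		},
	}
}
```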
```bash
# ./crddemo --kubeconfig=.kube/config
I1106 10:47:59.055510   11946 controller.go:115] Setting up event handlers
I1106 10:47:59.055608   11946 controller.go:156] Starting Foo controller
I1106 10:47:59.055620   11946 controller.go:159] Waiting for informer caches to sync
E1106 10:47:59.079342   11946 reflector.go:138] pkg/client/informers/externalversions/factory.go:116: Failed to watch *v1.Foo: failed to list *v1.Foo: the server could not find the requested resource (get foos.example.houmin.cc)
E1106 10:48:00.369747   11946 reflector.go:138] pkg/client/informers/externalversions/factory.go:116: Failed to watch *v1.Foo: failed to list *v1.Foo: the server could not find the requested resource (get foos.example.houmin.cc)
E1106 10:48:03.038137   11946 reflector.go:138] pkg/client/informers/externalversions/factory.go:116: Failed to watch *v1.Foo: failed to list *v1.Foo: the server could not find the requested resource (get foos.example.houmin.cc)
```