yuxianbing@ubuntu:~$ kubectl get service
NAME         TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE
kubernetes   ClusterIP   10.254.0.1   <none>        443/TCP   38d
yuxianbing@ubuntu:~$ kubectl get endpoints
NAME         ENDPOINTS                                                   AGE
kubernetes   221.228.106.151:6443,58.215.170.46:6443,61.160.36.43:6443   38d
// Controller is the controller manager for the core bootstrap Kubernetes // controller loops, which manage creating the "kubernetes" service, the // "default", "kube-system" and "kube-public" namespaces, and provide the IP // repair check on service IPs type Controller struct { ServiceClient coreclient.ServicesGetter NamespaceClient coreclient.NamespacesGetter EventClient coreclient.EventsGetter
PublicIP net.IP // API Server绑定的IP,这个IP会作为kubernetes service的Endpoint的IP
// ServiceIP indicates where the kubernetes service will live. It may not be nil. ServiceIP net.IP // 按照我们的启动参数,这里是:10.254.0.1 ServicePort int// 按照我们的启动参数,这里是:643 ExtraServicePorts []api.ServicePort // 为空 ExtraEndpointPorts []api.EndpointPort // 为空 PublicServicePort int// 按照我们的启动参数,这里是6443 KubernetesServiceNodePort int// 缺省是基于ClusterIP启动模式,那么这里为0
runner *async.Runner }
Controller的主要功能:
创建kubernetes服务
创建default、kube-system和kube-public名字空间
并且基于Service的Cluster IP提供IP修复检查功能
Controller结构体中的几个重要字段:
ServiceIP:是kubernetes对外服务的Cluster IP,按照启动样例,为:10.254.0.1 这个IP,是基于启动参数 --service-cluster-ip-range 而来,API Server启动过程中,会从该参数指定的Cluster IP网段中获取第一个IP,作为kubernetes这个服务的Cluster IP。 --service-cluster-ip-range ipNet A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.
PublicServicePort:基于启动参数 --secure-port 来指定,缺省为6443。 --secure-port int The port on which to serve HTTPS with authentication and authorization. If 0, don't serve HTTPS at all. (default 6443)
PublicIP:apiserver在集群中的服务IP,参考下面的参数 --advertise-address ip The IP address on which to advertise the apiserver to members of the cluster. This address must be reachable by the rest of the cluster. If blank, the --bind-address will be used. If --bind-address is unspecified, the host's default interface will be used.
func(c *Config) createEndpointReconciler() reconcilers.EndpointReconciler { glog.Infof("Using reconciler: %v", c.ExtraConfig.EndpointReconcilerType) switchc.ExtraConfig.EndpointReconcilerType { // there are numerous test dependencies that depend on a default controller case"", reconcilers.MasterCountReconcilerType: returnc.createMasterCountReconciler() // 缺省情况,见NewServerRunOptions函数 case reconcilers.LeaseEndpointReconcilerType: returnc.createLeaseReconciler() case reconcilers.NoneEndpointReconcilerType: returnc.createNoneReconciler() default: glog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType) } returnnil }
// run all of the controllers once prior to returning from Start. if err := repairClusterIPs.RunOnce(); err != nil { // If we fail to repair cluster IPs apiserver is useless. We should restart and retry. glog.Fatalf("Unable to perform initial IP allocation check: %v", err) } if err := repairNodePorts.RunOnce(); err != nil { // If we fail to repair node ports apiserver is useless. We should restart and retry. glog.Fatalf("Unable to perform initial service nodePort check: %v", err) } // Service definition is reconciled during first run to correct port and type per expectations. if err := c.UpdateKubernetesService(true); err != nil { glog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err) }
func(c *Controller)UpdateKubernetesService(reconcile bool) error { // Update service & endpoint records. // TODO: when it becomes possible to change this stuff, // stop polling and start watching. // TODO: add endpoints of all replicas, not just the elected master. if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil { return err }
// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw). // ReconcileEndpoints expects that the endpoints objects it manages will all be // managed only by ReconcileEndpoints; therefore, to understand this, you need only // understand the requirements and the body of this function. // // Requirements: // * All apiservers MUST use the same ports for their {rw, ro} services. // * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the // endpoints for their {rw, ro} services. // * All apiservers MUST know and agree on the number of apiservers expected // to be running (c.masterCount). // * ReconcileEndpoints is called periodically from all apiservers. func(r *masterCountEndpointReconciler)ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool)error { r.reconcilingLock.Lock() defer r.reconcilingLock.Unlock()
if r.stopReconcilingCalled { returnnil } // 查询出服务中的 Endpoints,放到变量e e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) if err != nil { e = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, Namespace: metav1.NamespaceDefault, }, } } if errors.IsNotFound(err) { // Simply create non-existing endpoints for the service. e.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: ip.String()}}, Ports: endpointPorts, }} _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e) return err }
// First, determine if the endpoint is in the format we expect (one // subset, ports matching endpointPorts, N IP addresses). // 检查服务中的 Endpoints信息,是否包含现有ip,端口是否符合,格式是否正确(只有一个subnet)等 formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts) if !formatCorrect { // 如果超过一个subnet,或者没有 // Something is egregiously wrong, just re-make the endpoints record. e.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: ip.String()}}, Ports: endpointPorts, }} glog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e) _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) // 重新初始化,把自己写到Endpoints中 return err } if ipCorrect && portsCorrect { returnnil } if !ipCorrect { // 没有找到自己,把自己加进去 // We *always* add our own IP address. e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()})
// Lexicographic order is retained by this step. e.Subsets = endpoints.RepackSubsets(e.Subsets)
// If too many IP addresses, remove the ones lexicographically after our // own IP address. Given the requirements stated at the top of // this function, this should cause the list of IP addresses to // become eventually correct. if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount { // addrs is a pointer because we're going to mutate it. for i, addr := range *addrs { if addr.IP == ip.String() { forlen(*addrs) > r.masterCount { // wrap around if necessary. remove := (i + 1) % len(*addrs) *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...) } break } } } } if !portsCorrect { // Reset ports. e.Subsets[0].Ports = endpointPorts } glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) // 把自己加进来,并更新完成 return err }