// Registration is the service advertised by the Kubelet.
// Only when the Kubelet answers a Register request with a success code
// may Device Plugins start their service.
// Registration may fail when the device plugin version is not supported by
// the Kubelet or the registered resourceName is already taken by another
// active device plugin. The device plugin is expected to terminate upon
// registration failure.
service Registration {
    rpc Register(RegisterRequest) returns (Empty) {}
}

message DevicePluginOptions {
    // Indicates if PreStartContainer call is required before each container start
    bool pre_start_required = 1;
}
message RegisterRequest {
    // Version of the API the Device Plugin was built against
    string version = 1;
    // Name of the unix socket the device plugin is listening on
    // PATH = path.Join(DevicePluginPath, endpoint)
    string endpoint = 2;
    // Schedulable resource name. As of now it's expected to be a DNS Label
    string resource_name = 3;
    // Options to be communicated with Device Manager
    DevicePluginOptions options = 4;
}
message Empty {
}
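To make the registration handshake concrete, here is a minimal sketch of the plugin side, assuming the v1beta1 deviceplugin API package (k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1) and placeholder values for the endpoint and resource name:

package main

import (
    "context"
    "time"

    "google.golang.org/grpc"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// register dials the kubelet's Registration service on its unix socket and
// announces the plugin's own endpoint and resource name. "example.sock" and
// "example.com/device" are placeholders.
func register() error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    conn, err := grpc.DialContext(ctx, "unix://"+pluginapi.KubeletSocket,
        grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)
    _, err = client.Register(ctx, &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,
        Endpoint:     "example.sock", // basename; the kubelet joins it with DevicePluginPath
        ResourceName: "example.com/device",
        Options:      &pluginapi.DevicePluginOptions{PreStartRequired: false},
    })
    return err
}

If Register returns an error (unsupported version, resource name already taken), the plugin is expected to exit, matching the comment on the Registration service above.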
// DevicePlugin is the service advertised by Device Plugins
service DevicePlugin {
    // GetDevicePluginOptions returns options to be communicated with Device
    // Manager
    rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}

    // ListAndWatch returns a stream of List of Devices
    // Whenever a Device state changes or a Device disappears, ListAndWatch
    // returns the new list
    rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}

    // Allocate is called during container creation so that the Device
    // Plugin can run device specific operations and instruct Kubelet
    // of the steps to make the Device available in the container
    rpc Allocate(AllocateRequest) returns (AllocateResponse) {}

    // PreStartContainer is called, if indicated by the Device Plugin during the
    // registration phase, before each container start. The device plugin can run
    // device specific operations such as resetting the device before making
    // devices available to the container
    rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state changes or a Device disappears, ListAndWatch
// returns the new list
message ListAndWatchResponse {
    repeated Device devices = 1;
}
/* E.g:
 * struct Device {
 *    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
 *    State: "Healthy",
 * }
 */
message Device {
    // A unique ID assigned by the device plugin used
    // to identify devices during the communication
    // Max length of this field is 63 characters
    string ID = 1;
    // Health of the device, can be healthy or unhealthy, see constants.go
    string health = 2;
}
// - PreStartContainer is expected to be called before each container start if
//   indicated by the plugin during the registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
//   the Devices requested
message PreStartContainerRequest {
    repeated string devicesIDs = 1;
}
// PreStartContainerResponse will be sent by the plugin in response to PreStartContainerRequest
message PreStartContainerResponse {
}
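A plugin that set pre_start_required at registration would implement this RPC roughly as follows. This is a sketch using the same pluginapi package as above; examplePlugin and resetDevice are hypothetical names:

// PreStartContainer is invoked by the kubelet before each container start when
// pre_start_required was announced; a typical plugin resets or reinitializes
// the requested devices here.
func (p *examplePlugin) PreStartContainer(ctx context.Context, req *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
    for _, id := range req.DevicesIDs {
        if err := resetDevice(id); err != nil { // resetDevice is a hypothetical helper
            return nil, err
        }
    }
    return &pluginapi.PreStartContainerResponse{}, nil
}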
// - Allocate is expected to be called during pod creation since allocation
//   failures for any container would result in pod startup failure.
// - Allocate allows kubelet to expose additional artifacts in a pod's
//   environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
//   the Devices requested
message AllocateRequest {
    repeated ContainerAllocateRequest container_requests = 1;
}
// AllocateResponse includes the artifacts that need to be injected into
// a container for accessing the 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// If Kubelet sends an allocation request for dev1 and dev2, and allocation
// on dev1 succeeds but allocation on dev2 fails, the Device plugin should
// send a ListAndWatch update and fail the Allocation request.
message AllocateResponse {
    repeated ContainerAllocateResponse container_responses = 1;
}
message ContainerAllocateResponse {
    // List of environment variables to be set in the container to access one or more devices.
    map<string, string> envs = 1;
    // Mounts for the container.
    repeated Mount mounts = 2;
    // Devices for the container.
    repeated DeviceSpec devices = 3;
    // Container annotations to pass to the container runtime
    map<string, string> annotations = 4;
}
// Mount specifies a host volume to mount into a container,
// where device libraries or tools are installed on host and container
message Mount {
    // Path of the mount within the container.
    string container_path = 1;
    // Path of the mount on the host.
    string host_path = 2;
    // If set, the mount is read-only.
    bool read_only = 3;
}
// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
    // Path of the device within the container.
    string container_path = 1;
    // Path of the device on the host.
    string host_path = 2;
    // Cgroups permissions of the device, candidates are one or more of
    // * r - allows container to read from the specified device.
    // * w - allows container to write to the specified device.
    // * m - allows container to create device files that do not yet exist.
    string permissions = 3;
}
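Putting ContainerAllocateResponse, Mount and DeviceSpec together, an Allocate handler on the plugin side might look like the following sketch. The paths, env variable name and examplePlugin receiver are placeholders, not part of the API:

// Allocate returns, per container request, the environment variables, host
// mounts and device nodes the kubelet should inject for the requested devices.
func (p *examplePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    resp := &pluginapi.AllocateResponse{}
    for _, creq := range req.ContainerRequests {
        resp.ContainerResponses = append(resp.ContainerResponses, &pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                "EXAMPLE_VISIBLE_DEVICES": strings.Join(creq.DevicesIDs, ","),
            },
            Mounts: []*pluginapi.Mount{{
                ContainerPath: "/usr/local/example",
                HostPath:      "/opt/example",
                ReadOnly:      true,
            }},
            Devices: []*pluginapi.DeviceSpec{{
                ContainerPath: "/dev/example0",
                HostPath:      "/dev/example0",
                Permissions:   "rw",
            }},
        })
    }
    return resp, nil
}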
go func() {
    lastCrashTime := time.Now()
    restartCount := 0
    for {
        log.Printf("Starting GRPC server for '%s'", m.resourceName)
        err := m.server.Serve(sock)
        if err == nil {
            break
        }

        log.Printf("GRPC server for '%s' crashed with error: %v", m.resourceName, err)

        // restart if it has not been too often
        // i.e. if the server has crashed more than 5 times and it didn't last more than one hour each time
        if restartCount > 5 {
            // quit
            log.Fatalf("GRPC server for '%s' has repeatedly crashed recently. Quitting", m.resourceName)
        }
        timeSinceLastCrash := time.Since(lastCrashTime).Seconds()
        lastCrashTime = time.Now()
        if timeSinceLastCrash > 3600 {
            // it has been one hour since the last crash; reset the count
            // to reflect the frequency
            restartCount = 1
        } else {
            restartCount++
        }
    }
}()
// Wait for the server to start by launching a blocking connection
conn, err := m.dial(m.socket, 5*time.Second)
if err != nil {
    return err
}
conn.Close()
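The m.dial helper used here is essentially a blocking gRPC dial over a unix socket with a timeout. A sketch of what such a helper might look like (the real plugin's implementation may differ in detail):

func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    // Block until the connection is up (or the timeout expires), dialing the
    // given path as a unix-domain socket.
    return grpc.DialContext(ctx, unixSocketPath,
        grpc.WithInsecure(),
        grpc.WithBlock(),
        grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
            return (&net.Dialer{}).DialContext(ctx, "unix", addr)
        }),
    )
}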
for _, d := range devices {
    gpu, _, _, err := nvml.ParseMigDeviceUUID(d.ID)
    if err != nil {
        gpu = d.ID
    }

    err = nvml.RegisterEventForDevice(eventSet, nvml.XidCriticalError, gpu)
    if err != nil && strings.HasSuffix(err.Error(), "Not Supported") {
        log.Printf("Warning: %s is too old to support healthchecking: %s. Marking it unhealthy.", d.ID, err)
        unhealthy <- d
        continue
    }
    check(err)
}

for {
    select {
    case <-stop:
        return
    default:
    }

    e, err := nvml.WaitForEvent(eventSet, 5000)
    if err != nil && e.Etype != nvml.XidCriticalError {
        continue
    }

    // FIXME: formalize the full list and document it.
    // http://docs.nvidia.com/deploy/xid-errors/index.html#topic_4
    // Application errors: the GPU should still be healthy
    if e.Edata == 31 || e.Edata == 43 || e.Edata == 45 {
        continue
    }

    if e.UUID == nil || len(*e.UUID) == 0 {
        // All devices are unhealthy
        log.Printf("XidCriticalError: Xid=%d, All devices will go unhealthy.", e.Edata)
        for _, d := range devices {
            unhealthy <- d
        }
        continue
    }

    for _, d := range devices {
        // Please see https://github.com/NVIDIA/gpu-monitoring-tools/blob/148415f505c96052cb3b7fdf443b34ac853139ec/bindings/go/nvml/nvml.h#L1424
        // for the rationale why gi and ci can be set as such when the UUID is
        // a full GPU UUID and not a MIG device UUID.
        gpu, gi, ci, err := nvml.ParseMigDeviceUUID(d.ID)
        if err != nil {
            gpu = d.ID
            gi = 0xFFFFFFFF
            ci = 0xFFFFFFFF
        }

        if gpu == *e.UUID && gi == *e.GpuInstanceId && ci == *e.ComputeInstanceId {
            log.Printf("XidCriticalError: Xid=%d on Device=%s, the device will go unhealthy.", e.Edata, d.ID)
            unhealthy <- d
        }
    }
}
}
type ManagerImpl struct {
    socketname string
    socketdir  string

    endpoints map[string]endpointInfo // Key is ResourceName
    mutex     sync.Mutex

    server *grpc.Server
    wg     sync.WaitGroup

    // activePods is a method for listing active pods on the node
    // so the number of pluginResources requested by existing pods
    // could be counted when updating allocated devices
    activePods ActivePodsFunc

    // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
    // We use it to determine when we can purge inactive pods from checkpointed state.
    sourcesReady config.SourcesReady

    // callback is used for updating devices' states in one time call.
    // e.g. a new device is advertised, two old devices are deleted and a running device fails.
    callback monitorCallback

    // allDevices is a map by resource name of all the devices currently registered to the device manager
    allDevices map[string]map[string]pluginapi.Device

    // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
    healthyDevices map[string]sets.String

    // unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
    unhealthyDevices map[string]sets.String

    // allocatedDevices contains allocated deviceIds, keyed by resourceName.
    allocatedDevices map[string]sets.String

    // podDevices contains pod to allocated device mapping.
    podDevices        podDevices
    checkpointManager checkpointmanager.CheckpointManager

    // List of NUMA Nodes available on the underlying machine
    numaNodes []int

    // Store of Topology Affinities that the Device Manager can query.
    topologyAffinityStore topologymanager.Store

    // devicesToReuse contains devices that can be reused as they have been allocated to
    // init containers.
    devicesToReuse PodReusableDevices
}
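For context, the two helper types referenced by ManagerImpl look roughly like this in the same package (sketched from the device manager source; field names may differ slightly between versions):

// endpointInfo pairs the gRPC endpoint of a registered plugin with the
// options it announced at registration time.
type endpointInfo struct {
    e    endpoint
    opts *pluginapi.DevicePluginOptions
}

// PodReusableDevices maps pod UID -> resource name -> device IDs that were
// handed to init containers and may be reused by the pod's app containers.
type PodReusableDevices map[string]map[string]sets.String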
// The following structures are populated with real implementations in manager.Start().
// Before that, initialize them to perform no-op operations.
manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
manager.sourcesReady = &sourcesReadyStub{}
checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
if err != nil {
    return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
manager.checkpointManager = checkpointManager
// Loads in allocatedDevices information from disk.
err := m.readCheckpoint()
if err != nil {
    klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
}
socketPath := filepath.Join(m.socketdir, m.socketname)
if err = os.MkdirAll(m.socketdir, 0750); err != nil {
    return err
}
if selinux.SELinuxEnabled() {
    if err := selinux.SetFileLabel(m.socketdir, config.KubeletPluginsDirSELinuxLabel); err != nil {
        klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", m.socketdir, err)
    }
}
// Removes all stale sockets in m.socketdir. Device plugins can monitor
// this and use it as a signal to re-register with the new Kubelet.
if err := m.removeContents(m.socketdir); err != nil {
    klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
}
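A sketch of what removeContents does, assuming the checkpoint file is named kubelet_internal_checkpoint: every other regular file under the plugin directory (i.e. the stale plugin sockets) is deleted, which is the signal plugins watch for to re-register.

func removeContents(dir string) error {
    d, err := os.Open(dir)
    if err != nil {
        return err
    }
    defer d.Close()

    names, err := d.Readdirnames(-1)
    if err != nil {
        return err
    }
    for _, name := range names {
        if name == "kubelet_internal_checkpoint" {
            // Keep the checkpointed device allocation state.
            continue
        }
        filePath := filepath.Join(dir, name)
        stat, err := os.Stat(filePath)
        if err != nil || stat.IsDir() {
            continue
        }
        if err := os.Remove(filePath); err != nil {
            return err
        }
    }
    return nil
}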
for {
    select {
    case <-m.stop:
        return nil
    case d := <-m.health:
        // FIXME: there is no way to recover from the Unhealthy state.
        d.Health = pluginapi.Unhealthy
        log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
        s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
    }
}
}
func (g *GpuDeviceManager) Devices() []*Device {
    n, err := nvml.GetDeviceCount()
    check(err)

    var devs []*Device
    for i := uint(0); i < n; i++ {
        d, err := nvml.NewDeviceLite(i)
        check(err)

        migEnabled, err := d.IsMigEnabled()
        check(err)

        if migEnabled && g.skipMigEnabledGPUs {
            continue
        }

        devs = append(devs, buildDevice(d))
    }

    return devs
}
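The check and buildDevice helpers used above are not shown in this excerpt; a minimal sketch of both, based on how they are used here (the real buildDevice also fills in topology information):

// check aborts on any unexpected NVML error.
func check(err error) {
    if err != nil {
        log.Panicln("Fatal:", err)
    }
}

// buildDevice wraps an NVML device in the plugin's Device type, starting out
// Healthy; the health loop shown earlier flips it to Unhealthy on Xid events.
func buildDevice(d *nvml.Device) *Device {
    dev := &Device{}
    dev.ID = d.UUID
    dev.Health = pluginapi.Healthy
    return dev
}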
Allocation
Once the kubelet receives the pods that have been scheduled onto its node, allocation proceeds as follows.
HandlePodAdditions
When the Kubelet on a Node observes that a new Pod has been created, it calls HandlePodAdditions to handle the Pod creation event.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        // ...
        }
    case e := <-plegCh:
        // ...
    }
    return true
}
Next, let's look at the implementation of HandlePodAdditions: for each incoming Pod, if it has not been terminated, canAdmitPod checks whether the Pod is allowed to be created.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        kl.podManager.AddPod(pod)
        // ...
        if !kl.podIsTerminated(pod) {
            activePods := kl.filterOutTerminatedPods(existingPods)
            if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                kl.rejectPod(pod, reason, message)
                continue
            }
        }
        // ...
    }
}
Inside canAdmitPod, the Kubelet runs each admit handler in turn to see whether the Pod passes.
// "pod" is new pod, while "pods" are all admitted pods func(kl *Kubelet)canAdmitPod(pods []*v1.Pod, pod *v1.Pod)(bool, string, string) { attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods} for _, podAdmitHandler := range kl.admitHandlers { if result := podAdmitHandler.Admit(attrs); !result.Admit { returnfalse, result.Reason, result.Message } }
returntrue, "", "" }
admitHandlers is a slice of PodAdmitHandler; the interface looks like this:
type PodAdmitHandler interface {
    // Admit evaluates if a pod can be admitted.
    Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
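The bridge to the device manager is one of these admit handlers: conceptually it walks the pod's containers and calls the manager's Allocate for each one, rejecting admission on failure. A simplified sketch (the real kubelet wires this through the container manager's resource allocator; names here are illustrative):

type deviceAdmitHandler struct {
    dm devicemanager.Manager // exposes Allocate(pod, container)
}

func (h *deviceAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
    pod := attrs.Pod
    for i := range pod.Spec.InitContainers {
        if err := h.dm.Allocate(pod, &pod.Spec.InitContainers[i]); err != nil {
            return lifecycle.PodAdmitResult{Admit: false, Reason: "UnexpectedAdmissionError", Message: err.Error()}
        }
    }
    for i := range pod.Spec.Containers {
        if err := h.dm.Allocate(pod, &pod.Spec.Containers[i]); err != nil {
            return lifecycle.PodAdmitResult{Admit: false, Reason: "UnexpectedAdmissionError", Message: err.Error()}
        }
    }
    return lifecycle.PodAdmitResult{Admit: true}
}

This is why ManagerImpl.Allocate below is called once per container, with init containers handled first.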
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
    if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
        m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
    }
    // If pod entries to m.devicesToReuse other than the current pod exist, delete them.
    for podUID := range m.devicesToReuse {
        if podUID != string(pod.UID) {
            delete(m.devicesToReuse, podUID)
        }
    }
    // Allocate resources for init containers first as we know the caller always loops
    // through init containers before looping through app containers. Should the caller
    // ever change those semantics, this logic will need to be amended.
    for _, initContainer := range pod.Spec.InitContainers {
        if container.Name == initContainer.Name {
            if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
                return err
            }
            m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
            return nil
        }
    }
    if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
        return err
    }
    m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
    return nil
}
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
    podUID := string(pod.UID)
    contName := container.Name
    allocatedDevicesUpdated := false
    // Extended resources are not allowed to be overcommitted.
    // Since device plugin advertises extended resources,
    // therefore Requests must be equal to Limits and iterating
    // over the Limits should be sufficient.
    for k, v := range container.Resources.Limits {
        resource := string(k)
        needed := int(v.Value())
        klog.V(3).Infof("needs %d %s", needed, resource)
        if !m.isDevicePluginResource(resource) {
            continue
        }
        // Updates allocatedDevices to garbage collect any stranded resources
        // before doing the device plugin allocation.
        if !allocatedDevicesUpdated {
            m.UpdateAllocatedDevices()
            allocatedDevicesUpdated = true
        }
        allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
        if err != nil {
            return err
        }
        if allocDevices == nil || len(allocDevices) <= 0 {
            continue
        }

        startRPCTime := time.Now()
        // Manager.Allocate involves RPC calls to device plugin, which
        // could be heavy-weight. Therefore we want to perform this operation outside
        // mutex lock. Note if Allocate call fails, we may leave container resources
        // partially allocated for the failed container. We rely on UpdateAllocatedDevices()
        // to garbage collect these resources later. Another side effect is that if
        // we have X resource A and Y resource B in total, and two containers, container1
        // and container2 both require X resource A and Y resource B. Both allocation
        // requests may fail if we serve them in mixed order.
        // TODO: may revisit this part later if we see inefficient resource allocation
        // in real use as the result of this. Should also consider to parallelize device
        // plugin Allocate grpc calls if it becomes common that a container may require
        // resources from multiple device plugins.
        m.mutex.Lock()
        eI, ok := m.endpoints[resource]
        m.mutex.Unlock()
        if !ok {
            m.mutex.Lock()
            m.allocatedDevices = m.podDevices.devices()
            m.mutex.Unlock()
            return fmt.Errorf("unknown Device Plugin %s", resource)
        }

        devs := allocDevices.UnsortedList()
        // TODO: refactor this part of code to just append a ContainerAllocationRequest
        // in a passed in AllocateRequest pointer, and issues a single Allocate call per pod.
        klog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
        resp, err := eI.e.allocate(devs)
        metrics.DevicePluginAllocationDuration.WithLabelValues(resource).Observe(metrics.SinceInSeconds(startRPCTime))
        if err != nil {
            // In case of allocation failure, we want to restore m.allocatedDevices
            // to the actual allocated state from m.podDevices.
            m.mutex.Lock()
            m.allocatedDevices = m.podDevices.devices()
            m.mutex.Unlock()
            return err
        }
        // ...
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    needed := required
    // Gets list of devices that have already been allocated.
    // This can happen if a container restarts for example.
    devices := m.podDevices.containerDevices(podUID, contName, resource)
    if devices != nil {
        klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
        needed = needed - devices.Len()
        // A pod's resource is not expected to change once admitted by the API server,
        // so just fail loudly here. We can revisit this part if this no longer holds.
        if needed != 0 {
            return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, devices.Len(), required)
        }
    }
    if needed == 0 {
        // No change, no work.
        return nil, nil
    }
    klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, podUID, contName)
    // Needs to allocate additional devices.
    if _, ok := m.healthyDevices[resource]; !ok {
        return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
    }
    devices = sets.NewString()
    // Allocates from reusableDevices list first.
    for device := range reusableDevices {
        devices.Insert(device)
        needed--
        if needed == 0 {
            return devices, nil
        }
    }
    // Needs to allocate additional devices.
    if m.allocatedDevices[resource] == nil {
        m.allocatedDevices[resource] = sets.NewString()
    }
    // Gets Devices in use.
    devicesInUse := m.allocatedDevices[resource]
    // Gets a list of available devices.
    available := m.healthyDevices[resource].Difference(devicesInUse)
    if available.Len() < needed {
        return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
    }
    // By default, pull devices from the unsorted list of available devices.
    allocated := available.UnsortedList()[:needed]
    // If topology alignment is desired, update allocated to the set of devices
    // with the best alignment.
    hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
    if m.deviceHasTopologyAlignment(resource) && hint.NUMANodeAffinity != nil {
        allocated = m.takeByTopology(resource, available, hint.NUMANodeAffinity, needed)
    }
    // Updates m.allocatedDevices with allocated devices to prevent them
    // from being allocated to other pods/containers, given that we are
    // not holding lock during the rpc call.
    for _, device := range allocated {
        m.allocatedDevices[resource].Insert(device)
        devices.Insert(device)
    }
    return devices, nil
}
kubelet_node_status.go calls the device manager's GetCapacity() to obtain the device state, adds it to the node info, and persists it to etcd through the kube-apiserver. GetCapacity() returns the total capacity of every registered device plugin resource, the allocatable (healthy) amount, and the list of inactive resources whose plugins are no longer active. kubelet_node_status.go then updates the node info based on the returned data.
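A simplified sketch of how the node-status setter consumes that data (the actual logic lives in the nodestatus setters invoked from kubelet_node_status.go): capacity and allocatable are merged into the Node object, while resources whose plugin has gone inactive have their capacity zeroed out rather than removed.

func applyDevicePluginResources(node *v1.Node, mgr cm.ContainerManager) {
    capacity, allocatable, removed := mgr.GetDevicePluginResourceCapacity()
    for k, v := range capacity {
        node.Status.Capacity[k] = v
    }
    for k, v := range allocatable {
        node.Status.Allocatable[k] = v
    }
    for _, r := range removed {
        // Keep the resource visible but unschedulable once its plugin is gone.
        node.Status.Capacity[v1.ResourceName(r)] = *resource.NewQuantity(0, resource.DecimalSI)
    }
}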