The scheduler is a core Kubernetes component; its main job is to bind each pending workload Pod to a Node on which it will run. From the earliest scheduler based on Predicates and Priorities to the Scheduling Framework based scheduler introduced around v1.15, the Kubernetes scheduler has been evolving rapidly to meet the resource-scheduling needs of different scenarios.
// CreateFromConfig creates a scheduler from the configuration file
func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
    klog.V(2).Infof("Creating scheduler from configuration: %v", policy)

    // validate the policy configuration
    if err := validation.ValidatePolicy(policy); err != nil {
        return nil, err
    }

    predicateKeys := sets.NewString()
    if policy.Predicates == nil {
        klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        predicateKeys = provider.FitPredicateKeys
    } else {
        for _, predicate := range policy.Predicates {
            klog.V(2).Infof("Registering predicate: %s", predicate.Name)
            predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
        }
    }

    priorityKeys := sets.NewString()
    if policy.Priorities == nil {
        klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        priorityKeys = provider.PriorityFunctionKeys
    } else {
        for _, priority := range policy.Priorities {
            klog.V(2).Infof("Registering priority: %s", priority.Name)
            priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
        }
    }

    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        var ignorableExtenders []algorithm.SchedulerExtender
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
            if err != nil {
                return nil, err
            }
            if !extender.IsIgnorable() {
                extenders = append(extenders, extender)
            } else {
                ignorableExtenders = append(ignorableExtenders, extender)
            }
            for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources.Insert(string(r.Name))
                }
            }
        }
        // place ignorable extenders to the tail of extenders
        extenders = append(extenders, ignorableExtenders...)
        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
    }
    // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
    // Give it higher precedence than scheduler CLI configuration when it is provided.
    if policy.HardPodAffinitySymmetricWeight != 0 {
        c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
    }
    // When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
    // predicates even after one or more of them fails.
    if policy.AlwaysCheckAllPredicates {
        c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
    }
// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
    klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

    if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
        return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
    }
// AddAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func AddAllEventHandlers(
    sched *Scheduler,
    schedulerName string,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    serviceInformer coreinformers.ServiceInformer,
    storageClassInformer storageinformersv1.StorageClassInformer,
    csiNodeInformer storageinformersv1beta1.CSINodeInformer,
) {}
Starting the scheduler
So how does the whole Scheduler actually run? Its entry point is the Run function, which enters a control loop that keeps running scheduleOne until the StopEverything signal is received.
pkg/scheduler/scheduler.go
func (sched *Scheduler) Run() {
    if !sched.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.StopEverything)
}
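Run leans on wait.Until from k8s.io/apimachinery/pkg/util/wait: it calls the given function over and over (here with period 0, i.e. back to back) until the stop channel is closed. Below is a minimal, self-contained sketch of the same pattern; the loop body, period, and channel are illustrative, not the scheduler's own.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{})

    // Run the loop body repeatedly, 500ms apart, until stop is closed.
    // Scheduler.Run does the same thing with period 0 and sched.StopEverything.
    go wait.Until(func() {
        fmt.Println("scheduling one pod...") // stands in for sched.scheduleOne
    }, 500*time.Millisecond, stop)

    time.Sleep(2 * time.Second)
    close(stop) // plays the role of the StopEverything signal
    time.Sleep(100 * time.Millisecond)
}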
When the leftover pods in the unschedulable queue are flushed, each pod is moved either to the backoff queue or to the active queue; any pod that fails to be added is put back into the unschedulable queue:

// There is a chance of errors when adding pods to other queues,
// we make a temporary slice to store the pods,
// since the probability is low, we set its len to 0
addErrorPods := make([]*framework.PodInfo, 0)

for _, pInfo := range p.unschedulableQ.podInfoMap {
    pod := pInfo.Pod
    if p.isPodBackingOff(pod) {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
            addErrorPods = append(addErrorPods, pInfo)
        }
    } else {
        if err := p.activeQ.Add(pInfo); err != nil {
            klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
            addErrorPods = append(addErrorPods, pInfo)
        }
    }
}
p.unschedulableQ.clear()
// Adding pods that we could not move to Active queue or Backoff queue back to the Unschedulable queue
for _, podInfo := range addErrorPods {
    p.unschedulableQ.addOrUpdate(podInfo)
}
p.moveRequestCycle = p.schedulingCycle
p.cond.Broadcast()
}
func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pInfo := p.newPodInfo(pod)
    // Add the pod to activeQ
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
        return err
    }
    // Remove it from unschedulableQ if it is there
    if p.unschedulableQ.get(pod) != nil {
        klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
        p.unschedulableQ.delete(pod)
    }
    // Delete pod from backoffQ if it is backing off
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
    }
    // Record the pod and the node it is nominated to run on
    p.nominatedPods.add(pod, "")
    p.cond.Broadcast()
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
    ...
    // Every unschedulable pod is subject to backoff timers.
    p.backoffPod(pod)

    // If a move request has been received, move it to the BackoffQ, otherwise move
    // it to unschedulableQ.
    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
    } else {
        p.unschedulableQ.addOrUpdate(pInfo)
    }
    for {
        // Peek at the top of the backoff heap
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            return
        }
        pod := rawPodInfo.(*framework.PodInfo).Pod
        // Look up when the backoff expires
        boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
        if !found {
            // If the pod is no longer tracked in podBackoff, pop it and move it to activeQ
            klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
            p.podBackoffQ.Pop()
            p.activeQ.Add(rawPodInfo)
            defer p.cond.Broadcast()
            continue
        }

        // Backoff has not expired yet
        if boTime.After(p.clock.Now()) {
            return
        }
        // Backoff has expired, so pop the pod
        _, err := p.podBackoffQ.Pop()
        if err != nil {
            klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
            return
        }
        // and add it to activeQ
        p.activeQ.Add(rawPodInfo)
        defer p.cond.Broadcast()
    }
}
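In the upstream source these two flush loops are flushBackoffQCompleted and flushUnschedulableQLeftover, and the queue starts them as background goroutines when it is created. A sketch of that wiring, assuming the upstream method name run and the rough intervals of this release (about every second for the backoff queue, every 30 seconds for the unschedulable queue):

// run starts the goroutines that periodically flush the backoffQ and unschedulableQ.
// The intervals shown here are assumptions for illustration; check the source for exact values.
func (p *PriorityQueue) run() {
    go wait.Until(p.flushBackoffQCompleted, 1*time.Second, p.stop)
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}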
type nominatedPodMap struct {
    // nominatedPods is a map keyed by a node name and the value is a list of
    // pods which are nominated to run on the node. These are pods which can be in
    // the activeQ or unschedulableQ.
    nominatedPods map[string][]*v1.Pod
    // nominatedPodToNode is map keyed by a Pod UID to the node name where it is
    // nominated.
    nominatedPodToNode map[ktypes.UID]string
}
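The two maps must stay consistent: adding a nomination records the pod both under the node name and in the UID index, and removing it clears both. A rough sketch of the add path under that assumption; the real method also de-duplicates by UID and relies on the enclosing PriorityQueue for locking, so this is only an illustration of the bookkeeping.

// add records that pod is nominated to run on nodeName, keeping both maps in sync.
// Simplified sketch: delete is assumed to strip the pod from both maps.
func (npm *nominatedPodMap) add(pod *v1.Pod, nodeName string) {
    npm.delete(pod) // drop any stale entry first
    if nodeName == "" {
        nodeName = pod.Status.NominatedNodeName
        if nodeName == "" {
            return // nothing to record
        }
    }
    npm.nominatedPodToNode[pod.UID] = nodeName
    npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], pod)
}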
NextPod()
The function that fetches the next Pod is essentially a dequeue (Pop) operation.
// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue
func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
    return func() *v1.Pod {
        pod, err := queue.Pop()
        if err == nil {
            klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
            return pod
        }
        klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
        return nil
    }
}
n, ok := cache.nodes[node.Name]
if !ok {
    n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
    cache.nodes[node.Name] = n
} else {
    cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(node.Name)
type NodeTree struct {
    tree      map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
    zones     []string              // a list of all the zones in the tree (keys)
    zoneIndex int
    numNodes  int
    mu        sync.RWMutex
}

type nodeArray struct {
    nodes     []string
    lastIndex int
}

func (nt *NodeTree) Next() string {
    nt.mu.Lock()
    defer nt.mu.Unlock()
    if len(nt.zones) == 0 {
        return ""
    }
    numExhaustedZones := 0
    for {
        if nt.zoneIndex >= len(nt.zones) {
            nt.zoneIndex = 0
        }
        zone := nt.zones[nt.zoneIndex]
        nt.zoneIndex++
        // We do not check the exhausted zones before calling next() on the zone. This ensures
        // that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
        nodeName, exhausted := nt.tree[zone].next()
        if exhausted {
            numExhaustedZones++
            if numExhaustedZones >= len(nt.zones) {
                // all zones are exhausted. we should reset.
                nt.resetExhausted()
            }
        } else {
            return nodeName
        }
    }
}
Each call first takes the zone at the current zoneIndex and then advances zoneIndex. It then calls next on that zone's nodeArray to obtain a node, advancing lastIndex within the zone at the same time; a sketch of next follows below.
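The per-zone iteration lives in nodeArray's next method: it hands out node names one at a time and reports the zone as exhausted once lastIndex moves past the end of the slice. The sketch below is consistent with the struct above; the exact logging and error handling in the real code may differ.

// next returns the next node name in this zone, or exhausted=true once every
// node has been handed out since the last reset.
func (na *nodeArray) next() (nodeName string, exhausted bool) {
    if len(na.nodes) == 0 {
        // An empty zone should already have been removed from the tree.
        return "", false
    }
    if na.lastIndex >= len(na.nodes) {
        return "", true
    }
    nodeName = na.nodes[na.lastIndex]
    na.lastIndex++
    return nodeName, false
}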
// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    // The size of assumedPods should be small
    for key := range cache.assumedPods {
        ps, ok := cache.podStates[key]
        if !ok {
            panic("Key found in assumed set but not in podStates. Potentially a logical error.")
        }
        // Pods whose binding has not finished yet are never cleaned up
        if !ps.bindingFinished {
            klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.", ps.pod.Namespace, ps.pod.Name)
            continue
        }
        // After binding finishes a deadline is set (currently bind time + 30s);
        // once that deadline is earlier than now, the assumed pod has expired and is removed
        if now.After(*ps.deadline) {
            klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
            if err := cache.expirePod(key, ps); err != nil {
                klog.Errorf("ExpirePod failed for %s: %v", key, err)
            }
        }
    }
}
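cleanupAssumedPods is only the loop body; the cache starts it on a timer when it is created, wrapping it so that the current time is passed in. A sketch of that wiring, assuming the upstream helper and field names (cleanupExpiredAssumedPods, cache.period, cache.stop):

// run starts the periodic cleanup of expired assumed pods.
func (cache *schedulerCache) run() {
    go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

// cleanupExpiredAssumedPods adapts cleanupAssumedPods to wait.Until's func() signature.
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
    cache.cleanupAssumedPods(time.Now())
}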
func init() {
    // Register functions that extract metadata used by predicates computations.
    factory.RegisterPredicateMetadataProducerFactory(
        func(args factory.PluginFactoryArgs) predicates.PredicateMetadataProducer {
            return predicates.NewPredicateMetadataFactory(args.PodLister)
        })

    // IMPORTANT NOTES for predicate developers:
    // Registers predicates and priorities that are not enabled by default, but user can pick when creating their
    // own set of priorities/predicates.

    // PodFitsPorts has been replaced by PodFitsHostPorts for better user understanding.
    // For backwards compatibility with 1.0, PodFitsPorts is registered as well.
    factory.RegisterFitPredicate("PodFitsPorts", predicates.PodFitsHostPorts)
    // Fit is defined based on the absence of port conflicts.
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)
    // Fit is determined by resource availability.
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
    // Fit is determined by the presence of the Host parameter and a string match
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)
    // Fit is determined by node selector query.
    factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)

    // Fit is determined by volume zone requirements.
    factory.RegisterFitPredicateFactory(
        predicates.NoVolumeZoneConflictPred,
        func(args factory.PluginFactoryArgs) predicates.FitPredicate {
            return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
        },
    )
    // Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
    factory.RegisterFitPredicateFactory(
        predicates.MaxEBSVolumeCountPred,
        func(args factory.PluginFactoryArgs) predicates.FitPredicate {
            return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
        },
    )
    // Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
    factory.RegisterFitPredicateFactory(
        predicates.MaxGCEPDVolumeCountPred,
        func(args factory.PluginFactoryArgs) predicates.FitPredicate {
            return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
        },
    )
    // Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
    factory.RegisterFitPredicateFactory(
        predicates.MaxAzureDiskVolumeCountPred,
        func(args factory.PluginFactoryArgs) predicates.FitPredicate {
            return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
        },
    )
    factory.RegisterFitPredicateFactory(
        predicates.MaxCSIVolumeCountPred,
        func(args factory.PluginFactoryArgs) predicates.FitPredicate {
            return predicates.NewCSIMaxVolumeLimitPredicate(args.CSINodeInfo, args.PVInfo, args.PVCInfo, args.StorageClassInfo)
        },
    )
    factory.RegisterFitPredicateFactory(
        predicates.MaxCinderVolumeCountPred,
        func(args factory.PluginFactoryArgs) predicates.FitPredicate {
            return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
        },
    )

    // Fit is determined by inter-pod affinity.
    factory.RegisterFitPredicateFactory(
        predicates.MatchInterPodAffinityPred,
        func(args factory.PluginFactoryArgs) predicates.FitPredicate {
            return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
        },
    )

    // Fit is determined by non-conflicting disk volumes.
    factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict)

    // GeneralPredicates are the predicates that are enforced by all Kubernetes components
    // (e.g. kubelet and all schedulers)
    factory.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates)

    // Fit is determined by node memory pressure condition.
    factory.RegisterFitPredicate(predicates.CheckNodeMemoryPressurePred, predicates.CheckNodeMemoryPressurePredicate)

    // Fit is determined by node disk pressure condition.
    factory.RegisterFitPredicate(predicates.CheckNodeDiskPressurePred, predicates.CheckNodeDiskPressurePredicate)

    // Fit is determined by node pid pressure condition.
    factory.RegisterFitPredicate(predicates.CheckNodePIDPressurePred, predicates.CheckNodePIDPressurePredicate)

    // Fit is determined by node conditions: not ready, network unavailable or out of disk.
    factory.RegisterMandatoryFitPredicate(predicates.CheckNodeConditionPred, predicates.CheckNodeConditionPredicate)

    // Fit is determined based on whether a pod can tolerate all of the node's taints
    factory.RegisterFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)

    // Fit is determined by volume topology requirements.
    factory.RegisterFitPredicateFactory(
        predicates.CheckVolumeBindingPred,
        func(args factory.PluginFactoryArgs) predicates.FitPredicate {
            return predicates.NewVolumeBindingPredicate(args.VolumeBinder)
        },
    )
}
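The same registry can be extended at build time: any function matching the FitPredicate signature can be registered under a new name and then enabled by name from a scheduler Policy. The sketch below assumes the FitPredicate signature of this release (pod, predicate metadata, node info → fit, failure reasons, error); the predicate name, the label key, and the reuse of ErrNodeSelectorNotMatch as a failure reason are illustrative choices, not upstream code.

// Hypothetical predicate: only fit the pod onto nodes labeled "example.com/dedicated=true".
func podFitsDedicatedNode(pod *v1.Pod, meta predicates.PredicateMetadata,
    nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if node.Labels["example.com/dedicated"] == "true" {
        return true, nil, nil
    }
    // Reuse an existing failure reason purely for illustration.
    return false, []predicates.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch}, nil
}

// Registration under a custom name so that a Policy file can pick it up by name.
factory.RegisterFitPredicate("DedicatedNodeOnly", podFitsDedicatedNode)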
// HostPriority represents the priority of scheduling to a particular host, higher priority is better.
type HostPriority struct {
    // Name of the host
    Host string
    // Score associated with the host
    Score int
}

// HostPriorityList declares a []HostPriority type.
type HostPriorityList []HostPriority

type PriorityConfig struct {
    Name   string
    Map    PriorityMapFunction
    Reduce PriorityReduceFunction
    // TODO: Remove it after migrating all functions to
    // Map-Reduce pattern.
    Function PriorityFunction
    Weight   int
}
These two function types are defined as follows:
// Map: given (pod, meta, nodeInfo), returns the score of this Pod on this node under this algorithm
type PriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error)

// Reduce: given (pod, meta, map[string]*NodeInfo, result), post-processes/normalizes the per-node scores in result
type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result schedulerapi.HostPriorityList) error
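To make the pattern concrete, here is a toy priority written directly against these two signatures: the Map step gives each node a raw score based on how many pods it already runs, and the Reduce step inverts and normalizes the raw scores onto the 0 to schedulerapi.MaxPriority (10 in this release) scale, so the emptiest node scores highest. The names and the scoring rule are hypothetical, not an in-tree priority; this is only a sketch of the contract.

// fewerPodsPriorityMap scores one node: the raw score is simply the number of pods already on it.
func fewerPodsPriorityMap(pod *v1.Pod, meta interface{},
    nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
    node := nodeInfo.Node()
    if node == nil {
        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    }
    return schedulerapi.HostPriority{
        Host:  node.Name,
        Score: len(nodeInfo.Pods()), // raw value; Reduce inverts and normalizes it
    }, nil
}

// fewerPodsPriorityReduce inverts and normalizes the raw scores to 0..MaxPriority.
func fewerPodsPriorityReduce(pod *v1.Pod, meta interface{},
    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    result schedulerapi.HostPriorityList) error {
    maxCount := 0
    for i := range result {
        if result[i].Score > maxCount {
            maxCount = result[i].Score
        }
    }
    for i := range result {
        if maxCount == 0 {
            result[i].Score = schedulerapi.MaxPriority
            continue
        }
        result[i].Score = schedulerapi.MaxPriority * (maxCount - result[i].Score) / maxCount
    }
    return nil
}

// Registration (e.g. inside the algorithmprovider init below); the name and weight are hypothetical.
factory.RegisterPriorityMapReduceFunction("FewerPodsPriority", fewerPodsPriorityMap, fewerPodsPriorityReduce, 1)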
func init() {
    // Register functions that extract metadata used by priorities computations.
    factory.RegisterPriorityMetadataProducerFactory(
        func(args factory.PluginFactoryArgs) priorities.PriorityMetadataProducer {
            return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
        })

    ...

    // Prioritize nodes by least requested utilization.
    factory.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)

    // Prioritizes nodes to help achieve balanced resource usage
    factory.RegisterPriorityMapReduceFunction(priorities.BalancedResourceAllocation, priorities.BalancedResourceAllocationMap, nil, 1)

    // Set this weight large enough to override all other priority functions.
    // TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
    factory.RegisterPriorityMapReduceFunction(priorities.NodePreferAvoidPodsPriority, priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000)

    // Prioritizes nodes that have labels matching NodeAffinity
    factory.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)

    // Prioritizes nodes that marked with taint which pod can tolerate.
    factory.RegisterPriorityMapReduceFunction(priorities.TaintTolerationPriority, priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1)

    // ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
    factory.RegisterPriorityMapReduceFunction(priorities.ImageLocalityPriority, priorities.ImageLocalityPriorityMap, nil, 1)
}
Each of these strategies is briefly analyzed below.
Map computation indexed by node
The Map step parallelizes the work along the node dimension: for each node, it loops over all priority algorithms and computes that node's score under each one.
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
    nodeInfo := nodeNameToInfo[nodes[index].Name]
    for i := range priorityConfigs {
        if priorityConfigs[i].Function != nil {
            continue
        }
        // ... call priorityConfigs[i].Map(pod, meta, nodeInfo) and record the score for this node
    }
})
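workqueue.ParallelizeUntil comes from k8s.io/client-go/util/workqueue: it spreads a fixed number of independent work pieces across a pool of goroutines and blocks until all pieces are processed, which is what makes the per-node Map scoring embarrassingly parallel here. A minimal standalone use of the same helper outside the scheduler (the data is made up for illustration):

package main

import (
    "context"
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    items := []int{1, 2, 3, 4, 5, 6, 7, 8}
    squares := make([]int, len(items))

    // 4 workers process len(items) pieces; the closure receives the piece index,
    // just as the scheduler's closure receives a node index.
    workqueue.ParallelizeUntil(context.TODO(), 4, len(items), func(index int) {
        squares[index] = items[index] * items[index]
    })

    fmt.Println(squares) // each index is written by exactly one worker, so no locking is needed
}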
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool {
    if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
        klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
        return false
    }
    nomNodeName := pod.Status.NominatedNodeName
    if len(nomNodeName) > 0 {
        if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
            podPriority := util.GetPodPriority(pod)
            for _, p := range nodeInfo.Pods() {
                if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority {
                    // A lower-priority pod is already terminating on the nominated node,
                    // so this pod does not preempt again
                    return false
                }
            }
        }
    }
    return true
}
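On the pod side, the knob this function checks is spec.preemptionPolicy: when it is set to Never (and the NonPreemptingPriority feature is enabled), the function returns false, so the pod waits in the queue instead of evicting lower-priority pods. A small sketch of building such a pod object; the helper name, pod name, and container are placeholders.

// newNonPreemptingPod builds a pod that declines to preempt others.
// Only the fields relevant to the check above are filled in.
func newNonPreemptingPod(name string) *v1.Pod {
    never := v1.PreemptNever
    return &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"},
        Spec: v1.PodSpec{
            PreemptionPolicy: &never, // podEligibleToPreemptOthers returns false for such a pod
            Containers: []v1.Container{
                {Name: "app", Image: "nginx"}, // placeholder container
            },
        },
    }
}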
func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
    for _, obj := range pods {
        pod := obj.(*v1.Pod)
        pdbForPodIsViolated := false
        // A pod with no labels will not match any PDB. So, no need to check.
        if len(pod.Labels) != 0 {
            for _, pdb := range pdbs {
                if pdb.Namespace != pod.Namespace {
                    continue
                }
                selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
                if err != nil {
                    continue
                }
                // A PDB with a nil or empty selector matches nothing.
                if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
                    continue
                }
                // We have found a matching PDB.
                if pdb.Status.PodDisruptionsAllowed <= 0 {
                    pdbForPodIsViolated = true
                    break
                }
            }
        }
        if pdbForPodIsViolated {
            violatingPods = append(violatingPods, pod)
        } else {
            nonViolatingPods = append(nonViolatingPods, pod)
        }
    }
    return violatingPods, nonViolatingPods
}
latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
if latestStartTime == nil {
    // If the earliest start time of all pods on the 1st node is nil, just return it,
    // which is not expected to happen.
    klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
    return minNodes2[0]
}
nodeToReturn := minNodes2[0]
for i := 1; i < lenNodes2; i++ {
    node := minNodes2[i]
    // Get earliest start time of all pods on the current node.
    earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
    if earliestStartTimeOnNode == nil {
        klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
        continue
    }
    if earliestStartTimeOnNode.After(latestStartTime.Time) {
        latestStartTime = earliestStartTimeOnNode
        nodeToReturn = node
    }
}