Wait (with a timeout): after a wait operation is performed, the Pod stays in the Permit phase until a plugin approves it; if the timeout expires, wait becomes deny, the Pod is returned to the scheduling queue, and the Un-reserve plugins are triggered.
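From the plugin side, this behavior is requested by returning a Wait status together with a timeout. Below is a minimal, hypothetical sketch against the v1alpha1 interfaces used throughout this article; the plugin name and label are invented:

```go
package example

import (
    "context"
    "time"

    v1 "k8s.io/api/core/v1"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// ExamplePermit is a hypothetical permit plugin: pods carrying a certain label are held
// in the permit phase for up to 10 seconds instead of proceeding straight to binding.
type ExamplePermit struct{}

// Name satisfies the framework's Plugin interface.
func (p *ExamplePermit) Name() string { return "ExamplePermit" }

// Permit returns Wait plus a timeout; the framework then creates a WaitingPod and starts
// a timer. Something must call Allow (or Reject) before the timer fires, otherwise the
// pod is rejected and sent back to the scheduling queue.
func (p *ExamplePermit) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
    if pod.Labels["example.io/hold"] == "true" { // invented label
        return framework.NewStatus(framework.Wait, "waiting for external approval"), 10 * time.Second
    }
    return nil, 0 // nil status means success
}
```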
WaitingPod
The framework interface defines WaitingPod, which represents a pod waiting in the Permit phase.
pkg/scheduler/framework/v1alpha1/interface.go
// WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface {
    // GetPod returns a reference to the waiting pod.
    GetPod() *v1.Pod
    // GetPendingPlugins returns a list of pending permit plugin's name.
    GetPendingPlugins() []string
    // Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
    // If this is the last remaining plugin to allow, then a success signal is delivered
    // to unblock the pod.
    Allow(pluginName string)
    // Reject declares the waiting pod unschedulable.
    Reject(msg string)
}
wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
// The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the
// lock here so that time.AfterFunc can only execute after newWaitingPod finishes.
wp.mu.Lock()
defer wp.mu.Unlock()
// Build one timer per plugin based on its maximum wait time; if any timer fires before
// that plugin has called Allow, the pod is rejected.
for k, v := range pluginsMaxWaitTime {
    plugin, waitTime := k, v
    wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
        msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
            waitTime, plugin)
        wp.Reject(msg)
    })
}
// Only signal success status after all plugins have allowed
if len(w.pendingPlugins) != 0 {
    return true
}

// Only when every plugin has allowed the pod is the success signal actually sent.
select {
case w.s <- NewStatus(Success, ""): // send the success signal
    return true
default:
    return false
}
}
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
    waitingPod := f.waitingPods.get(pod.UID)
    if waitingPod == nil {
        return nil
    }
    defer f.waitingPods.remove(pod.UID)
    klog.V(4).Infof("pod %q waiting on permit", pod.Name)

    startTime := time.Now()
    s := <-waitingPod.s
    metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))

    if !s.IsSuccess() {
        if s.IsUnschedulable() {
            msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
            klog.V(4).Infof(msg)
            return NewStatus(s.Code(), msg)
        }
        msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
        klog.Error(msg)
        return NewStatus(Error, msg)
    }
    return nil
}
The CycleState also provides an API similar to context.WithValue that can be used to pass data between plugins at different extension points. Multiple plugins can share the state or communicate via this mechanism. The state is preserved only during a single scheduling context. It is worth noting that plugins are assumed to be trusted. The scheduler does not prevent one plugin from accessing or modifying another plugin’s state.
WARNING: The data available through a CycleState is not valid after a scheduling context ends, and plugins should not hold references to that data longer than necessary.
type CycleState struct {
    mx      sync.RWMutex
    storage map[StateKey]StateData
    // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
    recordPluginMetrics bool
}

// StateData is a generic type for arbitrary data stored in CycleState.
type StateData interface {
    // Clone is an interface to make a copy of StateData. For performance reasons,
    // clone should make shallow copies for members (e.g., slices or maps) that are not
    // impacted by PreFilter's optional AddPod/RemovePod methods.
    Clone() StateData
}

// Read retrieves data with the given "key" from CycleState.
// This function is not thread safe. In multi-threaded code, lock should be
// acquired first.
func (c *CycleState) Read(key StateKey) (StateData, error) {
    if v, ok := c.storage[key]; ok {
        return v, nil
    }
    return nil, errors.New(NotFound)
}

// Write stores the given "val" in CycleState with the given "key".
// This function is not thread safe. In multi-threaded code, lock should be
// acquired first.
func (c *CycleState) Write(key StateKey, val StateData) {
    c.storage[key] = val
}

// Delete deletes data with the given key from CycleState.
// This function is not thread safe. In multi-threaded code, lock should be
// acquired first.
func (c *CycleState) Delete(key StateKey) {
    delete(c.storage, key)
}
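To illustrate this API, here is a minimal, hypothetical StateData implementation plus the usual write-in-PreFilter / read-in-Filter pattern; the key and type names are invented for this sketch. The single write happens in PreFilter, which runs once per scheduling cycle, so the concurrent per-node reads that follow do not race with it.

```go
package example

import (
    "fmt"

    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// exampleStateKey is an invented key; real plugins usually derive it from the plugin name.
const exampleStateKey = framework.StateKey("ExamplePlugin/preFilterData")

// exampleStateData carries data computed once in PreFilter to later extension points.
type exampleStateData struct {
    matchingNodes int
}

// Clone satisfies framework.StateData; a shallow copy is enough here.
func (d *exampleStateData) Clone() framework.StateData {
    c := *d
    return &c
}

// writeExampleState is called from PreFilter: compute something once, stash it in the CycleState.
func writeExampleState(state *framework.CycleState, matching int) {
    state.Write(exampleStateKey, &exampleStateData{matchingNodes: matching})
}

// readExampleState is called from Filter/Score: read the stashed data back.
func readExampleState(state *framework.CycleState) (*exampleStateData, error) {
    v, err := state.Read(exampleStateKey)
    if err != nil {
        return nil, err
    }
    d, ok := v.(*exampleStateData)
    if !ok {
        return nil, fmt.Errorf("%v is not of type *exampleStateData", exampleStateKey)
    }
    return d, nil
}
```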
While the CycleState provides APIs relevant to a single scheduling context, the FrameworkHandle provides APIs relevant to the lifetime of a plugin. This is how plugins can get a client (kubernetes.Interface) and SharedInformerFactory, or read data from the scheduler’s cache of cluster state. The handle will also provide APIs to list and approve or reject waiting pods.
// NodeAffinity is a plugin that checks if a pod node selector matches the node label.
type NodeAffinity struct {
    handle framework.FrameworkHandle
}

// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &NodeAffinity{handle: h}, nil
}
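The handle is also how a plugin reaches the waiting-pod APIs described earlier. The hypothetical helper below (the helper, plugin name, and label are invented) walks the pods parked in the permit phase and allows or rejects them:

```go
package example

import (
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// allowGroup is a hypothetical helper: once an external condition is met, it iterates
// over all pods waiting in the permit phase and allows (or rejects) the ones in a group.
func allowGroup(h framework.FrameworkHandle, groupName string, ready bool) {
    h.IterateOverWaitingPods(func(wp framework.WaitingPod) {
        if wp.GetPod().Labels["example.io/group"] != groupName { // invented label
            return
        }
        if ready {
            // Allow on behalf of the "ExamplePermit" plugin; the pod is released
            // only once every pending permit plugin has allowed it.
            wp.Allow("ExamplePermit")
        } else {
            wp.Reject("group not ready")
        }
    })
}
```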
type extensionPoint struct {
    // the set of plugins to be configured at this extension point.
    plugins *config.PluginSet
    // a pointer to the slice storing plugins implementations that will run at this
    // extension point.
    slicePtr interface{}
}
// Elem returns the slice that slicePtr points to.
plugins := reflect.ValueOf(pluginList).Elem()
// From the slice type, get the type of its elements (the extension-point interface).
pluginType := plugins.Type().Elem()
set := sets.NewString()
for _, ep := range pluginSet.Enabled {
    pg, ok := pluginsMap[ep.Name]
    if !ok {
        return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
    }

    // Validity check: return an error if the plugin does not implement this extension point's interface.
    if !reflect.TypeOf(pg).Implements(pluginType) {
        return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name())
    }

    if set.Has(ep.Name) {
        return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
    }
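The Implements check is ordinary reflect machinery. This standalone toy example (deliberately not using the scheduler's types) shows the same pattern: validate that a registered object satisfies the extension-point interface, then append it to the target slice through a pointer:

```go
package main

import (
    "fmt"
    "reflect"
)

// FilterPlugin stands in for one of the framework's extension-point interfaces.
type FilterPlugin interface {
    Filter(nodeName string) bool
}

type alwaysTrue struct{}

func (alwaysTrue) Filter(string) bool { return true }

// appendIfImplements mimics updatePluginList: pluginList is a *[]FilterPlugin passed as
// an interface{}, and pg is only appended if it really implements the element type.
func appendIfImplements(pluginList interface{}, pg interface{}) error {
    plugins := reflect.ValueOf(pluginList).Elem() // the slice the pointer refers to
    pluginType := plugins.Type().Elem()           // the element (interface) type
    if !reflect.TypeOf(pg).Implements(pluginType) {
        return fmt.Errorf("plugin %T does not extend %s", pg, pluginType.Name())
    }
    plugins.Set(reflect.Append(plugins, reflect.ValueOf(pg)))
    return nil
}

func main() {
    var filters []FilterPlugin
    if err := appendIfImplements(&filters, alwaysTrue{}); err != nil {
        panic(err)
    }
    fmt.Println(filters[0].Filter("node-1")) // true
}
```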
There are two steps to plugin initialization. First, plugins are registered. Second, the scheduler uses its configuration to decide which plugins to instantiate. If a plugin registers for multiple extension points, it is instantiated only once.
There are two types of concurrency that plugin writers should consider. A plugin might be invoked several times concurrently when evaluating multiple nodes, and a plugin may be called concurrently from different scheduling contexts.
Note: Within one scheduling context, each extension point is evaluated serially.
In the main thread of the scheduler, only one scheduling cycle is processed at a time. Any extension point up to and including reserve will be finished before the next scheduling cycle begins*. After the reserve phase, the binding cycle is executed asynchronously. This means that a plugin could be called concurrently from two different scheduling contexts, provided that at least one of the calls is to an extension point after reserve. Stateful plugins should take care to handle these situations.
Finally, un-reserve plugins may be called from either the Permit thread or the Bind thread, depending on how the pod was rejected.
* The queue sort extension point is a special case. It is not part of a scheduling context and may be called concurrently for many pod pairs.
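Because of this, any mutable state a plugin keeps across calls must be synchronized. A minimal sketch, with invented names:

```go
package example

import "sync"

// ExampleStateful is a hypothetical plugin that keeps per-node counters. Its methods can
// be entered from several binding cycles (and, for node-parallel extension points, from
// several goroutines) at once, so the mutable state is guarded by a mutex.
type ExampleStateful struct {
    mu      sync.Mutex
    perNode map[string]int
}

// Name satisfies the framework's Plugin interface.
func (p *ExampleStateful) Name() string { return "ExampleStateful" }

// record is called from extension-point methods; it must be safe for concurrent use.
func (p *ExampleStateful) record(nodeName string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.perNode == nil {
        p.perNode = map[string]int{}
    }
    p.perNode[nodeName]++
}
```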
Configuring Plugins
Interaction with Cluster Autoscaler
Scheduler core component flow
Scheduler initialization
Initializing scheduler options
var defaultSchedulerOptions = schedulerOptions{
    profiles: []schedulerapi.KubeSchedulerProfile{
        // Profiles' default plugins are set from the algorithm provider.
        {SchedulerName: v1.DefaultSchedulerName},
    },
    schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
        Provider: defaultAlgorithmSourceProviderName(),
    },
    disablePreemption:        false,
    percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
    bindTimeoutSeconds:       BindTimeoutSeconds,
    podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
    podMaxBackoffSeconds:     int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}
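These defaults are only a starting point: the scheduler constructor applies functional options on top of them. The toy sketch below mirrors that pattern with invented types and illustrative values so the mechanism is visible; it is not the real scheduler code.

```go
package main

import "fmt"

// options mirrors a slice of schedulerOptions fields, for illustration only.
type options struct {
    disablePreemption        bool
    percentageOfNodesToScore int32
}

// Option is a functional option applied on top of the defaults.
type Option func(*options)

func WithPreemptionDisabled(disabled bool) Option {
    return func(o *options) { o.disablePreemption = disabled }
}

func WithPercentageOfNodesToScore(p int32) Option {
    return func(o *options) { o.percentageOfNodesToScore = p }
}

func main() {
    // Start from illustrative defaults, then let callers override selected fields,
    // which is the same mechanism the scheduler uses with defaultSchedulerOptions.
    opts := options{disablePreemption: false, percentageOfNodesToScore: 50}
    for _, apply := range []Option{WithPreemptionDisabled(true), WithPercentageOfNodesToScore(70)} {
        apply(&opts)
    }
    fmt.Printf("%+v\n", opts) // {disablePreemption:true percentageOfNodesToScore:70}
}
```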
Initializing the plugin factory registry
Initialization of the plugin factory registry has two parts: in-tree plugins, which ship with the current version, and out-of-tree plugins, which are user defined.
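A hedged sketch of the out-of-tree side, assuming the framework.Registry map type with its Register and Merge helpers in this framework version; the plugin and function names are invented:

```go
package example

import (
    "k8s.io/apimachinery/pkg/runtime"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// examplePlugin is a stand-in out-of-tree plugin; only Name is needed to satisfy
// framework.Plugin for this sketch.
type examplePlugin struct{}

func (examplePlugin) Name() string { return "ExamplePlugin" }

// NewExamplePlugin has the same shape as the New factory shown earlier.
func NewExamplePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
    return examplePlugin{}, nil
}

// buildRegistry registers the out-of-tree factory and merges it into the in-tree
// registry (whatever the scheduler constructed for its bundled plugins).
func buildRegistry(inTree framework.Registry) (framework.Registry, error) {
    outOfTree := framework.Registry{}
    if err := outOfTree.Register("ExamplePlugin", NewExamplePlugin); err != nil {
        return nil, err
    }
    if err := inTree.Merge(outOfTree); err != nil {
        return nil, err
    }
    return inTree, nil
}
```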
func (sched *Scheduler) profileForPod(pod *v1.Pod) (*profile.Profile, error) {
    prof, ok := sched.Profiles[pod.Spec.SchedulerName]
    if !ok {
        return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
    }
    return prof, nil
}
// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool {
    // Case 1: pod is being deleted.
    if pod.DeletionTimestamp != nil {
        prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return true
    }

    // Case 2: pod has been assumed and pod updates could be skipped.
    // An assumed pod can be added again to the scheduling queue if it got an update event
    // during its previous scheduling cycle but before getting assumed.
    if sched.skipPodUpdate(pod) {
        return true
    }
// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
//   - The pod has already been assumed, AND
//   - The pod has only its ResourceVersion, Spec.NodeName, Annotations,
//     ManagedFields, Finalizers and/or Conditions updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
    // Non-assumed pods should never be skipped.
    isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
        return false
    }
    if !isAssumed {
        return false
    }

    // Gets the assumed pod from the cache.
    assumedPod, err := sched.SchedulerCache.GetPod(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
        return false
    }

    // Compares the assumed pod in the cache with the pod update. If they are
    // equal (with certain fields excluded), this pod update will be skipped.
    f := func(pod *v1.Pod) *v1.Pod {
        p := pod.DeepCopy()
        // ResourceVersion must be excluded because each object update will
        // have a new resource version.
        p.ResourceVersion = ""
        // Spec.NodeName must be excluded because the pod assumed in the cache
        // is expected to have a node assigned while the pod update may or may
        // not have this field set.
        p.Spec.NodeName = ""
        // Annotations must be excluded for the reasons described in
        // https://github.com/kubernetes/kubernetes/issues/52914.
        p.Annotations = nil
        // Same as above, when annotations are modified with ServerSideApply,
        // ManagedFields may also change and must be excluded
        p.ManagedFields = nil
        // The following might be changed by external controllers, but they don't
        // affect scheduling decisions.
        p.Finalizers = nil
        p.Status.Conditions = nil
        return p
    }
    assumedPodCopy, podCopy := f(assumedPod), f(pod)
    if !reflect.DeepEqual(assumedPodCopy, podCopy) {
        return false
    }
    klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
    return true
}
// Store the pod in the SchedulerCache so that, in the next scheduling cycle, the pod
// counts against the resources of the assumed node.
if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
    klog.Errorf("scheduler cache AssumePod failed: %v", err)
    return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
// Remove the pod from the scheduling queue's nominated-pod cache.
if sched.SchedulingQueue != nil {
    sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}

return nil
}
Binding flow
First, all Permit plugins are run; for any plugin whose returned status is Wait, a wait operation is performed according to that plugin's wait time.
// Run "permit" plugins.
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
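RunPermitPlugins is roughly the loop described above. The following is a simplified, hypothetical sketch of that logic (error messages, metrics, and the maximum-timeout cap are omitted; it is not the verbatim framework source):

```go
package example

import (
    "context"
    "time"

    v1 "k8s.io/api/core/v1"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// runPermitPhase is a simplified stand-in for RunPermitPlugins: run every permit plugin,
// remember the wait time of each one that answered Wait, and report Wait if any did.
func runPermitPhase(ctx context.Context, plugins []framework.PermitPlugin,
    state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {

    waitTimes := map[string]time.Duration{}
    for _, pl := range plugins {
        status, timeout := pl.Permit(ctx, state, pod, nodeName)
        switch {
        case status.IsSuccess():
            continue
        case status.Code() == framework.Wait:
            // The real framework also caps timeout with a configured maximum.
            waitTimes[pl.Name()] = timeout
        default:
            // Unschedulable or internal error: fail the scheduling cycle.
            return status
        }
    }
    if len(waitTimes) > 0 {
        // The real framework creates a waitingPod from waitTimes and stores it in the
        // waitingPods map that WaitOnPermit later blocks on.
        return framework.NewStatus(framework.Wait, "pod is waiting on permit plugins")
    }
    return nil // nil status means success
}
```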
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
    bindingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    // Run "postbind" plugins.
    prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}()
Preemption flow
If the failure came from the predicate (Filter) phase and preemption is enabled for this scheduler, preemption is performed via sched.preempt.
if fitError, ok := err.(*core.FitError); ok {
    // The error is a predicate (fit) failure, so preemption may help.
    if sched.DisablePreemption {
        klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
            " No preemption is performed.")
    } else {
        preemptionStartTime := time.Now()
        // Run preemption.
        sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
        metrics.PreemptionAttempts.Inc()
        metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
        metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
    }
Getting the preemptor
First, the latest Pod object of the pod that needs to preempt is fetched from the apiserver.
preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
if err != nil {
    klog.Errorf("Error getting the updated preemptor pod object: %v", err)
    return "", err
}
Selecting with the preemption algorithm
Preempt selects the node on which to preempt, the victim pods to evict, and the nominated pods whose nominations should be cleared.
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr)
if err != nil {
    klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
    return "", err
}
for _, victim := range victims {
    // Delete the victim pod through the apiserver.
    if err := sched.podPreemptor.deletePod(victim); err != nil {
        klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
        return "", err
    }
    // If the victim is a WaitingPod, send a reject message to the PermitPlugin
    if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil {
        waitingPod.Reject("preempted")
    }
    sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
        preemptor.Namespace, preemptor.Name, nodeName)
}
Clearing the nominated node of preempted nominated pods
Pods that were previously nominated to this node have their nominated node cleared, so they go back through scheduling and pick a node again.
for _, p := range nominatedPodsToClear {
    // Clear the nomination of these pods.
    rErr := sched.podPreemptor.removeNominatedNodeName(p)
    if rErr != nil {
        klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
        // We do not return as this error is not critical.
    }
}