
[Kubernetes Deep Dive] Scheduling Framework

The Kubernetes Scheduling Framework is a scheduling framework introduced starting with Kubernetes 1.15. It defines a rich set of extension point interfaces along the existing scheduling flow: developers implement those interfaces as plugins and register the plugins at the extension points. While the framework runs the scheduling flow, it calls the registered plugins whenever it reaches the corresponding extension point, letting them influence the scheduling decision. This is how user scheduling logic is integrated into the Scheduling Framework.

Motivation

As more and more features have been added to the Kubernetes scheduler, its code base has kept growing and its logic has become increasingly complex, making it harder to maintain and debug. At the same time, developers who maintain custom schedulers find it increasingly difficult to keep up with upstream. Before Kubernetes 1.15 the scheduler already supported Scheduler Extenders, HTTP webhooks that extend the scheduler's functionality, but they have several obvious problems:

  • The number of extension points is limited. Extenders only hook into Filter, Prioritize, Preempt and Bind; there are no other extension points, e.g. nothing can be called before the Predicate functions run
  • Every call to an extender requires marshaling and unmarshaling JSON; the HTTP webhook round trip is much slower than calling a native function directly
  • It is hard to notify an extender that the scheduler has given up scheduling a Pod
  • Because extenders run as separate processes, they cannot use the scheduler cache

These problems hurt the scheduler's performance and make it hard to add new features. The proposed framework therefore converts existing functionality, such as the Predicate and Prioritize functions, into plugins that are compiled into the scheduler binary. Developers who maintain custom schedulers can also compile their own scheduler from the unmodified scheduler code plus their own plugin code.

Goals:

  • Make the scheduler more extensible
  • Make the scheduler core simpler by moving some of its code into plugins
  • Propose extension points in the framework
  • Propose a mechanism to receive plugin results and continue or abort scheduling based on the results received
  • Propose a mechanism to handle errors and communicate them to plugins

Proposals

The Scheduling Framework defines new extension points and a Go API for what it calls plugins. Plugins can be enabled, disabled or reordered through the scheduler's ComponentConfig.
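To make this concrete, here is a minimal sketch, in Go, of what enabling and disabling plugins per extension point amounts to. It uses the config.Plugins / config.PluginSet / config.Plugin types that appear later in this article; the plugin names are placeholders and the import path is an assumption, so treat it as an illustration rather than the authoritative ComponentConfig API.

package main

import config "k8s.io/kubernetes/pkg/scheduler/apis/config"

// examplePlugins sketches how a profile enables or disables plugins at two
// extension points. The plugin names are hypothetical.
func examplePlugins() *config.Plugins {
    return &config.Plugins{
        QueueSort: &config.PluginSet{
            Enabled: []config.Plugin{{Name: "MyQueueSort"}},
        },
        Filter: &config.PluginSet{
            Enabled:  []config.Plugin{{Name: "MyFilter"}},
            Disabled: []config.Plugin{{Name: "SomeDefaultFilter"}},
        },
    }
}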

Scheduling Cycle & Binding Cycle

Each attempt to schedule a Pod goes through two phases:

  • Scheduling Cycle: selects a Node for the Pod
  • Binding Cycle: applies the decision made in the scheduling cycle to the cluster

Together these two phases are called the Scheduling Context.

Scheduling cycles run serially: only one scheduling cycle is in progress at a time, so that part is thread safe. Binding cycles run asynchronously, so several binding cycles may be in flight at the same time and they are not thread safe.

A scheduling cycle or binding cycle is aborted if the Pod is determined to be unschedulable or if an internal error occurs; the Pod is then returned to the scheduling queue. If a binding cycle is aborted, the Un-reserve plugins are triggered.

Extension points

The figure below shows the extension points exposed by the Scheduling Framework. Filter corresponds to the old Predicates, and Scoring corresponds to the old Priorities. A plugin may register at one or more extension points, depending on the logic it needs.

(Figure: Scheduling Framework extension points)

Queue Sort

Queue Sort plugins sort the Pods in the scheduling queue. Only one queue sort plugin can be enabled at a time.
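As an illustration, below is a minimal sketch of a queue sort plugin that orders pods by Spec.Priority, highest first. It is written against the v1alpha1 QueueSortPlugin interface quoted in the Plugin API section further down; the package name and import paths are assumptions.

package prioritysort

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// Name is the (hypothetical) name used to register the plugin.
const Name = "PrioritySort"

// PrioritySort orders pods in the scheduling queue by priority, highest first.
type PrioritySort struct{}

func (ps *PrioritySort) Name() string { return Name }

// Less reports whether p1 should be popped from the queue before p2.
func (ps *PrioritySort) Less(p1, p2 *framework.PodInfo) bool {
    return podPriority(p1.Pod) > podPriority(p2.Pod)
}

func podPriority(p *v1.Pod) int32 {
    if p.Spec.Priority != nil {
        return *p.Spec.Priority
    }
    return 0
}

// New is the plugin factory that would be added to the Registry.
func New(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
    return &PrioritySort{}, nil
}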

Pre-filter

Pre-filter plugins pre-process Pod information, or check conditions that the cluster or the Pod must satisfy.

Filter

Filter plugins filter out Nodes that cannot run the Pod. The scheduler may evaluate Nodes concurrently, so a Filter plugin may be called multiple times within one scheduling cycle.

Pre-Score

Pre-Score is available since v1alpha2; it was previously called Post-Filter.

These plugins receive the list of Nodes that passed the Filter phase and can use it to update internal state or to produce logs or metrics.

Scoring

Score plugins run in two phases:

  • Scoring: ranks the Nodes that passed the Filter phase; the scheduler calls the Score method of every Score plugin for every Node
  • Normalize Scoring: modifies the scores produced by each Score plugin so that the final per-Node score list can be computed

The final score a Score plugin produces must lie in the [MinNodeScore, MaxNodeScore] range; otherwise the scheduling cycle is aborted. (A small normalization sketch follows below.)
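To make the normalization step concrete, the helper below shows the kind of arithmetic a NormalizeScore implementation typically performs: raw scores from the Score phase are rescaled linearly into the required range. This is a standalone illustration of the idea, not the framework's actual API.

// normalizeScores rescales raw node scores linearly into [minScore, maxScore].
// Illustrative helper only; real plugins do this inside their NormalizeScore method.
func normalizeScores(raw []int64, minScore, maxScore int64) []int64 {
    if len(raw) == 0 {
        return nil
    }
    lo, hi := raw[0], raw[0]
    for _, s := range raw {
        if s < lo {
            lo = s
        }
        if s > hi {
            hi = s
        }
    }
    out := make([]int64, len(raw))
    for i, s := range raw {
        if hi == lo {
            out[i] = maxScore
            continue
        }
        // Linear interpolation between minScore and maxScore.
        out[i] = minScore + (maxScore-minScore)*(s-lo)/(hi-lo)
    }
    return out
}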

Reserve

Reserve plugins are informational. Plugins that maintain runtime state use the Reserve extension point to be notified by the scheduler when resources on a Node are being reserved for a given Pod. This happens before the scheduler actually binds the Pod to the Node, and it exists to prevent race conditions while the scheduler waits for the bind to succeed.

Reserve is the last step of the scheduling cycle. Once a Pod is in the reserved state, it will either trigger the Un-reserve plugins (if scheduling later fails) or the Post-bind plugins (if scheduling succeeds).

Note: this concept used to be called Assume.

Permit

Permit plugins are used to prevent or delay the binding of a Pod. A Permit plugin can do one of three things (a sketch of a Wait-style plugin follows the list):

  • Allow: once the Pod is approved, it is sent on to binding
  • Reject: if the Pod is denied, it is returned to the scheduling queue and the Un-reserve plugins are triggered
  • Wait (with a timeout): the Pod stays in the Permit phase until a plugin allows it; if the timeout expires, Wait becomes Deny, the Pod is returned to the scheduling queue, and the Un-reserve plugins are triggered
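Below is a minimal sketch of a Permit plugin that returns Wait until some external condition is met. The label, the 30-second timeout and the plugin name are made-up placeholders, and the Permit signature follows the v1alpha1 framework discussed in this article, so treat it as an assumption rather than a definitive implementation.

package gatedpermit

import (
    "context"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// Name is the (hypothetical) plugin name.
const Name = "GatedPermit"

type GatedPermit struct{}

func (gp *GatedPermit) Name() string { return Name }

// Permit allows pods that carry an "approved" label and asks the framework to
// wait (up to 30s) for everything else; another component is expected to
// Allow or Reject the waiting pod through the FrameworkHandle.
func (gp *GatedPermit) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
    if p.Labels["example.com/approved"] == "true" {
        return nil, 0 // a nil status means Success
    }
    return framework.NewStatus(framework.Wait, "waiting for approval"), 30 * time.Second
}

func New(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
    return &GatedPermit{}, nil
}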

WaitingPod

The framework interface defines WaitingPod, which represents a Pod waiting in the Permit phase.

pkg/scheduler/framework/v1alpha1/interface.go
// WaitingPod represents a pod currently waiting in the permit phase.
type WaitingPod interface {
    // GetPod returns a reference to the waiting pod.
    GetPod() *v1.Pod
    // GetPendingPlugins returns a list of pending permit plugin's name.
    GetPendingPlugins() []string
    // Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
    // If this is the last remaining plugin to allow, then a success signal is delivered
    // to unblock the pod.
    Allow(pluginName string)
    // Reject declares the waiting pod unschedulable.
    Reject(msg string)
}

waitingPod is the concrete waiting instance of a single pod. It keeps one timer per pending plugin in pendingPlugins, reports the pod's final status through the s chan *Status, and serializes access with a read-write mutex.

pkg/scheduler/framework/v1alpha1/waiting_pods_map.go
type waitingPod struct {
    pod            *v1.Pod
    pendingPlugins map[string]*time.Timer
    s              chan *Status
    mu             sync.RWMutex
}

waitingPodsMap keeps a map keyed by pod UID and protects it with a read-write mutex.

pkg/scheduler/framework/v1alpha1/waiting_pods_map.go
type waitingPodsMap struct {
    pods map[types.UID]WaitingPod
    mu   sync.RWMutex
}

One timer is built per plugin from its wait timeout; if any of those timers fires, the pod is rejected.

pkg/scheduler/framework/v1alpha1/waiting_pods_map.go
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
    wp := &waitingPod{
        pod: pod,
        s:   make(chan *Status, 1),
    }

    wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
    // The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the
    // lock here so that time.AfterFunc can only execute after newWaitingPod finishes.
    wp.mu.Lock()
    defer wp.mu.Unlock()
    // Build one timer per plugin from its maximum wait time. If any timer fires
    // before every plugin has allowed the pod, the pod is rejected.
    for k, v := range pluginsMaxWaitTime {
        plugin, waitTime := k, v
        wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
            msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
                waitTime, plugin)
            wp.Reject(msg)
        })
    }

    return wp
}

Allow

Allow only delivers the success event after every plugin has allowed the pod.

func (w *waitingPod) Allow(pluginName string) bool {
    w.mu.Lock()
    defer w.mu.Unlock()
    if timer, exist := w.pendingPlugins[pluginName]; exist {
        // Stop this plugin's timer.
        timer.Stop()
        delete(w.pendingPlugins, pluginName)
    }

    // Only signal success status after all plugins have allowed
    if len(w.pendingPlugins) != 0 {
        return true
    }
    // Only once every plugin has allowed the pod is the success event sent.
    select {
    case w.s <- NewStatus(Success, ""): // deliver the event
        return true
    default:
        return false
    }
}

Reject

If any plugin's timer fires, or a plugin actively rejects the pod, all timers are stopped and the rejection is broadcast.

func (w *waitingPod) Reject(msg string) bool {
    w.mu.RLock()
    defer w.mu.RUnlock()
    // Stop all timers.
    for _, timer := range w.pendingPlugins {
        timer.Stop()
    }

    // Deliver the rejection event through the channel.
    select {
    case w.s <- NewStatus(Unschedulable, msg):
        return true
    default:
        return false
    }
}

Wait handling in the Permit phase

RunPermitPlugins iterates over all Permit plugins; if any of them returns Wait, the pod is set up to wait for at most that plugin's requested timeout.

func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
    startTime := time.Now()
    defer func() {
        metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
    }()
    pluginsWaitTime := make(map[string]time.Duration)
    statusCode := Success
    for _, pl := range f.permitPlugins {
        status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
        if !status.IsSuccess() {
            if status.IsUnschedulable() {
                msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), status.Message())
                klog.V(4).Infof(msg)
                return NewStatus(status.Code(), msg)
            }
            if status.Code() == Wait {
                // Not allowed to be greater than maxTimeout.
                if timeout > maxTimeout {
                    timeout = maxTimeout
                }
                pluginsWaitTime[pl.Name()] = timeout
                statusCode = Wait
            } else {
                msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
                klog.Error(msg)
                return NewStatus(Error, msg)
            }
        }
    }
    if statusCode == Wait {
        waitingPod := newWaitingPod(pod, pluginsWaitTime)
        f.waitingPods.add(waitingPod)
        msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
        klog.V(4).Infof(msg)
        return NewStatus(Wait, msg)
    }
    return nil
}
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
    waitingPod := f.waitingPods.get(pod.UID)
    if waitingPod == nil {
        return nil
    }
    defer f.waitingPods.remove(pod.UID)
    klog.V(4).Infof("pod %q waiting on permit", pod.Name)

    startTime := time.Now()
    s := <-waitingPod.s
    metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))

    if !s.IsSuccess() {
        if s.IsUnschedulable() {
            msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
            klog.V(4).Infof(msg)
            return NewStatus(s.Code(), msg)
        }
        msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
        klog.Error(msg)
        return NewStatus(Error, msg)
    }
    return nil
}

Pre-Bind

Pre-Bind plugins do the work required before binding, e.g. provisioning a network volume and mounting it on the target Node.

Bind

Bind plugins run only after all Pre-Bind plugins have completed, and they are called in their configured order. A Bind plugin may choose whether to handle a given Pod; as soon as one Bind plugin handles the Pod, the remaining Bind plugins are skipped.
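For illustration, a minimal Bind plugin could look like the sketch below: it returns Skip for pods it does not want to handle (so the next configured Bind plugin gets a chance) and otherwise creates the Binding object through the client obtained from the FrameworkHandle. The plugin name and label are placeholders, and the exact client-go call signature depends on the Kubernetes version, so read this as a sketch under those assumptions.

package mybinder

import (
    "context"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// Name is the (hypothetical) plugin name.
const Name = "MyBinder"

type MyBinder struct {
    handle framework.FrameworkHandle
}

func (b *MyBinder) Name() string { return Name }

// Bind handles only pods labeled for this binder; everything else is skipped
// so that the next Bind plugin in the configured order can handle it.
func (b *MyBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
    if p.Labels["example.com/use-my-binder"] != "true" {
        return framework.NewStatus(framework.Skip, "")
    }
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
        Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
    }
    // Exact client-go signature (ctx/options parameters) varies by Kubernetes version.
    if err := b.handle.ClientSet().CoreV1().Pods(p.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }
    return nil
}

func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &MyBinder{handle: h}, nil
}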

Post-Bind

Post-Bind is an informational extension point; after a successful bind it can be used to clean up associated resources.

Un-reserve

Un-reserve is an informational extension point; if a Pod was reserved and is later rejected at a subsequent stage, the Un-reserve plugins run, mainly to clean up state associated with the reserved Pod.

Plugin API

A plugin is first registered and configured, and then implements the extension point interfaces, which generally look like this:

type Plugin interface {
    Name() string
}

type QueueSortPlugin interface {
    Plugin
    Less(*PodInfo, *PodInfo) bool
}

type PreFilterPlugin interface {
    Plugin
    PreFilter(CycleState, *v1.Pod) *Status
}

// ...

CycleState

Many plugin methods take a CycleState argument. It represents the current scheduling context and is responsible for storing and cloning data during the scheduling flow. It exposes read-write-lock methods, and each plugin decides for itself when to take the lock.

The CycleState also provides an API similar to context.WithValue that can be used to pass data between plugins at different extension points. Multiple plugins can share the state or communicate via this mechanism. The state is preserved only during a single scheduling context. It is worth noting that plugins are assumed to be trusted. The scheduler does not prevent one plugin from accessing or modifying another plugin’s state.

* The only exception is for queue sort plugins.

WARNING: The data available through a CycleState is not valid after a scheduling context ends, and plugins should not hold references to that data longer than necessary.

CycleState stores StateData values, which only need to implement a Clone method. The data in a CycleState can be added to and modified by every plugin of the current framework. A read-write mutex provides thread safety, but there is no isolation between plugins: all plugins are trusted and may freely read, write and delete each other's entries.

type CycleState struct {
    mx      sync.RWMutex
    storage map[StateKey]StateData
    // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
    recordPluginMetrics bool
}

// StateData is a generic type for arbitrary data stored in CycleState.
type StateData interface {
    // Clone is an interface to make a copy of StateData. For performance reasons,
    // clone should make shallow copies for members (e.g., slices or maps) that are not
    // impacted by PreFilter's optional AddPod/RemovePod methods.
    Clone() StateData
}

The exported methods expect the calling plugin to explicitly take the read or write lock before reading or modifying the data:

func (c *CycleState) Read(key StateKey) (StateData, error) {
    if v, ok := c.storage[key]; ok {
        return v, nil
    }
    return nil, errors.New(NotFound)
}

// Write stores the given "val" in CycleState with the given "key".
// This function is not thread safe. In multi-threaded code, lock should be
// acquired first.
func (c *CycleState) Write(key StateKey, val StateData) {
    c.storage[key] = val
}

// Delete deletes data with the given key from CycleState.
// This function is not thread safe. In multi-threaded code, lock should be
// acquired first.
func (c *CycleState) Delete(key StateKey) {
    delete(c.storage, key)
}

// Lock acquires CycleState lock.
func (c *CycleState) Lock() {
    c.mx.Lock()
}

// Unlock releases CycleState lock.
func (c *CycleState) Unlock() {
    c.mx.Unlock()
}

// RLock acquires CycleState read lock.
func (c *CycleState) RLock() {
    c.mx.RLock()
}

// RUnlock releases CycleState read lock.
func (c *CycleState) RUnlock() {
    c.mx.RUnlock()
}
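Putting Read, Write and the lock methods together, the sketch below shows how one extension point of a plugin could stash data in the CycleState for a later extension point of the same scheduling cycle to pick up, taking the locks explicitly as described above. The state key, the payload type and the helper names are made-up examples.

package examplestate

import (
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// stateKey and podCountData are hypothetical; they only illustrate the
// CycleState usage pattern.
const stateKey = "example.com/pod-count"

type podCountData struct {
    count int
}

// Clone implements framework.StateData; a shallow copy is enough here.
func (d *podCountData) Clone() framework.StateData {
    c := *d
    return &c
}

// writeState is what a PreFilter-style extension point might do: take the
// write lock, store the computed data, release the lock.
func writeState(cs *framework.CycleState, count int) {
    cs.Lock()
    defer cs.Unlock()
    cs.Write(stateKey, &podCountData{count: count})
}

// readState is what a later extension point (Filter, Score, ...) might do:
// take the read lock and fetch the data written earlier in the same cycle.
func readState(cs *framework.CycleState) (int, error) {
    cs.RLock()
    defer cs.RUnlock()
    d, err := cs.Read(stateKey)
    if err != nil {
        return 0, err
    }
    return d.(*podCountData).count, nil
}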

FrameworkHandle

FrameworkHandle gives plugins access to the API server and the scheduler cache. FrameworkHandle is an interface, implemented by the framework struct.

While the CycleState provides APIs relevant to a single scheduling context, the FrameworkHandle provides APIs relevant to the lifetime of a plugin. This is how plugins can get a client (kubernetes.Interface) and SharedInformerFactory, or read data from the scheduler’s cache of cluster state. The handle will also provide APIs to list and approve or reject waiting pods.

type FrameworkHandle interface {
    SnapshotSharedLister() schedulerlisters.SharedLister

    // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
    IterateOverWaitingPods(callback func(WaitingPod))

    // GetWaitingPod returns a waiting pod given its UID.
    GetWaitingPod(uid types.UID) WaitingPod

    // RejectWaitingPod rejects a waiting pod given its UID.
    RejectWaitingPod(uid types.UID)

    // ClientSet returns a kubernetes clientSet.
    ClientSet() clientset.Interface

    SharedInformerFactory() informers.SharedInformerFactory

    // VolumeBinder returns the volume binder used by scheduler.
    VolumeBinder() scheduling.SchedulerVolumeBinder
}
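As a usage sketch, any component holding the handle can enumerate and release pods parked at the Permit phase. The example below allows every waiting pod that a hypothetical external approver has labeled, and rejects a single pod by UID; the label, the plugin name ("GatedPermit", the hypothetical plugin from the Permit sketch earlier) and the helper names are placeholders.

package handleusage

import (
    "k8s.io/apimachinery/pkg/types"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// approveCleared walks the waiting pods and allows the ones an external
// system has marked as approved.
func approveCleared(handle framework.FrameworkHandle) {
    handle.IterateOverWaitingPods(func(wp framework.WaitingPod) {
        if wp.GetPod().Labels["example.com/approved"] == "true" {
            // Allow on behalf of the permit plugin that asked to wait.
            wp.Allow("GatedPermit")
        }
    })
}

// rejectByUID rejects a single waiting pod given its UID.
func rejectByUID(handle framework.FrameworkHandle, uid types.UID) {
    if wp := handle.GetWaitingPod(uid); wp != nil {
        wp.Reject("rejected by operator")
    }
}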

Plugin Registration

Each plugin needs a constructor function, which is added to the registration factory (the Registry).

type PluginFactory = func(runtime.Unknown, FrameworkHandle) (Plugin, error)

type Registry map[string]PluginFactory

func NewRegistry() Registry {
    return Registry{
        fooplugin.Name: fooplugin.New,
        barplugin.Name: barplugin.New,
        // New plugins are registered here.
    }
}

Take NodeAffinity as an example: after defining the NodeAffinity plugin struct, it needs a New function whose signature matches the PluginFactory above.

// NodeAffinity is a plugin that checks if a pod node selector matches the node label.
type NodeAffinity struct {
    handle framework.FrameworkHandle
}

// New initializes a new plugin and returns it.
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &NodeAffinity{handle: h}, nil
}

So where are these plugins registered into the framework? When NewFramework initializes the framework, it walks the registry and, for every plugin required by the configuration, calls its factory function to construct the plugin and adds it to pluginsMap.

pkg/scheduler/framework/v1alpha1/framework.go
pluginsMap := make(map[string]Plugin)
for name, factory := range r {
    // initialize only needed plugins.
    if _, ok := pg[name]; !ok {
        continue
    }

    p, err := factory(pluginConfig[name], f)
    if err != nil {
        return nil, fmt.Errorf("error initializing plugin %q: %v", name, err)
    }
    pluginsMap[name] = p
}

pluginsMap is only a temporary map; the actual registration into the framework struct happens in updatePluginList, which uses reflection.

pkg/scheduler/framework/v1alpha1/framework.go
for _, e := range f.getExtensionPoints(plugins) {
    if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil {
        return nil, err
    }
}

Here e is one of the framework's plugin extension points.

pkg/scheduler/framework/v1alpha1/framework.go
func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
    return []extensionPoint{
        {plugins.PreFilter, &f.preFilterPlugins},
        {plugins.Filter, &f.filterPlugins},
        {plugins.Reserve, &f.reservePlugins},
        {plugins.PreScore, &f.preScorePlugins},
        {plugins.Score, &f.scorePlugins},
        {plugins.PreBind, &f.preBindPlugins},
        {plugins.Bind, &f.bindPlugins},
        {plugins.PostBind, &f.postBindPlugins},
        {plugins.Unreserve, &f.unreservePlugins},
        {plugins.Permit, &f.permitPlugins},
        {plugins.QueueSort, &f.queueSortPlugins},
    }
}

slicePtr points to the slice that stores the concrete plugin implementations for a given extension point, while plugins (a PluginSet) configures which plugins are enabled and which are disabled.

type extensionPoint struct {
    // the set of plugins to be configured at this extension point.
    plugins *config.PluginSet
    // a pointer to the slice storing plugins implementations that will run at this
    // extension point.
    slicePtr interface{}
}

The implementation of updatePluginList is as follows:

func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, pluginsMap map[string]Plugin) error {
    if pluginSet == nil {
        return nil
    }

    // Use Elem to get the slice that pluginList points to.
    plugins := reflect.ValueOf(pluginList).Elem()
    // From the slice type, get the element type, i.e. the extension point interface.
    pluginType := plugins.Type().Elem()
    set := sets.NewString()
    for _, ep := range pluginSet.Enabled {
        pg, ok := pluginsMap[ep.Name]
        if !ok {
            return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
        }

        // Sanity check: fail if the plugin does not implement this extension point's interface.
        if !reflect.TypeOf(pg).Implements(pluginType) {
            return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name())
        }

        if set.Has(ep.Name) {
            return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
        }

        set.Insert(ep.Name)

        // Append the plugin to the slice and store it back through the pointer.
        newPlugins := reflect.Append(plugins, reflect.ValueOf(pg))
        plugins.Set(newPlugins)
    }
    return nil
}

The next question: where are the entries of the Registry themselves registered? In pkg/scheduler/scheduler.go:

pkg/scheduler/scheduler.go
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
    return nil, err
}

These are hard-coded in the plugins directory, i.e. the framework's built-in (in-tree) plugins. Your own plugins have to be provided through frameworkOutOfTreeRegistry.
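As a sketch of the out-of-tree path: a custom scheduler build supplies its own Registry, which is then merged into the in-tree one exactly as shown above. The snippet below only shows the registry side; example.com/scheduler-plugins/myplugin is a placeholder for your own plugin package, which is assumed to expose a Name constant and a New factory matching PluginFactory.

package main

import (
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"

    // Placeholder import for your own plugin package.
    "example.com/scheduler-plugins/myplugin"
)

// outOfTreeRegistry is what would be passed to the scheduler as
// frameworkOutOfTreeRegistry and merged with NewInTreeRegistry().
func outOfTreeRegistry() framework.Registry {
    return framework.Registry{
        myplugin.Name: myplugin.New,
    }
}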

pkg/scheduler/framework/plugins/registry.go
func NewInTreeRegistry() framework.Registry {
    return framework.Registry{
        defaultpodtopologyspread.Name: defaultpodtopologyspread.New,
        imagelocality.Name: imagelocality.New,
        tainttoleration.Name: tainttoleration.New,
        nodename.Name: nodename.New,
        nodeports.Name: nodeports.New,
        nodepreferavoidpods.Name: nodepreferavoidpods.New,
        nodeaffinity.Name: nodeaffinity.New,
        podtopologyspread.Name: podtopologyspread.New,
        nodeunschedulable.Name: nodeunschedulable.New,
        noderesources.FitName: noderesources.NewFit,
        noderesources.BalancedAllocationName: noderesources.NewBalancedAllocation,
        noderesources.MostAllocatedName: noderesources.NewMostAllocated,
        noderesources.LeastAllocatedName: noderesources.NewLeastAllocated,
        noderesources.RequestedToCapacityRatioName: noderesources.NewRequestedToCapacityRatio,
        noderesources.ResourceLimitsName: noderesources.NewResourceLimits,
        volumebinding.Name: volumebinding.New,
        volumerestrictions.Name: volumerestrictions.New,
        volumezone.Name: volumezone.New,
        nodevolumelimits.CSIName: nodevolumelimits.NewCSI,
        nodevolumelimits.EBSName: nodevolumelimits.NewEBS,
        nodevolumelimits.GCEPDName: nodevolumelimits.NewGCEPD,
        nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk,
        nodevolumelimits.CinderName: nodevolumelimits.NewCinder,
        interpodaffinity.Name: interpodaffinity.New,
        nodelabel.Name: nodelabel.New,
        serviceaffinity.Name: serviceaffinity.New,
        queuesort.Name: queuesort.New,
        defaultbinder.Name: defaultbinder.New,
    }
}

One last question: how does each plugin under the plugins directory end up at the right extension point? A plugin simply implements the interfaces of the extension points it supports; updatePluginList above verifies this with reflect.TypeOf(pg).Implements(pluginType) and appends the plugin to the slice of every extension point for which it is enabled in the configuration.

Plugin Lifecycle

Initialization

There are two steps to plugin initialization. First, plugins are registered. Second, the scheduler uses its configuration to decide which plugins to instantiate. If a plugin registers for multiple extension points, it is instantiated only once.

When a plugin is instantiated, it is passed config args and a FrameworkHandle.

Concurrency

There are two types of concurrency that plugin writers should consider. A plugin might be invoked several times concurrently when evaluating multiple nodes, and a plugin may be called concurrently from different scheduling contexts.

Note: Within one scheduling context, each extension point is evaluated serially.

In the main thread of the scheduler, only one scheduling cycle is processed at a time. Any extension point up to and including reserve will be finished before the next scheduling cycle begins*. After the reserve phase, the binding cycle is executed asynchronously. This means that a plugin could be called concurrently from two different scheduling contexts, provided that at least one of the calls is to an extension point after reserve. Stateful plugins should take care to handle these situations.

Finally, un-reserve plugins may be called from either the Permit thread or the Bind thread, depending on how the pod was rejected.

* The queue sort extension point is a special case. It is not part of a scheduling context and may be called concurrently for many pod pairs.


Configuring Plugins

Interaction with Cluster Autoscaler

Scheduler core flow

Scheduler initialization

Initializing the scheduler options

var defaultSchedulerOptions = schedulerOptions{
    profiles: []schedulerapi.KubeSchedulerProfile{
        // Profiles' default plugins are set from the algorithm provider.
        {SchedulerName: v1.DefaultSchedulerName},
    },
    schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
        Provider: defaultAlgorithmSourceProviderName(),
    },
    disablePreemption:        false,
    percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
    bindTimeoutSeconds:       BindTimeoutSeconds,
    podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
    podMaxBackoffSeconds:     int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}

Initializing the plugin factory registry

The plugin factory registry is initialized in two parts: in-tree (plugins shipped with the current version) and out-of-tree (user-defined plugins).

// First register the in-tree plugin registry for this version.
registry := frameworkplugins.NewInTreeRegistry()
// Then merge in the user-defined (out-of-tree) plugin registry.
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
    return nil, err
}

Wiring up informer event handlers

Event callbacks are registered through AddAllEventHandlers. It places the various resources into the local cache via the SchedulerCache, and adds pods that have not been scheduled yet (i.e. !assignedPod, pods with no Node bound) to the scheduling queue.

func AddAllEventHandlers(
    sched *Scheduler,
    schedulerName string,
    informerFactory informers.SharedInformerFactory,
    podInformer coreinformers.PodInformer,
) {

Moving pods out of the unschedulable queue

When resources such as Services or volumes change, the pods that previously failed scheduling and sit in unschedulableQ are retried by moving them to activeQ or backoffQ.

func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
    p.lock.Lock()
    defer p.lock.Unlock()
    unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap))
    // Collect all unschedulable pods.
    for _, pInfo := range p.unschedulableQ.podInfoMap {
        unschedulablePods = append(unschedulablePods, pInfo)
    }
    // Move the unschedulable pods to the backoff queue or the active queue.
    p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
    // Record the move-request cycle; on scheduling failure a pod's moveRequestCycle
    // is compared against schedulingCycle (>=) to decide where to requeue it.
    p.moveRequestCycle = p.schedulingCycle
    p.cond.Broadcast()
}

Starting the scheduler

Finally the scheduler is started; its core loop is scheduleOne.

func (sched *Scheduler) Run(ctx context.Context) {
    // Wait for the caches to sync first.
    if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
        return
    }
    // Start the scheduling queue's background goroutines.
    sched.SchedulingQueue.Run()
    // Start the scheduling loop.
    wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    sched.SchedulingQueue.Close()
}

Building the inputs of a scheduling cycle

Fetching the pod to schedule

The pod waiting to be scheduled is obtained through NextPod, which is just a wrapper around schedulingQueue.Pop.

pkg/scheduler/scheduler.go
// Pop the next pod waiting to be scheduled from the queue.
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
    return
}

The NextPod function is set when the Scheduler is created.

pkg/scheduler/factory.go
return &Scheduler{
    SchedulerCache:  c.schedulerCache,
    Algorithm:       algo,
    Profiles:        profiles,
    NextPod:         internalqueue.MakeNextPodFunc(podQueue),
    Error:           MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
    StopEverything:  c.StopEverything,
    VolumeBinder:    c.volumeBinder,
    SchedulingQueue: podQueue,
}, nil

Its concrete implementation is:

pkg/scheduler/internal/queue/scheduling_queue.go
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo {
    return func() *framework.PodInfo {
        podInfo, err := queue.Pop()
        if err == nil {
            klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
            return podInfo
        }
        klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
        return nil
    }
}

Fetching the scheduler Profile

pkg/scheduler/scheduler.go
prof, err := sched.profileForPod(pod)

The Profile is looked up by the scheduler name specified on each Pod:

pkg/scheduler/scheduler.go
func (sched *Scheduler) profileForPod(pod *v1.Pod) (*profile.Profile, error) {
    prof, ok := sched.Profiles[pod.Spec.SchedulerName]
    if !ok {
        return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
    }
    return prof, nil
}

Profiles here is a wrapper around a Framework:

pkg/scheduler/profile/profile.go
// Profile is a scheduling profile.
type Profile struct {
    framework.Framework
    Recorder events.EventRecorder
}

// NewProfile builds a Profile for the given configuration.
func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory) (*Profile, error) {
    f, err := frameworkFact(cfg)
    if err != nil {
        return nil, err
    }
    r := recorderFact(cfg.SchedulerName)
    return &Profile{
        Framework: f,
        Recorder:  r,
    }, nil
}

// Map holds profiles indexed by scheduler name.
type Map map[string]*Profile

The profiles map is built when the scheduler is created:

profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory)

For more background on scheduling profiles, see https://github.com/kubernetes/kubernetes/pull/88285.

Skipping re-scheduling of assumed pods

skipPodSchedule checks whether scheduling of the current pod can be skipped:

if sched.skipPodSchedule(pod) {
    return
}

A pod does not need to be scheduled again in the following two cases:

  • the pod is being deleted
  • the pod has already been assumed onto a node and the update only touches fields that do not affect scheduling (ResourceVersion, Spec.NodeName, Annotations, ManagedFields, Finalizers and/or Conditions)
// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool {
    // Case 1: pod is being deleted.
    if pod.DeletionTimestamp != nil {
        prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return true
    }

    // Case 2: pod has been assumed and pod updates could be skipped.
    // An assumed pod can be added again to the scheduling queue if it got an update event
    // during its previous scheduling cycle but before getting assumed.
    if sched.skipPodUpdate(pod) {
        return true
    }

    return false
}

The concrete check is:

pkg/scheduler/eventhandler.go
// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
//   - The pod has already been assumed, AND
//   - The pod has only its ResourceVersion, Spec.NodeName, Annotations,
//     ManagedFields, Finalizers and/or Conditions updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
    // Non-assumed pods should never be skipped.
    isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
        return false
    }
    if !isAssumed {
        return false
    }

    // Gets the assumed pod from the cache.
    assumedPod, err := sched.SchedulerCache.GetPod(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
        return false
    }

    // Compares the assumed pod in the cache with the pod update. If they are
    // equal (with certain fields excluded), this pod update will be skipped.
    f := func(pod *v1.Pod) *v1.Pod {
        p := pod.DeepCopy()
        // ResourceVersion must be excluded because each object update will
        // have a new resource version.
        p.ResourceVersion = ""
        // Spec.NodeName must be excluded because the pod assumed in the cache
        // is expected to have a node assigned while the pod update may or may
        // not have this field set.
        p.Spec.NodeName = ""
        // Annotations must be excluded for the reasons described in
        // https://github.com/kubernetes/kubernetes/issues/52914.
        p.Annotations = nil
        // Same as above, when annotations are modified with ServerSideApply,
        // ManagedFields may also change and must be excluded
        p.ManagedFields = nil
        // The following might be changed by external controllers, but they don't
        // affect scheduling decisions.
        p.Finalizers = nil
        p.Status.Conditions = nil
        return p
    }
    assumedPodCopy, podCopy := f(assumedPod), f(pod)
    if !reflect.DeepEqual(assumedPodCopy, podCopy) {
        return false
    }
    klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
    return true
}

Building the scheduling context

A CycleState and a context are created. The CycleState is used to share data across the scheduling cycle, while the context coordinates cancellation and shutdown.

// Build the CycleState and the context.
state := framework.NewCycleState()
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()

The normal scheduling path

Running the scheduling algorithm

Normal scheduling simply calls ScheduleAlgorithm; see the earlier article for the implementation details.

scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)

Assume Pod

err = sched.assume(assumedPod, scheduleResult.SuggestedHost)

If a pod is assumed onto a node, it is first added to the SchedulerCache and removed from the SchedulingQueue, so it is not scheduled twice.

func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {

    assumed.Spec.NodeName = host

    // Store the pod in the SchedulerCache so that, in the next scheduling cycle,
    // it already counts against the node's resources.
    if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
        klog.Errorf("scheduler cache AssumePod failed: %v", err)
        return err
    }
    // if "assumed" is a nominated pod, we should remove it from internal cache
    // Remove the pod from the scheduling queue.
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    }

    return nil
}

The binding path

The Permit plugins are run first: all plugins are iterated, and if any returns Wait, the pod waits according to the plugin's timeout.

// Run "permit" plugins.
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

After the Permit plugins, the bind phase runs asynchronously: it binds volumes, runs the Pre-bind plugins, and then performs the bind.

// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
    bindingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)

    // Bind volumes first before Pod
    if !allBound {
        err := sched.bindVolumes(assumedPod)
    }

    // Run "prebind" plugins.
    preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

    err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)

    // Run "postbind" plugins.
    prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}()

The preemption path

If scheduling failed at the predicate (Filter) step and preemption is enabled for this scheduler, preemption is handled by sched.preempt.

if fitError, ok := err.(*core.FitError); ok {
    // The pod failed the predicates, so consider preemption.
    if sched.DisablePreemption {
        klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
            " No preemption is performed.")
    } else {
        preemptionStartTime := time.Now()
        // Run preemption.
        sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
        metrics.PreemptionAttempts.Inc()
        metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
        metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
    }
}

Fetching the preemptor

The latest version of the pod that should preempt is first fetched from the apiserver:

preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
if err != nil {
    klog.Errorf("Error getting the updated preemptor pod object: %v", err)
    return "", err
}

Selecting victims with the preemption algorithm

Preempt selects the node on which to preempt, the victim pods to evict, and the nominated pods whose nominations should be cleared:

node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr)
if err != nil {
    klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
    return "", err
}

Updating the preemptor in the scheduling queue

If preemption succeeds on a node, the preemptor's nominated node is updated in the queue so that the information can be used in the next scheduling cycle.

sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

Updating the pod's nominated node

The nominated node name is also written back through the apiserver. Why? The preemptor has already claimed part of the node's resources, but until the victim pods are completely deleted from the node its scheduling attempts will keep failing. Preemption must not be triggered again during that window, because it has already been performed; the pod simply keeps retrying until the victims are gone from the node and can then be scheduled.

err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)

Deleting the victims

The victims are deleted directly through the apiserver; if a victim is still waiting for a plugin to Allow it, it is rejected immediately.

for _, victim := range victims {
    // Delete the victim pod via the apiserver.
    if err := sched.podPreemptor.deletePod(victim); err != nil {
        klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
        return "", err
    }
    // If the victim is a WaitingPod, send a reject message to the PermitPlugin
    if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil {
        waitingPod.Reject("preempted")
    }
    sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}

Clearing nominations of affected pods

Pods that had already been nominated onto this node have their nominated node cleared so that they go through node selection again.

for _, p := range nominatedPodsToClear {
    // Clear the nomination of these pods.
    rErr := sched.podPreemptor.removeNominatedNodeName(p)
    if rErr != nil {
        klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
        // We do not return as this error is not critical.
    }
}

Algorithm Source

Algorithm Provider

Algorithm Policy

References