
[Kubernetes Deep Dive] Scheduler

The scheduler is a core Kubernetes component whose main job is to bind pending workload Pods to the Nodes they will run on. From the earliest scheduler based on Predicates and Priorities to the Scheduling Framework based scheduler introduced in v1.15, the Kubernetes scheduler has been evolving rapidly to meet the resource-scheduling needs of different scenarios.

This is the second article in the "Kubernetes Deep Dive" series. It first introduces the background of the Kubernetes Scheduler and how it has evolved, and then analyzes the predicates-and-priorities scheduler using the Kubernetes 1.16 code base. The third article in the series, Scheduling Framework, will analyze the framework-based scheduler using Kubernetes 1.18.

Scheduler Overview

About task and resource scheduling

In computing, scheduling is the method by which work is assigned to resources that complete the work.

In computing, scheduling means assigning to a piece of work the resources it needs so that the work can be completed. The work may be a thread, a process, or a data flow, and the corresponding resources may be hardware such as CPU, network, memory, or expansion cards. Scheduling systems are everywhere: operating-system schedulers, language-runtime schedulers, CDN resource scheduling, order dispatching in ride-hailing platforms, and so on.

The core of scheduling is allocating limited resources sensibly to reach the goals we care about; in essence it addresses the imbalance between resource demand and supply.

A scheduling system may pursue several goals, for example:

  • maximize throughput
  • minimize waiting time
  • minimize latency or response time
  • maximize fairness

In practice these goals often conflict, so a scheduler design is usually a trade-off driven by the actual requirements. In an operating-system process scheduler, the tasks to schedule are threads and the resource to allocate is CPU time. For Kubernetes, the basic scheduling unit is the Pod, and Pods are scheduled onto different Nodes. Nodes offer different resource types, including CPU, GPU, and memory; these resources can be divided up, but each belongs to the node that provides it.

Design challenges in task and resource scheduling

  • Scheduling: minimizing task waiting time while honoring priorities
  • Scheduling: task locality — placing tasks on nodes that already hold the resources the task needs whenever possible
  • Cluster: high availability
  • System: extensibility — when the default scheduling policies cannot satisfy business requirements, the system should offer extension interfaces through which it can be extended to meet them

Challenges in Pod scheduling scenarios

Affinity and anti-affinity

In Kubernetes, affinity mainly involves two kinds of resources: Pods and Nodes.

  • Affinity:
    • affinity between Pods
    • affinity between a Pod and a Node
  • Anti-affinity:
    • anti-affinity between Pods
    • anti-affinity between a Pod and a Node

For example (a small node-affinity sketch follows this list):

  • Pod anti-affinity: to guarantee high availability, replicas of the same service are usually spread across different data centers and racks
  • Pod-to-Node affinity: Pods that are heavy on disk I/O can be scheduled onto machines with SSDs to improve I/O performance
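
To make the node-affinity case concrete, the requirement "only run on SSD machines" can be expressed on a Pod spec roughly as in the sketch below, using the core/v1 Go types; the label key/value disktype=ssd and the pod/container names are just assumed examples.

package example

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// buildSSDPod returns a Pod that may only be scheduled onto nodes
// labeled disktype=ssd (hard node affinity).
func buildSSDPod() *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "io-heavy-pod"},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{Name: "app", Image: "nginx"}},
			Affinity: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{{
							MatchExpressions: []v1.NodeSelectorRequirement{{
								Key:      "disktype",
								Operator: v1.NodeSelectorOpIn,
								Values:   []string{"ssd"},
							}},
						}},
					},
				},
			},
		},
	}
}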

Multi-tenancy and capacity planning

Multi-tenancy is usually about isolating cluster resources. In a business system, resources are typically partitioned by business line, and each line is given a capacity quota, so that excessive usage by one business line does not affect every other business in the company.

Choosing zones and Nodes

A zone is a common concept in disaster recovery: by spreading a service across multiple data centers, a single data-center failure does not make the service completely unavailable.

Because of the affinity requirements discussed above, picking a suitable node out of all the nodes across multiple zones is a significant challenge.

Supporting diverse resource types

Besides CPU and memory, system resources include network, disk I/O, GPU, and more. To schedule these additional resources, Kubernetes needs extra extension mechanisms.

Workload co-location

Kubernetes was born for scheduling Pods that are mostly stateless online web services. Supporting offline workloads such as batch processing is another challenge.

Kubernetes Pod LifeCycle

The figure below shows how a Pod goes from creation to running in a Kubernetes cluster:

  • a user creates a Deployment/DaemonSet/Job, etc. through the ApiServer's REST API
  • the ApiServer receives the request and persists the objects to Etcd
  • the Scheduler watches the ApiServer and obtains the list of unscheduled Pods
  • the Scheduler runs its scheduling algorithm to compute a Node for each Pod, binds the Pod to that Node, and the result is stored in Etcd
  • the Kubelet on that Node observes the scheduling result, pulls the image, and runs the Pod

Kubernetes Pod LifeCycle

As you can see, the Scheduler is a core module of the cluster and can be viewed as a black box:

  • its input is the Pod to be scheduled plus the information about all compute Nodes,
  • its output is the optimal node computed by the internal scheduling algorithms and policies.
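
Conceptually the black box boils down to an interface like the one below (purely illustrative, not an actual kube-scheduler type; the real entry points are shown later in this article):

package example

import v1 "k8s.io/api/core/v1"

// blackBoxScheduler is an illustrative view of the scheduler: one Pod plus
// all candidate Nodes go in, the name of the chosen Node comes out.
type blackBoxScheduler interface {
	Schedule(pod *v1.Pod, nodes []*v1.Node) (nodeName string, err error)
}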

Scheduler responsibilities

kube-scheduler runs as a standalone process. Its responsibilities can be summarized as:

  1. High availability: instances compete for a distributed leader-election lock (ultimately persisted through the ApiServer into Etcd), so that a standby can take over;
  2. Watching resources: using the List/Watch mechanism to observe resource changes on the ApiServer, mainly Pods and Nodes;
  3. Node assignment: computing the optimal node with its internal algorithms and writing the binding result back, which ends up in Etcd.

Scheduler Evolution

Starting from Kubernetes v1.0 the scheduler has used an algorithm based on predicates and priorities. Before fully switching to the Scheduling Framework, the milestones were:

  • v1.2 introduced the Scheduler Extender to support external extensions
  • v1.5 introduced a Map/Reduce computation pattern for the priority algorithms
  • v1.15 proposed the Scheduling Framework, making the scheduler lighter, interface-driven, and componentized
  • v1.18 turned all policies into plugins and made the Scheduling Framework the default scheduling path
  • v1.19 turned preemption into a plugin as well and added support for multiple scheduling profiles

Scheduler Evolution Timeline

As containerization spread, Kubernetes became the de facto standard for container management. Besides traditional internet workloads, scenarios such as AI, big data, and edge computing are also moving onto k8s, and they place ever higher demands on the scheduler, which is why it keeps evolving quickly.

The rest of this article analyzes the design and implementation based on Kubernetes 1.16.

Scheduler Initialization

A first look at the Scheduler struct

Let's start with the Scheduler struct itself. Its key members are SchedulerCache, SchedulingQueue, and Algorithm.

type Scheduler struct {
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) factory.Binder
PodConditionUpdater factory.PodConditionUpdater
PodPreemptor factory.PodPreemptor
Framework framework.Framework
NextPod func() *v1.Pod
WaitForCacheSync func() bool
Error func(*v1.Pod, error)
Recorder events.EventRecorder
StopEverything <-chan struct{}
VolumeBinder *volumebinder.VolumeBinder
DisablePreemption bool
SchedulingQueue internalqueue.SchedulingQueue
}

Several standard callbacks are defined here:

  • NextPod(): returns the next schedulable Pod when one is available, otherwise blocks.
  • WaitForCacheSync(): waits for the caches to be synced.
  • Error(): called when scheduling fails, with the failed Pod and the error as arguments.
pkg/scheduler/factory/factory.go
return &Config{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
GetBinder: getBinderFunc(c.client, extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
Framework: c.framework,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
}, nil

These callbacks are wired up when the scheduler Config is created:

  • NextPod(): MakeNextPodFunc on the SchedulingQueue fetches the next schedulable Pod — essentially the queue's Pop method.
  • WaitForCacheSync(): waits (via the client-go cache helper) until the scheduled-Pods informer cache has synced.
  • Error(): MakeDefaultErrorFunc injects a failure handler that puts the failed Pod back into the appropriate queue so it can be rescheduled.

Besides these callbacks, several methods are defined on the Scheduler struct:

func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error)
func (sched *Scheduler) preempt(pluginContext *framework.PluginContext, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error)
func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error)
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error
func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *framework.PluginContext) error
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string)

Specifically (a simplified sketch of how these pieces fit together in scheduleOne follows this list):

  • schedule(): takes a Pod and returns a scheduling result; it runs the main scheduling logic, implemented by genericScheduler
  • preempt(): preemptive scheduling, implemented by genericScheduler; it also updates the nominated-node information
  • assumeVolumes(): updates the volume cache according to the chosen binding
  • bindVolumes(): binds PVs
  • assume(): records the Pod in the cache as assumed
  • bind(): performs the actual binding
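
Putting these pieces together, the body of the scheduler's main loop, scheduleOne, roughly follows the flow sketched below. This is a simplified, annotated outline of the 1.16 logic, not the verbatim source: error handling, event recording, and the framework plugin hooks are omitted, and the method name scheduleOneSketch is mine.

// Simplified sketch of scheduleOne (illustrative only).
func (sched *Scheduler) scheduleOneSketch() {
	pod := sched.NextPod() // blocks until a Pod is popped from the scheduling queue

	scheduleResult, err := sched.schedule(pod, nil /* pluginContext */)
	if err != nil {
		// No feasible node was found: optionally try to preempt lower-priority Pods,
		// which records a nominated node for this Pod instead of binding it.
		if !sched.DisablePreemption {
			sched.preempt(nil, sched.Framework, pod, err)
		}
		return
	}

	// Optimistically "assume" the Pod onto the chosen node so that the next
	// scheduling cycle already sees the updated resource usage in the cache.
	allBound, _ := sched.assumeVolumes(pod, scheduleResult.SuggestedHost)
	_ = sched.assume(pod, scheduleResult.SuggestedHost)

	// Binding talks to the ApiServer, so it runs asynchronously.
	go func() {
		if !allBound {
			_ = sched.bindVolumes(pod)
		}
		_ = sched.bind(pod, scheduleResult.SuggestedHost, nil)
	}()
}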

Initializing the scheduler's parameters

Quite a few parameters are supplied when the Scheduler struct is created:

cmd/kube-scheduler/server.go
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
cc.InformerFactory.Storage().V1().StorageClasses(),
cc.InformerFactory.Storage().V1beta1().CSINodes(),
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
registry,
cc.ComponentConfig.Plugins,
cc.ComponentConfig.PluginConfig,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
if err != nil {
return err
}

New receives the parameters passed down from the cmd layer and creates a Configurator:

pkg/scheduler/scheduler.go
// New returns a Scheduler
func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
recorder events.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
registry framework.Registry,
plugins *kubeschedulerconfig.Plugins,
pluginConfig []kubeschedulerconfig.PluginConfig,
opts ...func(o *schedulerOptions)) (*Scheduler, error) {

// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
Client: client,
NodeInformer: nodeInformer,
PodInformer: podInformer,
PvInformer: pvInformer,
PvcInformer: pvcInformer,
ReplicationControllerInformer: replicationControllerInformer,
ReplicaSetInformer: replicaSetInformer,
StatefulSetInformer: statefulSetInformer,
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
CSINodeInformer: csiNodeInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
Registry: registry,
Plugins: plugins,
PluginConfig: pluginConfig,
})

var config *factory.Config

// 根据不同的schedulerAlgorithmSource创建不同的config
switch {
case source.Provider != nil:
config = configurator.CreateFromProvider(*source.Provider)
case source.Policy != nil:
config = configurator.CreateFromConfig(*policy)
default:
}

// Create the scheduler.
sched := NewFromConfig(config)

AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer, csiNodeInformer)
}

Where do the ConfigFactoryArgs come from? They are parsed from the command-line flags.

pkg/scheduler/factory/factory.go
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
type ConfigFactoryArgs struct {
Client clientset.Interface
NodeInformer coreinformers.NodeInformer
PodInformer coreinformers.PodInformer
PvInformer coreinformers.PersistentVolumeInformer
PvcInformer coreinformers.PersistentVolumeClaimInformer
ReplicationControllerInformer coreinformers.ReplicationControllerInformer
ReplicaSetInformer appsinformers.ReplicaSetInformer
StatefulSetInformer appsinformers.StatefulSetInformer
ServiceInformer coreinformers.ServiceInformer
PdbInformer policyinformers.PodDisruptionBudgetInformer
StorageClassInformer storageinformersv1.StorageClassInformer
CSINodeInformer storageinformersv1beta1.CSINodeInformer
HardPodAffinitySymmetricWeight int32
DisablePreemption bool
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
StopCh <-chan struct{}
Registry framework.Registry
Plugins *config.Plugins
PluginConfig []config.PluginConfig
}

A Configurator is built from the ConfigFactoryArgs. Inside NewConfigFactory:

  • a new framework object is created
  • a new SchedulingQueue (podQueue) is created
  • a new SchedulerCache is created
  • a new VolumeBinder is created
// NewConfigFactory initializes the default implementation of a Configurator.
func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {}

When the StopEverything signal is received, the podQueue is closed.

CreateFromConfig registers the predicate functions, registers the priority functions, and builds the list of Extenders.

pkg/scheduler/factory/factory.go
// CreateFromConfig creates a scheduler from the configuration file
func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
klog.V(2).Infof("Creating scheduler from configuration: %v", policy)

// validate the policy configuration
if err := validation.ValidatePolicy(policy); err != nil {
return nil, err
}

predicateKeys := sets.NewString()
if policy.Predicates == nil {
klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
provider, err := GetAlgorithmProvider(DefaultProvider)
if err != nil {
return nil, err
}
predicateKeys = provider.FitPredicateKeys
} else {
for _, predicate := range policy.Predicates {
klog.V(2).Infof("Registering predicate: %s", predicate.Name)
predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
}
}

priorityKeys := sets.NewString()
if policy.Priorities == nil {
klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
provider, err := GetAlgorithmProvider(DefaultProvider)
if err != nil {
return nil, err
}
priorityKeys = provider.PriorityFunctionKeys
} else {
for _, priority := range policy.Priorities {
klog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
}
}

var extenders []algorithm.SchedulerExtender
if len(policy.ExtenderConfigs) != 0 {
ignoredExtendedResources := sets.NewString()
var ignorableExtenders []algorithm.SchedulerExtender
for ii := range policy.ExtenderConfigs {
klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
if err != nil {
return nil, err
}
if !extender.IsIgnorable() {
extenders = append(extenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources.Insert(string(r.Name))
}
}
}
// place ignorable extenders to the tail of extenders
extenders = append(extenders, ignorableExtenders...)
predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
}
// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
// Give it higher precedence than scheduler CLI configuration when it is provided.
if policy.HardPodAffinitySymmetricWeight != 0 {
c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
}
// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
// predicates even after one or more of them fails.
if policy.AlwaysCheckAllPredicates {
c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
}

return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}

CreateFromKeys takes the predicateKeys, priorityKeys, and extenders generated above, resolves them into the concrete predicate and priority functions, creates a genericScheduler via NewGenericScheduler, and finally returns the Config struct.

// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
}

predicateFuncs, err := c.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}

priorityConfigs, err := c.getPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}

priorityMetaProducer, err := c.getPriorityMetadataProducer()
if err != nil {
return nil, err
}

predicateMetaProducer, err := c.GetPredicateMetadataProducer()
if err != nil {
return nil, err
}

algo := core.NewGenericScheduler(
c.schedulerCache,
c.podQueue,
predicateFuncs,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
c.framework,
extenders,
c.volumeBinder,
c.pVCLister,
c.pdbLister,
c.alwaysCheckAllPredicates,
c.disablePreemption,
c.percentageOfNodesToScore,
c.enableNonPreempting,
)

return &Config{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
GetBinder: getBinderFunc(c.client, extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
Framework: c.framework,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
}, nil
}

Binding informer event handlers

In pkg/scheduler/eventhandler.go the resource-change events observed by the informers are bound to their handlers. AddAllEventHandlers mainly puts the various resources into the local cache through SchedulerCache, and adds unscheduled Pods (!assignedPod, i.e. Pods without a bound Node) to the scheduling queue. The main events are:

  • Scheduled Pod cache
    • add: addPodToCache
    • update: updatePodInCache
    • delete: deletePodFromCache
  • Unscheduled Pod queue
    • add: addPodToSchedulingQueue
    • update: updatePodInSchedulingQueue
    • delete: deletePodFromSchedulingQueue
  • Node changes:
    • add: addNodeToCache
    • update: updateNodeInCache
    • delete: deleteNodeInCache
  • PV changes
    • add: onPvAdd
    • update: onPvUpdate
  • PVC changes
    • add: onPvcAdd
    • update: onPvcUpdate
  • Service changes (these mainly affect ServiceAffinity):
    • add: onServiceAdd
    • update: onServiceUpdate
    • delete: onServiceDelete
  • StorageClass changes
    • add: onStorageClassAdd
// AddAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func AddAllEventHandlers(
sched *Scheduler,
schedulerName string,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
serviceInformer coreinformers.ServiceInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
) {}

When cluster resources change (a Service, a volume, and so on), the Pods that previously failed scheduling and are sitting in unschedulableQ are retried by moving them to activeQ or backoffQ; this is done by calling MoveAllToActiveQueue.

func (p *PriorityQueue) MoveAllToActiveQueue() {
p.lock.Lock()
defer p.lock.Unlock()

// There is a chance of errors when adding pods to other queues,
// we make a temporary slice to store the pods,
// since the probability is low, we set its len to 0
addErrorPods := make([]*framework.PodInfo, 0)

for _, pInfo := range p.unschedulableQ.podInfoMap {
pod := pInfo.Pod
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
addErrorPods = append(addErrorPods, pInfo)
}
} else {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
addErrorPods = append(addErrorPods, pInfo)
}
}
}
p.unschedulableQ.clear()
// Adding pods that we could not move to Active queue or Backoff queue back to the Unschedulable queue
for _, podInfo := range addErrorPods {
p.unschedulableQ.addOrUpdate(podInfo)
}
p.moveRequestCycle = p.schedulingCycle
p.cond.Broadcast()
}

Starting the scheduler

So how does the whole Scheduler actually run? The entry point is the Run function, which keeps executing scheduleOne in a control loop until the StopEverything signal arrives.

pkg/scheduler/scheduler.go
func (sched *Scheduler) Run() {
if !sched.WaitForCacheSync() {
return
}

go wait.Until(sched.scheduleOne, 0, sched.StopEverything)
}

SchedulingQueue: the three-level scheduling queue

SchedulingQueue is an interface; it provides the following methods for enqueuing and dequeuing Pods.

type SchedulingQueue interface {
Add(pod *v1.Pod) error
AddIfNotPresent(pod *v1.Pod) error
AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
SchedulingCycle() int64
Pop() (*v1.Pod, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveQueue()
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
NominatedPodsForNode(nodeName string) []*v1.Pod
PendingPods() []*v1.Pod
Close()
UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
DeleteNominatedPodIfExists(pod *v1.Pod)
NumUnschedulablePods() int
}

The queue is actually implemented by PriorityQueue.

type PriorityQueue struct {
stop <-chan struct{}
clock util.Clock
podBackoff *PodBackoffMap

lock sync.RWMutex
cond sync.Cond

activeQ *util.Heap
podBackoffQ *util.Heap
unschedulableQ *UnschedulablePodsMap
nominatedPods *nominatedPodMap
schedulingCycle int64
moveRequestCycle int64
closed bool
}

The priority queue is a three-level scheduling queue consisting of:

  • the active queue activeQ: holds all Pods in the system that are currently waiting to be scheduled
  • the unschedulable queue unschedulableQ: when a Pod's resource requirements cannot currently be satisfied by the cluster, it is placed in the unschedulable queue and retried later
  • the backoff queue backoffQ: backoff is a common mechanism in concurrent programming — if a task keeps failing, its wait time grows with each retry, throttling retries so that repeated failures do not waste scheduling resources; Pods that failed scheduling are preferentially kept in the backoff queue and retried later

Pods must periodically be taken out of backoffQ and unschedulableQ and moved back into activeQ:

  • every 1 second flushBackoffQCompleted finds the Pods in backoffQ whose backoff has expired and moves them into activeQ
  • every 30 seconds flushUnschedulableQLeftover moves any Pod whose last scheduling attempt was more than 60 seconds ago back to podBackoffQ or activeQ
func (p *PriorityQueue) run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

The three-level scheduling queue

The activeQ queue

When a new Pod appears in the cluster

When do new Pods appear? When cluster resources change: a new Pod is created, or a PV is added, a Node changes, etc., so that previously unschedulable Pods become schedulable. In that case SchedulingQueue.MoveAllToActiveQueue is called (see pkg/scheduler/eventhandler.go).

func (p *PriorityQueue) MoveAllToActiveQueue() {
p.lock.Lock()
defer p.lock.Unlock()

// There is a chance of errors when adding pods to other queues,
// we make a temporary slice to store the pods,
// since the probability is low, we set its len to 0
addErrorPods := make([]*framework.PodInfo, 0)

for _, pInfo := range p.unschedulableQ.podInfoMap {
pod := pInfo.Pod
if p.isPodBackingOff(pod) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
addErrorPods = append(addErrorPods, pInfo)
}
} else {
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
addErrorPods = append(addErrorPods, pInfo)
}
}
}
p.unschedulableQ.clear()
// Adding pods that we could not move to Active queue or Backoff queue back to the Unschedulable queue
for _, podInfo := range addErrorPods {
p.unschedulableQ.addOrUpdate(podInfo)
}
p.moveRequestCycle = p.schedulingCycle
p.cond.Broadcast()
}

At the same time, the moveRequestCycle field is updated.

So what does adding a Pod to activeQ actually do?

  • it adds the Pod to activeQ and removes it from backoffQ and unschedulableQ
  • it broadcasts on the condition variable to notify schedulers blocked in Pop that a new Pod is available
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
pInfo := p.newPodInfo(pod)
// 加入activeQ
if err := p.activeQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
return err
}
// 从unschedulableQ删除
if p.unschedulableQ.get(pod) != nil {
klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
p.unschedulableQ.delete(pod)
}
// Delete pod from backoffQ if it is backing off
// 从podBackoffQ删除
if err := p.podBackoffQ.Delete(pInfo); err == nil {
klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
}
// 存储pod和被提名的node
p.nominatedPods.add(pod, "")
p.cond.Broadcast()

return nil
}

When a Pod fails to be scheduled

When scheduling fails, the scheduler calls its Error callback to handle the Pod that just failed:

pkg/scheduler/scheduler.go
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
// 错误回调
sched.Error(pod, err)
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
}); err != nil {
klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
}
}

So what does this error handler actually do?

pkg/scheduler/factory/factory.go
func(pod *v1.Pod, err error) {
podSchedulingCycle := podQueue.SchedulingCycle()
// 省略非核心代码
if len(pod.Spec.NodeName) == 0 {
//重新放回队列
if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil {
klog.Error(err)
}
}
}

When a Pod fails scheduling, it can be placed either in backoffQ or in unschedulableQ. How is that choice made?

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
...
// Every unschedulable pod is subject to backoff timers.
p.backoffPod(pod)

// If a move request has been received, move it to the BackoffQ, otherwise move
// it to unschedulableQ.
if p.moveRequestCycle >= podSchedulingCycle {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
}
} else {
p.unschedulableQ.addOrUpdate(pInfo)
}

p.nominatedPods.add(pod, "")
return nil
}

Normally, a Pod that cannot be scheduled goes into unschedulableQ. But if a move request has been received, the Pod is moved to backoffQ instead: cluster resources have changed recently, so putting it in backoffQ lets it be retried sooner and scheduled earlier.

The backoffQ queue

backoffQ is a heap. Each round it peeks at the top element and checks whether its backoff has expired; if it has, the element is popped and added to activeQ.

func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()

for {
// 获取堆顶元素
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
return
}
pod := rawPodInfo.(*framework.PodInfo).Pod
// 获取到期时间
boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
if !found {
// 如果当前已经不在podBackoff中,则就pop出来然后放入到activeQ
klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
p.podBackoffQ.Pop()
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
continue
}

// 未超时
if boTime.After(p.clock.Now()) {
return
}
// 超时就pop出来
_, err := p.podBackoffQ.Pop()
if err != nil {
klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
return
}
// 加入到activeQ中
p.activeQ.Add(rawPodInfo)
defer p.cond.Broadcast()
}
}

The unschedulableQ queue

If more than 60 seconds have passed since a Pod's last scheduling attempt, it is rescheduled by moving it to podBackoffQ or activeQ.

func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()

var podsToMove []*framework.PodInfo
currentTime := p.clock.Now()
for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.Timestamp
// 如果该pod1分钟内没有被调度就加入到podsToMove
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pInfo)
}
}

if len(podsToMove) > 0 {
// podsToMove将这些pod移动到activeQ
p.movePodsToActiveQueue(podsToMove)
}
}

NominatedPodMap

The priority queue keeps a nominatedPods structure that stores the Pods which have been nominated to run on particular Nodes. Its data structure is:

type nominatedPodMap struct {
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulableQ.
nominatedPods map[string][]*v1.Pod
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
// nominated.
nominatedPodToNode map[ktypes.UID]string
}

NextPod()

Fetching the next Pod is essentially a dequeue (Pop) operation.

// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue
func MakeNextPodFunc(queue SchedulingQueue) func() *v1.Pod {
return func() *v1.Pod {
pod, err := queue.Pop()
if err == nil {
klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
return pod
}
klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil
}
}

Scheduler Cache

Why does the scheduler need a cache? The cache collects Pod information and node-level aggregates so that the generic scheduler can look them up efficiently during scheduling.

Cache collects pods’ information and provides node-level aggregated information.

It’s intended for generic scheduler to do efficient lookup.

Below is the full definition of the schedulerCache struct; the meaning of each field is explained later.

type schedulerCache struct {
stop <-chan struct{}
ttl time.Duration
period time.Duration

mu sync.RWMutex
assumedPods map[string]bool
podStates map[string]*podState
nodes map[string]*nodeInfoListItem
csiNodes map[string]*storagev1beta1.CSINode
headNode *nodeInfoListItem
nodeTree *NodeTree
imageStates map[string]*imageState
}

Pod states

The cache operations are Pod-centric: for each Pod event the cache performs an incremental update. Below is the cache's state machine.

// State Machine of a pod's events in scheduler's cache
// +-------------------------------------------+ +----+
// | Add | | |
// | | | | Update
// + Assume Add v v |
//Initial +--------> Assumed +------------+---> Added <--+
// ^ + + | +
// | | | | |
// | | | Add | | Remove
// | | | | |
// | | | + |
// +----------------+ +-----------> Expired +----> Deleted
// Forget Expire

A few of these events deserve explanation:

  • Assume:assumes a pod scheduled and aggregates the pod’s information into its node
  • Forget:removes an assumed pod from cache
  • Expire:After expiration, its information would be subtracted
  • Add:either confirms a pod if it’s assumed, or adds it back if it’s expired
  • Update:removes oldPod’s information and adds newPod’s information
  • Remove:removes a pod. The pod’s information would be subtracted from assigned node.

Correspondingly a Pod can be in several states. Initial, Expired, and Deleted do not actually exist in the cache; they appear only to make the state machine easier to draw. The transitions are implemented through the state of two maps, podStates and assumedPods.

Scheduler Pod Cache State Machine

The cache relies on a few assumptions during scheduling (a minimal sketch of the two bookkeeping maps follows this list):

  • a Pod is never assumed twice
  • a Pod may be added directly without going through the scheduler; in that case we only see an Add event and never an Assume event
  • a Pod that has never been added will not be removed or updated
  • Expired and Deleted are both valid final states
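
The essence of that bookkeeping can be sketched as follows — an illustrative simplification (type and method names are mine), not the real schedulerCache code:

package example

import (
	"time"

	v1 "k8s.io/api/core/v1"
)

// miniCache illustrates how assumedPods and podStates track the state machine.
type miniCache struct {
	assumedPods map[string]bool      // keys of Pods that are assumed but not yet confirmed
	podStates   map[string]*podState // every Pod known to the cache
}

type podState struct {
	pod      *v1.Pod
	deadline *time.Time // set when binding finishes; used by the expiry sweep
}

// Assume records the Pod as scheduled onto a node before the ApiServer confirms it.
func (c *miniCache) Assume(pod *v1.Pod) {
	key := pod.Namespace + "/" + pod.Name
	c.podStates[key] = &podState{pod: pod}
	c.assumedPods[key] = true
}

// Add handles a confirmed Add event: it either confirms an assumed Pod or (re)adds it.
func (c *miniCache) Add(pod *v1.Pod) {
	key := pod.Namespace + "/" + pod.Name
	c.podStates[key] = &podState{pod: pod}
	delete(c.assumedPods, key) // no longer merely assumed
}

// Expire drops an assumed Pod whose confirmation never arrived in time.
func (c *miniCache) Expire(key string) {
	delete(c.assumedPods, key)
	delete(c.podStates, key)
}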

Node state

In the cache, Node information is kept in a doubly linked list:

type nodeInfoListItem struct {
info *schedulernodeinfo.NodeInfo
next *nodeInfoListItem
prev *nodeInfoListItem
}

NodeInfo, shown below, holds a collection of Node-related information.

type NodeInfo struct {
node *v1.Node

pods []*v1.Pod
podsWithAffinity []*v1.Pod
usedPorts HostPortInfo

requestedResource *Resource
nonzeroRequest *Resource
allocatableResource *Resource

taints []v1.Taint
taintsErr error

imageStates map[string]*ImageStateSummary

TransientInfo *TransientSchedulerInfo

memoryPressureCondition v1.ConditionStatus
diskPressureCondition v1.ConditionStatus
pidPressureCondition v1.ConditionStatus

generation int64
}

With the nodes map and the headNode pointer in the schedulerCache above, Node information can be accessed quickly.

Updating NodeInfo

When an informer notification reports that Node information in the cluster has changed, the Node information in the cache is updated.

nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.addNodeToCache,
UpdateFunc: sched.updateNodeInCache,
DeleteFunc: sched.deleteNodeFromCache,
},
)

The add, update, and delete handlers call the cache's AddNode, UpdateNode, and RemoveNode functions respectively. Taking AddNode as an example:

func (cache *schedulerCache) AddNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()

n, ok := cache.nodes[node.Name]
if !ok {
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(node.Name)

cache.nodeTree.AddNode(node)
cache.addNodeImageStates(node, n.info)
return n.info.SetNode(node)
}
  • A new NodeInfo struct is created if necessary and inserted into the doubly linked list.
  • Every time a Node's information in the cache is updated, that Node is moved to the head of the list.
  • The information in NodeTree and in the node image states is updated as well.

NodeTree: spreading Pods across nodes

The cache also holds a NodeTree pointer, a tree-like structure of Node information used for node spreading. Node spreading means that, while satisfying the scheduling constraints, the scheduler distributes Pods evenly over all nodes, usually allocating zone by zone and node by node, so that Pods end up spread across the whole cluster.

The NodeTree structure is shown below. Its tree field is a map whose keys are zone names and whose values are nodeArray objects, which keeps the Nodes of different zones apart. A nodeArray stores all node names of one zone and records the index of the node allocated last in lastIndex.

type NodeTree struct {
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
zoneIndex int
numNodes int
mu sync.RWMutex
}

type nodeArray struct {
nodes []string
lastIndex int
}

You can view all the cluster's Nodes as a two-dimensional array indexed by zoneIndex and nodeIndex.


In findNodesThatFit, each call to nodeName := g.cache.NodeTree().Next() yields the next Node to examine. Its implementation is:

func (nt *NodeTree) Next() string {
nt.mu.Lock()
defer nt.mu.Unlock()
if len(nt.zones) == 0 {
return ""
}
numExhaustedZones := 0
for {
if nt.zoneIndex >= len(nt.zones) {
nt.zoneIndex = 0
}
zone := nt.zones[nt.zoneIndex]
nt.zoneIndex++
// We do not check the exhausted zones before calling next() on the zone. This ensures
// that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
nodeName, exhausted := nt.tree[zone].next()
if exhausted {
numExhaustedZones++
if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. we should reset.
nt.resetExhausted()
}
} else {
return nodeName
}
}
}

Each call first takes the zone at the current zoneIndex and advances zoneIndex, then calls next on that zone's nodeArray to obtain a Node and advance its nodeIndex.
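
To make the interleaving concrete, here is a small self-contained round-robin sketch that mimics the behaviour (hypothetical type and zone/node names; it is not the real NodeTree API and omits the reset-on-exhaustion logic):

package example

// zoneRoundRobin mimics NodeTree.Next(): walk the zones in turn and take one
// node from each zone before coming back to the first zone.
type zoneRoundRobin struct {
	zones     []string            // e.g. ["zone-a", "zone-b"]
	nodes     map[string][]string // zone -> node names
	zoneIndex int
	nodeIndex map[string]int
}

func newZoneRoundRobin(nodes map[string][]string) *zoneRoundRobin {
	z := &zoneRoundRobin{nodes: nodes, nodeIndex: map[string]int{}}
	for zone := range nodes {
		z.zones = append(z.zones, zone)
	}
	return z
}

// Next returns the next node name, interleaving zones; "" means every zone is
// exhausted (the real NodeTree resets its indices instead of returning "").
func (z *zoneRoundRobin) Next() string {
	for range z.zones {
		zone := z.zones[z.zoneIndex%len(z.zones)]
		z.zoneIndex++
		if i := z.nodeIndex[zone]; i < len(z.nodes[zone]) {
			z.nodeIndex[zone] = i + 1
			return z.nodes[zone][i]
		}
	}
	return ""
}

With two zones of two nodes each, successive Next() calls return one node from each zone alternately, which is exactly the spreading effect described above.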

Snapshot

When the scheduler picks up a Pod to schedule, it takes a snapshot of the cluster from the cache (the Node statistics at that moment) to use throughout the rest of the scheduling flow.

type Snapshot struct {
NodeInfoMap map[string]*NodeInfo
Generation int64
}

Creating and updating the snapshot

The snapshot is created in kubernetes/pkg/scheduler/core/generic_scheduler.go; it is simply an empty snapshot object:

nodeInfoSnapshot:         framework.NodeInfoSnapshot(),

The data is then refreshed through the snapshot method, which calls the cache's update interface:

func (g *genericScheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
}

Incremental updates via headNode

As the number of nodes and Pods in the cluster grows, taking a full snapshot every time would seriously hurt scheduling throughput. The cache therefore performs incremental updates using the doubly linked list together with a monotonically increasing generation counter kept for each node.

func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)

// 获取当前snapshot的Genration
snapshotGeneration := nodeSnapshot.Generation

// 遍历双向链表,更新snapshot信息
for node := cache.headNode; node != nil; node = node.next {
if node.info.GetGeneration() <= snapshotGeneration {
//所有node信息都更新完毕
break
}
if balancedVolumesEnabled && node.info.TransientInfo != nil {
// Transient scheduler info is reset here.
node.info.TransientInfo.ResetTransientSchedulerInfo()
}
if np := node.info.Node(); np != nil {
nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
}
}
// 更新snapshot的genration
if cache.headNode != nil {
nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
}

// 如果snapshot里面包含过期的pod信息则进行清理工作
if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
for name := range nodeSnapshot.NodeInfoMap {
if _, ok := cache.nodes[name]; !ok {
delete(nodeSnapshot.NodeInfoMap, name)
}
}
}
return nil
}

Expiring stale data

Data storage

The cache must periodically clean up Pods that were only assumed by the local scheduler: if the corresponding real Add event for an assumed Pod is not observed within a given time, the Pod is removed.

assumedPods map[string]bool

Background cleanup task

Cleanup runs every 1 second by default, and the default ttl is 30 seconds.

func (cache *schedulerCache) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

Cleanup logic

Cleanup only targets Pods whose binding has finished. Once a Pod has gone through all of the scheduler's steps it is given an expiry deadline, currently 30 seconds; if that deadline (bind time + 30s) is earlier than the current time, the Pod is removed.

// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()

// The size of assumedPods should be small
for key := range cache.assumedPods {
ps, ok := cache.podStates[key]
if !ok {
panic("Key found in assumed set but not in podStates. Potentially a logical error.")
}
// 未完成绑定的pod不会被进行清理
if !ps.bindingFinished {
klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
ps.pod.Namespace, ps.pod.Name)
continue
}
// 在完成bind之后会设定一个过期时间,目前是30s,如果deadline即bind时间+30s小于当前时间就过期删除
if now.After(*ps.deadline) {
klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
if err := cache.expirePod(key, ps); err != nil {
klog.Errorf("ExpirePod failed for %s: %v", key, err)
}
}
}
}

Removing a Pod

Removing a Pod consists of:

  1. removing the Pod's assumed node assignment from the cache
  2. cleaning up the corresponding podStates and assumedPods entries
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
if err := cache.removePod(ps.pod); err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
return nil
}

Predicates (filtering)

The scheduler's purpose is to place the Pods in the scheduling queue onto Nodes with matching resources. Before the Scheduling Framework, the algorithm had two steps: predicates (filtering) and priorities (scoring). The predicate step selects, from all nodes in the cluster, those that satisfy the Pod's resource, affinity, and other requirements — it acts as a filter. The challenge is how to filter such nodes quickly and efficiently when the cluster has a large number of Nodes.
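
In this version a predicate is simply a function of type predicates.FitPredicate: it receives the Pod, precomputed metadata, and the NodeInfo of one candidate node, and returns whether the Pod fits plus the failure reasons. A toy custom predicate might look like the sketch below (illustrative only; the label key example.com/dedicated is an assumed example, and reusing ErrNodeSelectorNotMatch as the failure reason is just for brevity):

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// matchesDedicatedLabel is a toy FitPredicate: the Pod only fits on nodes
// carrying the label example.com/dedicated=web.
func matchesDedicatedLabel(pod *v1.Pod, meta predicates.PredicateMetadata,
	nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if node.Labels["example.com/dedicated"] == "web" {
		return true, nil, nil
	}
	return false, []predicates.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch}, nil
}

Such a function would then be registered under a name via factory.RegisterFitPredicate, exactly as the init() block below does for the built-in predicates.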

Registering the predicate algorithms

func init() {
// Register functions that extract metadata used by predicates computations.
factory.RegisterPredicateMetadataProducerFactory(
func(args factory.PluginFactoryArgs) predicates.PredicateMetadataProducer {
return predicates.NewPredicateMetadataFactory(args.PodLister)
})

// IMPORTANT NOTES for predicate developers:
// Registers predicates and priorities that are not enabled by default, but user can pick when creating their
// own set of priorities/predicates.

// PodFitsPorts has been replaced by PodFitsHostPorts for better user understanding.
// For backwards compatibility with 1.0, PodFitsPorts is registered as well.
factory.RegisterFitPredicate("PodFitsPorts", predicates.PodFitsHostPorts)
// Fit is defined based on the absence of port conflicts.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)
// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
// Fit is determined by the presence of the Host parameter and a string match
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)
// Fit is determined by node selector query.
factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)

// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
predicates.NoVolumeZoneConflictPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
},
)
// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxEBSVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
},
)
// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxGCEPDVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
},
)
// Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
factory.RegisterFitPredicateFactory(
predicates.MaxAzureDiskVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
},
)
factory.RegisterFitPredicateFactory(
predicates.MaxCSIVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewCSIMaxVolumeLimitPredicate(args.CSINodeInfo, args.PVInfo, args.PVCInfo, args.StorageClassInfo)
},
)
factory.RegisterFitPredicateFactory(
predicates.MaxCinderVolumeCountPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.CinderVolumeFilterType, args.CSINodeInfo, args.StorageClassInfo, args.PVInfo, args.PVCInfo)
},
)

// Fit is determined by inter-pod affinity.
factory.RegisterFitPredicateFactory(
predicates.MatchInterPodAffinityPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
},
)

// Fit is determined by non-conflicting disk volumes.
factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict)

// GeneralPredicates are the predicates that are enforced by all Kubernetes components
// (e.g. kubelet and all schedulers)
factory.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates)

// Fit is determined by node memory pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodeMemoryPressurePred, predicates.CheckNodeMemoryPressurePredicate)

// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodeDiskPressurePred, predicates.CheckNodeDiskPressurePredicate)

// Fit is determined by node pid pressure condition.
factory.RegisterFitPredicate(predicates.CheckNodePIDPressurePred, predicates.CheckNodePIDPressurePredicate)

// Fit is determined by node conditions: not ready, network unavailable or out of disk.
factory.RegisterMandatoryFitPredicate(predicates.CheckNodeConditionPred, predicates.CheckNodeConditionPredicate)

// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)

// Fit is determined by volume topology requirements.
factory.RegisterFitPredicateFactory(
predicates.CheckVolumeBindingPred,
func(args factory.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewVolumeBindingPredicate(args.VolumeBinder)
},
)
}

Local optimum: sampling nodes

The filtering step has to pick suitable nodes from the cluster. As the cluster grows, iterating over every node each time would inevitably degrade performance, so the scheduler settles for a locally optimal solution and limits how many nodes it examines. Concretely, genericScheduler defines the two constants minFeasibleNodesToFind and minFeasibleNodesPercentageToFind.

const (
minFeasibleNodesToFind = 100
minFeasibleNodesPercentageToFind = 5
)
  • minFeasibleNodesToFind: the minimum number of feasible nodes that must be found for the scoring phase, 100 by default.
  • minFeasibleNodesPercentageToFind: the minimum percentage of nodes to examine, 5% by default.

The numFeasibleNodesToFind function combines the current number of Nodes in the cluster with these minimums to decide how many nodes this filtering pass should examine.

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
// 如果当前节点数量小于minFeasibleNodesToFind即小于100台node
// 同理百分比如果大于100就是全量取样
// 这两种情况都直接遍历整个集群中所有节点
if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
return numAllNodes
}

// schedulerapi.DefaultPercentageOfNodesToScore = 50
adaptivePercentage := g.percentageOfNodesToScore
if adaptivePercentage <= 0 {
adaptivePercentage = schedulerapi.DefaultPercentageOfNodesToScore - numAllNodes/125
if adaptivePercentage < minFeasibleNodesPercentageToFind {
adaptivePercentage = minFeasibleNodesPercentageToFind
}
}

// Normal sampling: e.g. with numAllNodes = 5000 and adaptivePercentage = 50,
// numNodes = 5000 * 50 / 100 = 2500
numNodes = numAllNodes * adaptivePercentage / 100
if numNodes < minFeasibleNodesToFind { // 如果小于最少取样则按照最少取样进行取样
return minFeasibleNodesToFind
}

return numNodes
}
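
For example, in a 5000-node cluster with percentageOfNodesToScore left at 50, the filter phase examines 5000 × 50 / 100 = 2500 nodes. In a 150-node cluster the same percentage would give only 75, which is below the floor, so minFeasibleNodesToFind = 100 nodes are examined instead.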

Parallelism

In the current k8s version, 16 goroutines are started by default to run the predicates in parallel, which speeds up filtering.

The parallel sampling is driven by the call below, which starts the workers and coordinates cancellation through ctx.

workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)


A channel of indices acts as the work queue: each worker takes an index from the channel and fills in the result slot for the node it sampled.

func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
}

// 生成指定数量索引,worker通过索引来进行预选成功节点的存储
toProcess := make(chan int, pieces)
for i := 0; i < pieces; i++ {
toProcess <- i
}
close(toProcess)

if pieces < workers {
workers = pieces
}

wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
// 启动多个goroutine
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
select {
case <-stop:
return
default:
//获取索引,后续会通过该索引来进行结果的存储
doWorkPiece(piece)
}
}
}()
}
// 等待退出
wg.Wait()
}

The function executed for each unit of work is shown below: it takes the next candidate Node from the NodeTree and calls podFitsOnNode to check whether the Pod can run on it.

checkNode := func(i int) {
nodeName := g.cache.NodeTree().Next()

fits, failedPredicates, status, err := g.podFitsOnNode(
pluginContext,
pod,
meta,
g.nodeInfoSnapshot.NodeInfoMap[nodeName],
g.predicates,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
)
...
}

Two rounds of filtering

To check whether a Pod can run on a given Node, podFitsOnNode evaluates a series of predicate functions — and it does so in two rounds.

  • Filtering for future scheduling:
    • If the Node has nominated Pods of equal or higher priority, those Pods are added to meta and nodeInfo before the predicates run. Only equal-or-higher priorities matter because it is acceptable for the current Pod to take resources away from lower-priority Pods, but not from higher-priority ones.
    • If there are no nominated Pods, or the first round did not pass, the second round is not run.
  • Filtering against current resources:
    • If every predicate passed in the first round, the predicates are run once more without those Pods added.

The second round is necessary because some predicates (such as inter-Pod affinity) might only pass when the nominated Pods are not counted. Running both rounds is essentially a conservative decision: if we treat nominated Pods as running, the resource and inter-Pod anti-affinity predicates are more likely to fail; if we do not, the inter-Pod affinity predicates are more likely to fail. We simply cannot assume whether the nominated Pods will run, because they are not running yet and may be scheduled to a different Node.

With the two rounds, the Pod's requirements are satisfied whether or not those higher-priority Pods end up on the node. During scheduling, the previously registered predicates are simply fetched and evaluated; as soon as one fails, the second round is skipped and the next node is tried.

Two-round filtering

func (g *genericScheduler) podFitsOnNode(
pluginContext *framework.PluginContext,
pod *v1.Pod,
meta predicates.PredicateMetadata,
info *schedulernodeinfo.NodeInfo,
predicateFuncs map[string]predicates.FitPredicate,
queue internalqueue.SchedulingQueue,
alwaysCheckAllPredicates bool,
) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
...
podsAdded := false
for i := 0; i < 2; i++ {
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
for _, predicateKey := range predicates.Ordering() {
...
}

return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}

Priorities (scoring)

The scoring phase scores the nodes that passed filtering using various algorithms; the results are recorded as HostPriority values.

// HostPriority represents the priority of scheduling to a particular host, higher priority is better.
type HostPriority struct {
// Name of the host
Host string
// Score associated with the host
Score int
}

// HostPriorityList declares a []HostPriority type.
type HostPriorityList []HostPriority

To speed up scoring, the computation follows a Map/Reduce pattern and runs in parallel, with the results stored in a two-dimensional array. Lock-free result storage works because the result of one algorithm on one node is addressed by exactly two indices: the algorithm index and the node index.


// During scoring a nodes []*v1.Node slice holding all candidate nodes is passed in; the node index refers to positions in that slice
results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

Registering the priority algorithms

In the scoring phase each strategy is represented by a PriorityConfig, which contains a Map function and a Reduce function.

type PriorityConfig struct {
Name string
Map PriorityMapFunction
Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to
// Map-Reduce pattern.
Function PriorityFunction
Weight int
}

The two function types are defined as follows:

// Map:输入是(pod, meta, nodeInfo),输出是该Pod根据该算法在该节点算出的得分
type PriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error)

// Reduce:输入是(pod, meta, map[string]*NodeInfo, result)
type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, result schedulerapi.HostPriorityList) error

Where are these algorithms registered? The factory package provides registration functions that take the algorithm name, the map/reduce functions, and a weight.

pkg/scheduler/factory/plugins.go
func RegisterPriorityMapReduceFunction(
name string,
mapFunction priorities.PriorityMapFunction,
reduceFunction priorities.PriorityReduceFunction,
weight int) string {
return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
MapReduceFunction: func(PluginFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
return mapFunction, reduceFunction
},
Weight: weight,
})
}

pkg/scheduler/algorithmprovider/defaults/register_priorities.go中有 init函数来注册:

func init() {
// Register functions that extract metadata used by priorities computations.
factory.RegisterPriorityMetadataProducerFactory(
func(args factory.PluginFactoryArgs) priorities.PriorityMetadataProducer {
return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
})

...

// Prioritize nodes by least requested utilization.
factory.RegisterPriorityMapReduceFunction(priorities.LeastRequestedPriority, priorities.LeastRequestedPriorityMap, nil, 1)

// Prioritizes nodes to help achieve balanced resource usage
factory.RegisterPriorityMapReduceFunction(priorities.BalancedResourceAllocation, priorities.BalancedResourceAllocationMap, nil, 1)

// Set this weight large enough to override all other priority functions.
// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
factory.RegisterPriorityMapReduceFunction(priorities.NodePreferAvoidPodsPriority, priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000)

// Prioritizes nodes that have labels matching NodeAffinity
factory.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)

// Prioritizes nodes that marked with taint which pod can tolerate.
factory.RegisterPriorityMapReduceFunction(priorities.TaintTolerationPriority, priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1)

// ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
factory.RegisterPriorityMapReduceFunction(priorities.ImageLocalityPriority, priorities.ImageLocalityPriorityMap, nil, 1)
}

Each part is briefly analyzed below.

Map: computing in parallel over node indices

The Map step parallelizes along the node dimension: for each Node, it loops over the algorithms and computes that Node's score under each of them.

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
continue
}

var err error
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
results[i][index].Host = nodes[index].Name
}
}
})

Reduce: computing in parallel over algorithm indices

The Reduce step starts one goroutine per algorithm; each goroutine uses its algorithm index to read all of that algorithm's Map results, performs its computation, and writes the results back into the same slots.

for i := range priorityConfigs {
if priorityConfigs[i].Reduce == nil {
continue
}
wg.Add(1)
go func(index int) {
defer wg.Done()
if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
appendError(err)
}
if klog.V(10) {
for _, hostPriority := range results[index] {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
}
}
}(i)
}

In fact few priority algorithms have a Reduce function; only NodeAffinity and TaintToleration do, and both essentially call NormalizeReduce, which normalizes the previously computed scores into the range [0, maxPriority]. Under the Scheduling Framework this step is replaced by the Normalize Scoring phase.
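
For instance, with maxPriority = 10 and raw scores of 20, 5, and 10 for three nodes, NormalizeReduce rescales them by 10/20 to 10, 2, and 5 (integer division); with reverse = true they become 0, 8, and 5, i.e. lower raw scores end up better.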

func NormalizeReduce(maxPriority int, reverse bool) PriorityReduceFunction {
return func(
_ *v1.Pod,
_ interface{},
_ map[string]*schedulernodeinfo.NodeInfo,
result schedulerapi.HostPriorityList) error {

var maxCount int
for i := range result {
if result[i].Score > maxCount {
maxCount = result[i].Score
}
}

if maxCount == 0 {
if reverse {
for i := range result {
result[i].Score = maxPriority
}
}
return nil
}

for i := range result {
score := result[i].Score

score = maxPriority * score / maxCount
if reverse {
score = maxPriority - score
}

result[i].Score = score
}
return nil
}
}

Preemption

Preemptive scheduling is a common design in distributed schedulers: when resources cannot be allocated to a high-priority task, lower-priority tasks are preempted to make room for it.

The core preemption flow

Checking whether preemption is allowed

If the Pod that wants to preempt already has a nominated node, and on that node a Pod with a lower priority than its own is already being deleted, preemption is not attempted.

func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool {
if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
return false
}
nomNodeName := pod.Status.NominatedNodeName
if len(nomNodeName) > 0 {
if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfo.Pods() {
if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority {
// 正在终止的优先级低于当前pod的pod就不会进行抢占
return false
}
}
}
}
return true
}

Selecting potential nodes

During filtering every node is annotated with the reason it failed the predicates. Potential nodes are selected based on those reasons: if a node's failures are not of the unresolvable kind, the node can take part in the subsequent preemption phase.

func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node {
potentialNodes := []*v1.Node{}
// 根据预选阶段的错误原因,如果不存在无法解决的错误,则这些node可能在接下来的抢占流程中被使用
for _, node := range nodes {
if fitErr.FilteredNodesStatuses[node.Name].Code() == framework.UnschedulableAndUnresolvable {
continue
}
failedPredicates, _ := fitErr.FailedPredicates[node.Name]
if !unresolvablePredicateExists(failedPredicates) {
// 如果我们发现并不是不可解决的调度错误的时候,就将这个节点加入到这里
// 可能通过后续的调整会让这些node重新满足
klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node)
}
}
return potentialNodes
}

Evaluating candidate nodes in parallel

The candidate nodes are then evaluated in parallel: for each potential node the scheduler tries to satisfy the high-priority Pod by evicting lower-priority Pods, which yields the set of nodes where preemption would make the Pod schedulable. The core check is selectVictimsOnNode.

func (g *genericScheduler) selectNodesForPreemption(
pluginContext *framework.PluginContext,
pod *v1.Pod,
nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
potentialNodes []*v1.Node,
fitPredicates map[string]predicates.FitPredicate,
metadataProducer predicates.PredicateMetadataProducer,
queue internalqueue.SchedulingQueue,
pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*schedulerapi.Victims, error) {
nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
var resultLock sync.Mutex

// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {
nodeName := potentialNodes[i].Name
var metaCopy predicates.PredicateMetadata
if meta != nil {
metaCopy = meta.ShallowCopy()
}
pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
if fits {
resultLock.Lock()
victims := schedulerapi.Victims{
Pods: pods,
NumPDBViolations: numPDBViolations,
}
nodeToVictims[potentialNodes[i]] = &victims
resultLock.Unlock()
}
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
return nodeToVictims, nil
}

Evaluating a single node

selectVictimsOnNode makes the concrete eviction decision for a single node. Its core flow is described in the steps below.


Filtering by priority

First all Pods on the node are sorted by priority, and every Pod with a lower priority than the incoming Pod is (tentatively) removed.

potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
nodeInfoCopy := nodeInfo.Clone()

removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp, nodeInfoCopy.Node())
}
}
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
if meta != nil {
meta.AddPod(ap, nodeInfoCopy)
}
}
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
if util.GetPodPriority(p) < podPriority {
// 移除所有优先级比自己低的pod
potentialVictims.Items = append(potentialVictims.Items, p)
removePod(p)
}
}

Re-running the predicates

After removing all Pods of lower priority, the predicates are run again. If they still fail, then even evicting every lower-priority Pod cannot make this node satisfy the Pod, so the next node is examined.

if fits, _, _, err := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}

return nil, 0, false
}

Grouping by PDB

The lower-priority Pods selected on the node are grouped by whether a PodDisruptionBudget matches them, into a PDB-violating group and a non-violating group.

violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)

The grouping itself is straightforward: iterating over all PDBs and Pods yields the two groups.

func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
for _, obj := range pods {
pod := obj.(*v1.Pod)
pdbForPodIsViolated := false
// A pod with no labels will not match any PDB. So, no need to check.
if len(pod.Labels) != 0 {
for _, pdb := range pdbs {
if pdb.Namespace != pod.Namespace {
continue
}
selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
if err != nil {
continue
}
// A PDB with a nil or empty selector matches nothing.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
// We have found a matching PDB.
if pdb.Status.PodDisruptionsAllowed <= 0 {
pdbForPodIsViolated = true
break
}
}
}
if pdbForPodIsViolated {
violatingPods = append(violatingPods, pod)
} else {
nonViolatingPods = append(nonViolatingPods, pod)
}
}
return violatingPods, nonViolatingPods
}

Counting PDB violations and minimizing evictions

Both groups are then run through reprievePod: the Pod is added back, and if the predicates no longer pass with it present, it really must be evicted and is appended to victims; if it came from the PDB-violating group, the numViolatingVictim counter is incremented as well.

reprievePod := func(p *v1.Pod) bool { 
// 我们首先将pod加入到meta中
addPod(p)
fits, _, _, _ := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
//
if !fits {
// 如果我们加入了pod然后导致了预选不成功,则这个pod必须给移除
removePod(p)
victims = append(victims, p) // 添加到我们需要移除的列表里面
klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
}
return fits
}
for _, p := range violatingVictims {
if !reprievePod(p) {
numViolatingVictim++
}
}
// Now we try to reprieve non-violating victims.
for _, p := range nonViolatingVictims {
// 尝试移除未违反pdb的pod
reprievePod(p)
}
return victims, numViolatingVictim, true

Picking the best node to preempt on

The final choice is made by pickOneNodeForPreemption. Its bookkeeping reuses two node slices, minNodes1 and minNodes2, each with its own counter (lenNodes1 and lenNodes2): whenever another node ties on the current criterion, it is appended and the counter incremented. The criteria are applied in the order below.


Fewest PDB violations

Using the PDB-violation counts gathered earlier, the nodes with the fewest violations are kept; if only a single node remains, it is returned and the selection ends.

minNumPDBViolatingPods := math.MaxInt32
var minNodes1 []*v1.Node
lenNodes1 := 0
for node, victims := range nodesToVictims {
if len(victims.Pods) == 0 {
// 如果发现一个noed不需要任何抢占,则返回它
return node
}
numPDBViolatingPods := victims.NumPDBViolations
if numPDBViolatingPods < minNumPDBViolatingPods {
// 如果小于最小pdb数量, 如果数量发生变化,就重置
minNumPDBViolatingPods = numPDBViolatingPods
minNodes1 = nil
lenNodes1 = 0
}
if numPDBViolatingPods == minNumPDBViolatingPods {
// 多个相同的node会进行追加,并累加计数器
minNodes1 = append(minNodes1, node)
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}

Lowest highest-victim priority

Next the nodes are compared by the highest priority among their victims: the node whose highest victim priority is the lowest is preferred. If several nodes tie, the next criterion is applied.

minHighestPriority := int32(math.MaxInt32)
var minNodes2 = make([]*v1.Node, lenNodes1)
lenNodes2 := 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
victims := nodesToVictims[node]
// highestPodPriority is the highest priority among the victims on this node.
// 返回优先级最高的pod
highestPodPriority := util.GetPodPriority(victims.Pods[0])
if highestPodPriority < minHighestPriority {
// 重置状态
minHighestPriority = highestPodPriority
lenNodes2 = 0
}

if highestPodPriority == minHighestPriority {
// 如果优先级相等则加入进去
minNodes2[lenNodes2] = node
lenNodes2++
}
}
if lenNodes2 == 1 {
return minNodes2[0]
}

Lowest sum of victim priorities

The priorities of all victims on each node are summed, and the node with the smallest sum is preferred.

minSumPriorities := int64(math.MaxInt64)
lenNodes1 = 0
for i := 0; i < lenNodes2; i++ {
var sumPriorities int64
node := minNodes2[i]
// 统计所有优先级
for _, pod := range nodesToVictims[node].Pods {

sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
lenNodes1 = 0
}
if sumPriorities == minSumPriorities {
minNodes1[lenNodes1] = node
lenNodes1++
}
}
// 最少优先级的node
if lenNodes1 == 1 {
return minNodes1[0]
}

Fewest victims

The number of victims on each node is counted, and the node with the fewest victims is preferred.

minNumPods := math.MaxInt32
lenNodes2 = 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
numPods := len(nodesToVictims[node].Pods)
if numPods < minNumPods {
minNumPods = numPods
lenNodes2 = 0
}
if numPods == minNumPods {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
// 最少节点数量
if lenNodes2 == 1 {
return minNodes2[0]
}

Most recently started victims

For each node, the earliest start time among its highest-priority victims is computed; comparing across nodes, the node whose time is the most recent wins — that is, the node whose Pods were probably scheduled onto it most recently.

latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
if latestStartTime == nil {
// If the earliest start time of all pods on the 1st node is nil, just return it,
// which is not expected to happen.
// 如果第一个节点上所有pod的最早开始时间为零,那么返回它
klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
return minNodes2[0]
}
nodeToReturn := minNodes2[0]
for i := 1; i < lenNodes2; i++ {
node := minNodes2[i]
// Get earliest start time of all pods on the current node.
// 获取当前node最早启动时间
earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
if earliestStartTimeOnNode == nil {
klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
continue
}
if earliestStartTimeOnNode.After(latestStartTime.Time) {
latestStartTime = earliestStartTimeOnNode
nodeToReturn = node
}
}

return nodeToReturn

Scheduler Extender

The community's first extension mechanism was the Extender. An Extender is an external service that can extend Filter, Preempt, Prioritize, and Bind; when the scheduler reaches the corresponding phase, it calls the webhook registered by the Extender, letting the extension logic influence the decision of that phase.

Taking the Filter phase as an example (a minimal webhook sketch follows these two steps), execution goes through two stages:

1. The scheduler first runs the built-in Filter policies; if they fail, the Pod is marked as failed to schedule.
2. If the built-in Filter policies succeed, the scheduler calls the Extender's registered webhook over HTTP, sending the Pod and Node information needed for scheduling; the returned filter result is taken as the final result.
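
A filter extender is just an HTTP endpoint. The sketch below shows the rough shape of one, using minimal local structs rather than the official ExtenderArgs/ExtenderFilterResult types; the gpu=true label rule, the URL path, and the port are assumed examples.

package main

import (
	"encoding/json"
	"net/http"

	v1 "k8s.io/api/core/v1"
)

// Minimal stand-ins for the scheduler's ExtenderArgs / ExtenderFilterResult payloads.
type extenderArgs struct {
	Pod   *v1.Pod      `json:"pod"`
	Nodes *v1.NodeList `json:"nodes"`
}

type extenderFilterResult struct {
	Nodes       *v1.NodeList      `json:"nodes"`
	FailedNodes map[string]string `json:"failedNodes"`
	Error       string            `json:"error"`
}

// filter keeps only nodes labeled gpu=true.
func filter(w http.ResponseWriter, r *http.Request) {
	var args extenderArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	result := extenderFilterResult{Nodes: &v1.NodeList{}, FailedNodes: map[string]string{}}
	for _, node := range args.Nodes.Items {
		if node.Labels["gpu"] == "true" {
			result.Nodes.Items = append(result.Nodes.Items, node)
		} else {
			result.FailedNodes[node.Name] = "node has no GPU"
		}
	}
	json.NewEncoder(w).Encode(result)
}

func main() {
	http.HandleFunc("/filter", filter)
	http.ListenAndServe(":8888", nil)
}

Such an endpoint would then be referenced from the scheduler's policy file through an extender configuration (urlPrefix/filterVerb), which is how the scheduler knows to call it.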

Scheduler Extender

The Extender approach has the following problems:

1. Calls to the Extender are HTTP requests, affected by the network, so their performance is far below local function calls; each call also has to marshal and unmarshal the Pod and Node information, which lowers performance further.
2. The extension points are limited and their positions fixed, so flexible extension is impossible — for example, an Extender can only be invoked after the default Filter policies have run.

In short, the Extender is a flexible and usable extension option for small clusters that do not demand high scheduling throughput, but in a normal large production cluster it cannot sustain high throughput and its performance is poor.

Multiple Schedulers

Within a Kubernetes cluster the Scheduler is really a special kind of Controller: it watches Pod and Node information, picks the best node for each Pod, and publishes the result by updating the Pod's spec.nodeName. Developers with special scheduling needs therefore sometimes write a Custom Scheduler that implements this flow and deploy it alongside the default scheduler.
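
The skeleton of such a custom scheduler might look like the sketch below. It is illustrative only: the scheduler name "my-scheduler", the trivial node-picking logic, and the client-go call shapes are assumptions — in particular the Bind signature differs between client-go versions.

package main

import (
	"context"
	"math/rand"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// scheduleLoop binds every pending Pod that asks for schedulerName "my-scheduler".
func scheduleLoop(ctx context.Context, client kubernetes.Interface) error {
	pods, err := client.CoreV1().Pods("").List(ctx, metav1.ListOptions{
		FieldSelector: "spec.nodeName=,status.phase=Pending",
	})
	if err != nil {
		return err
	}
	nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil || len(nodes.Items) == 0 {
		return err
	}
	for _, pod := range pods.Items {
		if pod.Spec.SchedulerName != "my-scheduler" {
			continue // leave these Pods to the default scheduler
		}
		// Trivial "algorithm": pick a random node. Real schedulers filter and score here.
		target := nodes.Items[rand.Intn(len(nodes.Items))].Name
		binding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
			Target:     v1.ObjectReference{Kind: "Node", Name: target},
		}
		// Equivalent to the default scheduler's bind step: a POST to the Pod's
		// "binding" subresource, which sets spec.nodeName.
		if err := client.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
			return err
		}
	}
	return nil
}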

进击的Kubernetes调度系统(一):Scheduling Framework

A Custom Scheduler has the following problems:

1. When it is deployed alongside the default scheduler, every scheduler sees a global resource view, so at the same moment the two schedulers may place different Pods onto the same node's resources, causing resource conflicts on that node.
2. Some users partition the schedulable resources into pools using labels to avoid such conflicts, but that in turn lowers the overall resource utilization of the cluster.
3. Some users replace the default scheduler with a completely self-built one, which brings high development cost and potential compatibility problems after Kubernetes version upgrades.

So the Scheduler Extender performs poorly but is cheap to maintain, while a Custom Scheduler performs well but is very expensive to develop and maintain — a dilemma for developers. This is where the Kubernetes Scheduling Framework (v2) comes in, giving us the best of both worlds.

References