0%

【Kubernetes】Scheduler Extender

Scheduler Extender 是 k8s 调度器扩展的一种方式,它作为外部服务,支持 Filter、Preempt、Prioritize 和 Bind 的扩展。当 kube-scheduler 运行到相应阶段时,通过调用 Extender 注册的 webhook 来运行扩展的逻辑,影响调度流程中各阶段的决策结果。本文将介绍一个 Scheduler Extender 的实现方式,文中的所有代码可以在我的 Github 中找到。在开始了解 Scheduler Extender 之前,如果你还不了解 Kubernetes 的 Scheduler Framework 原理,可以先参考 这篇文章

调度扩展

Scheduler Extender 是一个独立运行的进程,其本质是一个 WebServer,遵守 kube-scheduler 约定的规则,以 HTTP Endpoint 的形式提供服务。为了让原生的 kube-scheduler 感知到 Scheduler Extender,不需要修改原生 Scheduler 的代码,只需要在运行原生 Scheduler 的时候加一个配置即可。如下所示:

1
2
3
4
5
6
7
"extenders" : [{
"urlPrefix": "http://localhost:8888/",
"filterVerb": "filter",
"prioritizeVerb": "prioritize",
"weight": 1,
"enableHttps": false
}

在这个配置中,会指定 Scheduler Extender 的服务地址,以及选择开启哪些接口。

以 Filter 阶段举例,执行过程会经过 2 个阶段:

  1. scheduler 会先执行内置的 Filter 策略,如果执行失败的话,会直接标识 Pod 调度失败。
  2. 如果内置的 Filter 策略执行成功的话,scheduler 通过 Http 调用 Extender 注册的 webhook, 将调度所需要的 Pod 和 Node 的信息发送到到 Extender,根据返回 filter 结果,作为最终结果。

对于 Filter 和 Priority 这两个操作,HTTP 请求的 Body 是这么一个结构体:

1
2
3
4
5
6
7
8
// ExtenderArgs represents the arguments needed by the extender to filter/prioritize
// nodes for a pod.
type ExtenderArgs struct {
// Pod being scheduled
Pod api.Pod `json:"pod"`
// List of candidate nodes where the pod can be scheduled
Nodes api.NodeList `json:"nodes"`
}

你只需要在 HTTP 处理代码中反序列化成这个结构体,然后执行自己的逻辑,并且返回对应的结果就可以了,其中 Filter 返回的结构体为 ExtenderFilterResult

1
2
3
4
5
6
7
8
9
10
11
12
13
// ExtenderFilterResult represents the results of a filter call to an extender
type ExtenderFilterResult struct {
// Filtered set of nodes where the pod can be scheduled; to be populated
// only if ExtenderConfig.NodeCacheCapable == false
Nodes *v1.NodeList
// Filtered set of nodes where the pod can be scheduled; to be populated
// only if ExtenderConfig.NodeCacheCapable == true
NodeNames *[]string
// Filtered out nodes where the pod can't be scheduled and the failure messages
FailedNodes FailedNodesMap
// Error message indicating failure
Error string
}

Priority 返回的结构体为 HostPriorityList

1
2
3
4
5
6
7
8
9
10
// HostPriority represents the priority of scheduling to a particular host, higher priority is better.
type HostPriority struct {
// Name of the host
Host string
// Score associated with the host
Score int64
}

// HostPriorityList declares a []HostPriority type.
type HostPriorityList []HostPriority

对 Binding 操作,可以通过 Scheduler Extender 来实现,此时kube-scheduler 请求的是另外一个结构体 ExtenderBindingArgs

1
2
3
4
5
6
7
8
9
10
11
// ExtenderBindingArgs represents the arguments to an extender for binding a pod to a node.
type ExtenderBindingArgs struct {
// PodName is the name of the pod being bound
PodName string
// PodNamespace is the namespace of the pod being bound
PodNamespace string
// PodUID is the UID of the pod being bound
PodUID types.UID
// Node selected by the scheduler
Node string
}

Binding 不需要返回特别的结果,只需要返回是否存在错误即可。

实现示例

Route

定义 Extender 提供的 HTTP EndPoint 如下:

1
2
3
4
5
6
7
8
const (
versionPath = "/version"
apiPrefix = "/scheduler"
bindPath = apiPrefix + "/bind"
preemptionPath = apiPrefix + "/preemption"
predicatesPrefix = apiPrefix + "/predicates"
prioritiesPrefix = apiPrefix + "/priorities"
)

Extender 收到来自于 kube-scheduler 的请求后,解析 HTTP Request,获取其请求参数,以 Filter 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func PredicateRoute(predicate scheduler.Predicate) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
checkBody(w, r)

var buf bytes.Buffer
body := io.TeeReader(r.Body, &buf)
log.Print("info: ", predicate.Name, " ExtenderArgs = ", buf.String())

var extenderArgs extender.ExtenderArgs
var extenderFilterResult *extender.ExtenderFilterResult

if err := json.NewDecoder(body).Decode(&extenderArgs); err != nil {
extenderFilterResult = &extender.ExtenderFilterResult{
Nodes: nil,
FailedNodes: nil,
Error: err.Error(),
}
} else {
extenderFilterResult = predicate.Handler(extenderArgs)
}

if resultBody, err := json.Marshal(extenderFilterResult); err != nil {
panic(err)
} else {
log.Print("info: ", predicate.Name, " extenderFilterResult = ", string(resultBody))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(resultBody)
}
}
}

这里做的主要事情是,从 http.Request解析出参数 ExtenderArgs,然后调用 predicate.Handler(extenderArgs),最后将返回结果序列化封装在 ResponseBody 中。其他像 PriorityBind 操作类似。

那么这个 Handler 是在哪里注册的呢?假设我们的 predicate.Name 叫做 always_true

1
2
3
4
func AddPredicate(router *httprouter.Router, predicate scheduler.Predicate) {
path := predicatesPrefix + "/" + predicate.Name
router.POST(path, PredicateRoute(predicate))
}

这里将 Predicate 注册在 /scheduler/predicates/always_true 路径下,当然你也可以直接使用 /scheduler/predicate,这个都是可以自定义的,只需要将调度器配置里面的路径修改下就好:

1
2
3
4
5
"extenders" : [{
"urlPrefix": "http://localhost:8888/scheduler",
"filterVerb": "predicates/always_true",
...
}

Filter

对于 Filter,其实现的 Handler 为 Handler(args extender.ExtenderArgs) *extender.ExtenderFilterResult,输入参数是 ExtenderArgs,输出结果是 ExtenderFilterResult。我们需要实现的是 func(pod v1.Pod, node v1.Node) (bool, error) ,用来判断这个 Pod 是否能够被调度到这个 Node 上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type Predicate struct {
Name string
Func func(pod v1.Pod, node v1.Node) (bool, error)
}

func (p Predicate) Handler(args extender.ExtenderArgs) *extender.ExtenderFilterResult {
pod := args.Pod
canSchedule := make([]v1.Node, 0, len(args.Nodes.Items))
canNotSchedule := make(map[string]string)

for _, node := range args.Nodes.Items {
result, err := p.Func(*pod, node)
if err != nil {
canNotSchedule[node.Name] = err.Error()
} else {
if result {
canSchedule = append(canSchedule, node)
}
}
}

result := extender.ExtenderFilterResult{
Nodes: &v1.NodeList{
Items: canSchedule,
},
FailedNodes: canNotSchedule,
Error: "",
}

return &result
}

这里有一个最简单的 Filter 实现,对于所有的 Node,都通过 Filter。

1
2
3
4
5
6
TruePredicate = scheduler.Predicate{
Name: "always_true",
Func: func(pod v1.Pod, node v1.Node) (bool, error) {
return true, nil
},
}

Priority

Priority 操作负责根据输入的 Pod 情况,对输入的 []v1.Node 这个 List 里面的每个 Node 打分,输出的是一个打分了的 Node 列表,也即是 HostPriorityList 结构。

1
2
3
4
5
6
7
8
type Prioritize struct {
Name string
Func func(pod v1.Pod, nodes []v1.Node) (*extender.HostPriorityList, error)
}

func (p Prioritize) Handler(args extender.ExtenderArgs) (*extender.HostPriorityList, error) {
return p.Func(*args.Pod, args.Nodes.Items)
}

这里有一个简单的 Priority,对于所有的 Node 打分都是 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ZeroPriority = scheduler.Prioritize{
Name: "zero_score",
Func: func(_ v1.Pod, nodes []v1.Node) (*extender.HostPriorityList, error) {
var priorityList extender.HostPriorityList
priorityList = make([]extender.HostPriority, len(nodes))
for i, node := range nodes {
priorityList[i] = extender.HostPriority{
Host: node.Name,
Score: 0,
}
}
return &priorityList, nil
},
}

Bind

Bind 操作将 podNamespace 下叫做 podName,且 UID 为 podUID 的 Pod 绑定到 node 上,我们需要实现 func(podName string, podNamespace string, podUID types.UID, node string) error 的函数。

1
2
3
4
5
6
7
8
9
10
type Bind struct {
Func func(podName string, podNamespace string, podUID types.UID, node string) error
}

func (b Bind) Handler(args extender.ExtenderBindingArgs) *extender.ExtenderBindingResult {
err := b.Func(args.PodName, args.PodNamespace, args.PodUID, args.Node)
return &extender.ExtenderBindingResult{
Error: err.Error(),
}
}

这里有一个简单的 Bind, 不执行实际的 Bind 操作。

1
2
3
4
5
NoBind = scheduler.Bind{
Func: func(podName string, podNamespace string, podUID types.UID, node string) error {
return fmt.Errorf("This extender doesn't support Bind. Please make 'BindVerb' be empty in your ExtenderConfig.")
},
}

运行方式

编译镜像

这里使用 docker multi-stage build ,将 scheduler extender 编译出来,生成二进制打包到Image中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
FROM golang:1.13-alpine as builder
ARG VERSION=0.0.1

ENV GO111MODULE=on
ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64

# config
WORKDIR /go/src/k8s-scheduler-extender-example
COPY go.mod .
COPY go.sum .
RUN GO111MODULE=on go mod download
COPY . .
RUN go install -ldflags "-s -w -X main.version=$VERSION" ./cmd/

# runtime image
FROM gcr.io/google_containers/ubuntu-slim:0.14
COPY --from=builder /go/bin/cmd /usr/bin/k8s-scheduler-extender-example
ENTRYPOINT ["k8s-scheduler-extender-example"]

执行命令:

1
2
3
4
$ IMAGE=YOUR_ORG/YOUR_IMAGE:YOUR_TAG

$ docker build . -t "${IMAGE}"
$ docker push "${IMAGE}"

配置 kube-scheduler

Scheduler Extender 的运行方式是以独立进程的形式运行,它可以通过下面三种方式运行:

  1. 将 Extender 放到 Kubernetes 的 Node 中用 System 来管理,也就是以进程形式运行
  2. 将 Extender 通过 Deployment 等形式部署到 Kubernetes 中,也就是以一个 Pod 的形式运行
  3. 将 Extender 以 Sidecar 形式与原生 Scheduler 运行在同一个 Pod 上(如果你的 Scheduler 是以 Pod 的形式运行的话)

无论是哪种方式,都需要修改 kube-scheduler 的配置,使其可以感受到 Scheduler Extender 的存在,下面将以第三种方式为例,其他两种方式使用方法类似。kube-scheduler 有一个 --config <string> 参数,可以提供一个配置文件,实现对于 kube-scheduler 的配置 ,如下所示:

1
2
3
4
5
6
ExecStart=/opt/kube/bin/kube-scheduler \
--config=/etc/sysconfig/kube-scheduler/extender-config.yaml \
--address=127.0.0.1 \
--master=http://127.0.0.1:8080 \
--leader-elect=true \
--v=2

这里就指定了一个 Config,里面的内容为:

1
2
3
4
5
6
7
8
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
clientConnection:
kubeconfig: "/root/.kube/config"
algorithmSource:
policy:
file:
path: "/etc/sysconfig/kube-scheduler/extender-policy.json"

这里指定了一个 Algorithm Source,还需要加载另外一个配置文件,在里面指定 Scheduler Extender 的 HTTP Endpint:

1
2
3
4
5
6
7
8
9
{
"kind" : "Policy",
"apiVersion" : "v1",
"extenders" : [{
"urlPrefix": "http://localhost:8888/",
"filterVerb": "filter",
"enableHttps": false
}]
}

这里只指定了一个 Filter 的操作,如果你想要指定 Priority 的操作的话,那么可以加上:

1
2
3
4
5
6
7
8
9
 ... ...
"extenders" : [{
"urlPrefix": "http://localhost:8888/",
"filterVerb": "filter",
"prioritizeVerb": "prioritize",
"weight": 1,
"enableHttps": false
}
... ...

集群部署

前面提到,我们将按照第三种方式部署刚刚写完的 Scheduler Extender,也就是将 kube-schedulerextender 部署在同一个 Pod 中。

为此,我们首先创建一个 Deployment,里面包含有 kube-scheduler 和 我们自己写的 extender,这里直接使用的是腾讯云 TKE 的镜像,部署在TKE的集群中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-scheduler
namespace: kube-system
labels:
app: my-scheduler
spec:
replicas: 1
selector:
matchLabels:
app: my-scheduler
template:
metadata:
labels:
app: my-scheduler
spec:
serviceAccountName: my-scheduler
volumes:
- name: my-scheduler-config
configMap:
name: my-scheduler-config
containers:
- name: my-scheduler-ctr
image: ccr.ccs.tencentyun.com/tkeimages/hyperkube:v1.18.4-tke.5
imagePullPolicy: IfNotPresent
args:
- kube-scheduler
- --config=/my-scheduler/config.yaml
- -v=4
volumeMounts:
- name: my-scheduler-config
mountPath: /my-scheduler
- name: my-scheduler-extender-ctr
image: unicosmos/scheduler-extender-demo:latest
imagePullPolicy: IfNotPresent
livenessProbe:
httpGet:
path: /version
port: 80
readinessProbe:
httpGet:
path: /version
port: 80
ports:
- containerPort: 80

这里可以看到 kube-scheduler 容器使用的是 hyperkube:v1.18.4 镜像,挂载了 my-scheduler-config 这个 ConfigMap 作为配置文件。而 extender 开放了 80 端口提供给 kube-scheduler 访问。

接下来继续看配置文件 my-scheduler-config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
apiVersion: v1
kind: ConfigMap
metadata:
name: my-scheduler-config
namespace: kube-system
data:
config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
policy:
configMap:
namespace: kube-system
name: my-scheduler-policy
leaderElection:
leaderElect: true
lockObjectName: my-scheduler
lockObjectNamespace: kube-system

这里指定了 my-scheduler-policy 作为 policy 配置文件,同时在 leaderElection 中将 my-scheduler 作为发挥作用的调度器。下面是 my-scheduler-policy 的具体内容,本质上是一个 JSON 文件,它可以修改 predicatespriority 的配置,这里主要修改的是 extenders的配置。

因为 extenderkube-scheduler 在同一个 Pod 里面,所以这里的 urlPrefix 填写的地址是 localhost,如果 extender 是按照 Pod 单独部署在集群中,你可以使用 extender 的 Service 。除此之外,你也看到了 filterVerb 等 EndPoint 的配置,此处不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
apiVersion: v1
kind: ConfigMap
metadata:
name: my-scheduler-policy
namespace: kube-system
data:
policy.cfg : |
{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "PodFitsHostPorts"},
{"name" : "PodFitsResources"},
{"name" : "NoDiskConflict"},
{"name" : "MatchNodeSelector"},
{"name" : "HostName"}
],
"priorities" : [
{"name" : "LeastRequestedPriority", "weight" : 1},
{"name" : "BalancedResourceAllocation", "weight" : 1},
{"name" : "ServiceSpreadingPriority", "weight" : 1},
{"name" : "EqualPriority", "weight" : 1}
],
"extenders" : [{
"urlPrefix": "http://localhost/scheduler",
"filterVerb": "predicates/always_true",
"prioritizeVerb": "priorities/zero_score",
"preemptVerb": "preemption",
"bindVerb": "",
"weight": 1,
"enableHttps": false,
"nodeCacheCapable": false
}],
"hardPodAffinitySymmetricWeight" : 10
}

最后,为了让 my-scheduler 在集群中运行,需要设定其 rbac 权限,这里直接给予了 cluster-admin 的权限:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
apiVersion: v1
kind: ServiceAccount
metadata:
name: my-scheduler
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: my-scheduler-cluster-admin
namespace: kube-system
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
namespace: kube-system
name: my-scheduler

所有的这些都在 extender.yaml 可以找到,你可以将 extender 的 image 换成自己的 image,然后部署到集群。

启动之后,可以查看 extender 的日志如下:

1
2
3
4
5
6
7
$ kubectl -n kube-system logs deploy/my-scheduler -c my-scheduler-extender-ctr -f
[ warn ] 2020/12/23 03:31:04 main.go:92: LOG_LEVEL="" is empty or invalid, fallling back to "INFO".
[ info ] 2020/12/23 03:31:04 main.go:106: Log level was set to INFO
[ info ] 2020/12/23 03:31:04 main.go:124: server starting on the port :80
[ info ] 2020/12/23 03:31:51 routes.go:41: always_true ExtenderArgs =
[ info ] 2020/12/23 03:31:51 routes.go:59: always_true extenderFilterResult = {"Nodes":{"metadata":{},"items":[{"metadata":{"name":"10.0.1.4","selfLink":"/api/v1/nodes/10.0.1.4","uid":"37397636-06a7-49c9-8105-8790517de04d","resourceVersion":"274946975","creationTimestamp":"2020-12-23T02:37:42Z","labels":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/instance-type":"S3.LARGE8","beta.kubernetes.io/os":"linux","cloud.tencent.com/node-instance-id":"ins-5a843lvl"
...

测试验证

下面对部署的 my-scheduler 进行验证,通过 test-pod 指定 schedulerName如下:

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: Pod
metadata:
name: test-pod
spec:
schedulerName: my-scheduler
containers:
- name: nginx
image: nginx
ports:
- containerPort: 80

部署 test-pod 如下:

1
2
3
4
5
6
7
8
9
10
11
12
$ kubectl create -f test-pod.yaml

$ kubectl describe pod test-pod
Name: test-pod
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled <unknown> my-scheduler Successfully assigned default/test-pod to 10.0.1.4
Normal Pulling 7m34s kubelet, 10.0.1.4 Pulling image "nginx"
Normal Pulled 7m27s kubelet, 10.0.1.4 Successfully pulled image "nginx"
Normal Created 7m27s kubelet, 10.0.1.4 Created container nginx
Normal Started 7m27s kubelet, 10.0.1.4 Started container nginx

可以看到 my-scheduler 部署成功,并且正常发挥作用。

参考资料