Scheduler Extender
是 k8s 调度器扩展的一种方式,它作为外部服务,支持 Filter、Preempt、Prioritize 和 Bind 的扩展。当 kube-scheduler
运行到相应阶段时,通过调用 Extender 注册的 webhook 来运行扩展的逻辑,影响调度流程中各阶段的决策结果。本文将介绍一个 Scheduler Extender
的实现方式,文中的所有代码可以在我的 Github 中找到。在开始了解 Scheduler Extender 之前,如果你还不了解 Kubernetes 的 Scheduler Framework 原理,可以先参考 这篇文章 。
调度扩展 Scheduler Extender
是一个独立运行的进程,其本质是一个 WebServer
,遵守 kube-scheduler
约定的规则,以 HTTP Endpoint 的形式提供服务。为了让原生的 kube-scheduler
感知到 Scheduler Extender
,不需要修改原生 Scheduler 的代码,只需要在运行原生 Scheduler 的时候加一个配置即可。如下所示:
1 2 3 4 5 6 7 "extenders" : [{ "urlPrefix": "http://localhost:8888/", "filterVerb": "filter", "prioritizeVerb": "prioritize", "weight": 1, "enableHttps": false }
在这个配置中,会指定 Scheduler Extender
的服务地址,以及选择开启哪些接口。
以 Filter 阶段举例,执行过程会经过 2 个阶段:
scheduler 会先执行内置的 Filter 策略,如果执行失败的话,会直接标识 Pod 调度失败。
如果内置的 Filter 策略执行成功的话,scheduler 通过 Http 调用 Extender 注册的 webhook, 将调度所需要的 Pod 和 Node 的信息发送到到 Extender,根据返回 filter 结果,作为最终结果。
对于 Filter 和 Priority 这两个操作,HTTP 请求的 Body 是这么一个结构体:
1 2 3 4 5 6 7 8 type ExtenderArgs struct { Pod api.Pod `json:"pod"` Nodes api.NodeList `json:"nodes"` }
你只需要在 HTTP 处理代码中反序列化成这个结构体,然后执行自己的逻辑,并且返回对应的结果就可以了,其中 Filter 返回的结构体为 ExtenderFilterResult
:
1 2 3 4 5 6 7 8 9 10 11 12 13 type ExtenderFilterResult struct { Nodes *v1.NodeList NodeNames *[]string FailedNodes FailedNodesMap Error string }
Priority 返回的结构体为 HostPriorityList
:
1 2 3 4 5 6 7 8 9 10 type HostPriority struct { Host string Score int64 } type HostPriorityList []HostPriority
对 Binding 操作,可以通过 Scheduler Extender
来实现,此时kube-scheduler
请求的是另外一个结构体 ExtenderBindingArgs
:
1 2 3 4 5 6 7 8 9 10 11 type ExtenderBindingArgs struct { PodName string PodNamespace string PodUID types.UID Node string }
Binding 不需要返回特别的结果,只需要返回是否存在错误即可。
实现示例 Route 定义 Extender 提供的 HTTP EndPoint
如下:
1 2 3 4 5 6 7 8 const ( versionPath = "/version" apiPrefix = "/scheduler" bindPath = apiPrefix + "/bind" preemptionPath = apiPrefix + "/preemption" predicatesPrefix = apiPrefix + "/predicates" prioritiesPrefix = apiPrefix + "/priorities" )
Extender 收到来自于 kube-scheduler
的请求后,解析 HTTP Request,获取其请求参数,以 Filter
为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func PredicateRoute (predicate scheduler.Predicate) httprouter .Handle { return func (w http.ResponseWriter, r *http.Request, _ httprouter.Params) { checkBody(w, r) var buf bytes.Buffer body := io.TeeReader(r.Body, &buf) log.Print("info: " , predicate.Name, " ExtenderArgs = " , buf.String()) var extenderArgs extender.ExtenderArgs var extenderFilterResult *extender.ExtenderFilterResult if err := json.NewDecoder(body).Decode(&extenderArgs); err != nil { extenderFilterResult = &extender.ExtenderFilterResult{ Nodes: nil , FailedNodes: nil , Error: err.Error(), } } else { extenderFilterResult = predicate.Handler(extenderArgs) } if resultBody, err := json.Marshal(extenderFilterResult); err != nil { panic (err) } else { log.Print("info: " , predicate.Name, " extenderFilterResult = " , string (resultBody)) w.Header().Set("Content-Type" , "application/json" ) w.WriteHeader(http.StatusOK) w.Write(resultBody) } } }
这里做的主要事情是,从 http.Request
解析出参数 ExtenderArgs
,然后调用 predicate.Handler(extenderArgs)
,最后将返回结果序列化封装在 ResponseBody
中。其他像 Priority
和 Bind
操作类似。
那么这个 Handler 是在哪里注册的呢?假设我们的 predicate.Name
叫做 always_true
:
1 2 3 4 func AddPredicate (router *httprouter.Router, predicate scheduler.Predicate) { path := predicatesPrefix + "/" + predicate.Name router.POST(path, PredicateRoute(predicate)) }
这里将 Predicate
注册在 /scheduler/predicates/always_true
路径下,当然你也可以直接使用 /scheduler/predicate
,这个都是可以自定义的,只需要将调度器配置里面的路径修改下就好:
1 2 3 4 5 "extenders" : [{ "urlPrefix": "http://localhost:8888/scheduler" , "filterVerb": "predicates/always_true" , ... }
Filter 对于 Filter,其实现的 Handler 为 Handler(args extender.ExtenderArgs) *extender.ExtenderFilterResult
,输入参数是 ExtenderArgs
,输出结果是 ExtenderFilterResult
。我们需要实现的是 func(pod v1.Pod, node v1.Node) (bool, error)
,用来判断这个 Pod 是否能够被调度到这个 Node 上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 type Predicate struct { Name string Func func (pod v1.Pod, node v1.Node) (bool , error) } func (p Predicate) Handler (args extender.ExtenderArgs) *extender .ExtenderFilterResult { pod := args.Pod canSchedule := make ([]v1.Node, 0 , len (args.Nodes.Items)) canNotSchedule := make (map [string ]string ) for _, node := range args.Nodes.Items { result, err := p.Func(*pod, node) if err != nil { canNotSchedule[node.Name] = err.Error() } else { if result { canSchedule = append (canSchedule, node) } } } result := extender.ExtenderFilterResult{ Nodes: &v1.NodeList{ Items: canSchedule, }, FailedNodes: canNotSchedule, Error: "" , } return &result }
这里有一个最简单的 Filter
实现,对于所有的 Node,都通过 Filter。
1 2 3 4 5 6 TruePredicate = scheduler.Predicate{ Name: "always_true" , Func: func (pod v1.Pod, node v1.Node) (bool , error) { return true , nil }, }
Priority Priority
操作负责根据输入的 Pod 情况,对输入的 []v1.Node
这个 List
里面的每个 Node 打分,输出的是一个打分了的 Node 列表,也即是 HostPriorityList
结构。
1 2 3 4 5 6 7 8 type Prioritize struct { Name string Func func (pod v1.Pod, nodes []v1.Node) (*extender.HostPriorityList, error) } func (p Prioritize) Handler (args extender.ExtenderArgs) (*extender.HostPriorityList, error) { return p.Func(*args.Pod, args.Nodes.Items) }
这里有一个简单的 Priority
,对于所有的 Node 打分都是 0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ZeroPriority = scheduler.Prioritize{ Name: "zero_score" , Func: func (_ v1.Pod, nodes []v1.Node) (*extender.HostPriorityList, error) { var priorityList extender.HostPriorityList priorityList = make ([]extender.HostPriority, len (nodes)) for i, node := range nodes { priorityList[i] = extender.HostPriority{ Host: node.Name, Score: 0 , } } return &priorityList, nil }, }
Bind Bind
操作将 podNamespace
下叫做 podName
,且 UID 为 podUID
的 Pod 绑定到 node
上,我们需要实现 func(podName string, podNamespace string, podUID types.UID, node string) error
的函数。
1 2 3 4 5 6 7 8 9 10 type Bind struct { Func func (podName string , podNamespace string , podUID types.UID, node string ) error } func (b Bind) Handler (args extender.ExtenderBindingArgs) *extender .ExtenderBindingResult { err := b.Func(args.PodName, args.PodNamespace, args.PodUID, args.Node) return &extender.ExtenderBindingResult{ Error: err.Error(), } }
这里有一个简单的 Bind
, 不执行实际的 Bind 操作。
1 2 3 4 5 NoBind = scheduler.Bind{ Func: func (podName string , podNamespace string , podUID types.UID, node string ) error { return fmt.Errorf("This extender doesn't support Bind. Please make 'BindVerb' be empty in your ExtenderConfig." ) }, }
运行方式 编译镜像 这里使用 docker multi-stage build ,将 scheduler extender
编译出来,生成二进制打包到Image中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 FROM golang:1.13 -alpine as builderARG VERSION=0.0 .1 ENV GO111MODULE=onENV CGO_ENABLED=0 ENV GOOS=linuxENV GOARCH=amd64WORKDIR /go/src/k8s-scheduler-extender-example COPY go.mod . COPY go.sum . RUN GO111MODULE=on go mod download COPY . . RUN go install -ldflags "-s -w -X main.version=$VERSION " ./cmd/ FROM gcr.io/google_containers/ubuntu-slim:0.14 COPY --from=builder /go/bin/cmd /usr/bin/k8s-scheduler-extender-example ENTRYPOINT ["k8s-scheduler-extender-example" ]
执行命令:
1 2 3 4 $ IMAGE=YOUR_ORG/YOUR_IMAGE:YOUR_TAG $ docker build . -t "${IMAGE} " $ docker push "${IMAGE} "
配置 kube-scheduler Scheduler Extender 的运行方式是以独立进程的形式运行,它可以通过下面三种方式运行:
将 Extender 放到 Kubernetes 的 Node 中用 System 来管理,也就是以进程形式运行
将 Extender 通过 Deployment
等形式部署到 Kubernetes 中,也就是以一个 Pod 的形式运行
将 Extender 以 Sidecar 形式与原生 Scheduler 运行在同一个 Pod 上(如果你的 Scheduler 是以 Pod 的形式运行的话)
无论是哪种方式,都需要修改 kube-scheduler
的配置,使其可以感受到 Scheduler Extender
的存在,下面将以第三种方式为例,其他两种方式使用方法类似。kube-scheduler
有一个 --config <string>
参数,可以提供一个配置文件,实现对于 kube-scheduler
的配置 ,如下所示:
1 2 3 4 5 6 ExecStart=/opt/kube/bin/kube-scheduler \ --config=/etc/sysconfig/kube-scheduler/extender-config.yaml \ --address=127.0.0.1 \ --master=http://127.0.0.1:8080 \ --leader-elect=true \ --v=2
这里就指定了一个 Config,里面的内容为:
1 2 3 4 5 6 7 8 apiVersion: kubescheduler.config.k8s.io/v1alpha1 kind: KubeSchedulerConfiguration clientConnection: kubeconfig: "/root/.kube/config" algorithmSource: policy: file: path: "/etc/sysconfig/kube-scheduler/extender-policy.json"
这里指定了一个 Algorithm Source,还需要加载另外一个配置文件,在里面指定 Scheduler Extender 的 HTTP Endpint:
1 2 3 4 5 6 7 8 9 { "kind" : "Policy" , "apiVersion" : "v1" , "extenders" : [{ "urlPrefix" : "http://localhost:8888/" , "filterVerb" : "filter" , "enableHttps" : false }] }
这里只指定了一个 Filter 的操作,如果你想要指定 Priority 的操作的话,那么可以加上:
1 2 3 4 5 6 7 8 9 ... ... "extenders" : [{ "urlPrefix": "http://localhost:8888/", "filterVerb": "filter", "prioritizeVerb": "prioritize", "weight": 1, "enableHttps": false } ... ...
集群部署 前面提到,我们将按照第三种方式部署刚刚写完的 Scheduler Extender
,也就是将 kube-scheduler
和 extender
部署在同一个 Pod 中。
为此,我们首先创建一个 Deployment
,里面包含有 kube-scheduler
和 我们自己写的 extender
,这里直接使用的是腾讯云 TKE 的镜像,部署在TKE的集群中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 apiVersion: apps/v1 kind: Deployment metadata: name: my-scheduler namespace: kube-system labels: app: my-scheduler spec: replicas: 1 selector: matchLabels: app: my-scheduler template: metadata: labels: app: my-scheduler spec: serviceAccountName: my-scheduler volumes: - name: my-scheduler-config configMap: name: my-scheduler-config containers: - name: my-scheduler-ctr image: ccr.ccs.tencentyun.com/tkeimages/hyperkube:v1.18.4-tke.5 imagePullPolicy: IfNotPresent args: - kube-scheduler - --config=/my-scheduler/config.yaml - -v=4 volumeMounts: - name: my-scheduler-config mountPath: /my-scheduler - name: my-scheduler-extender-ctr image: unicosmos/scheduler-extender-demo:latest imagePullPolicy: IfNotPresent livenessProbe: httpGet: path: /version port: 80 readinessProbe: httpGet: path: /version port: 80 ports: - containerPort: 80
这里可以看到 kube-scheduler
容器使用的是 hyperkube:v1.18.4
镜像,挂载了 my-scheduler-config
这个 ConfigMap
作为配置文件。而 extender
开放了 80 端口提供给 kube-scheduler
访问。
接下来继续看配置文件 my-scheduler-config
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 apiVersion: v1 kind: ConfigMap metadata: name: my-scheduler-config namespace: kube-system data: config.yaml: | apiVersion: kubescheduler.config.k8s.io/v1alpha1 kind: KubeSchedulerConfiguration schedulerName: my-scheduler algorithmSource: policy: configMap: namespace: kube-system name: my-scheduler-policy leaderElection: leaderElect: true lockObjectName: my-scheduler lockObjectNamespace: kube-system
这里指定了 my-scheduler-policy
作为 policy
配置文件,同时在 leaderElection
中将 my-scheduler
作为发挥作用的调度器。下面是 my-scheduler-policy
的具体内容,本质上是一个 JSON
文件,它可以修改 predicates
和 priority
的配置,这里主要修改的是 extenders
的配置。
因为 extender
和 kube-scheduler
在同一个 Pod 里面,所以这里的 urlPrefix
填写的地址是 localhost
,如果 extender
是按照 Pod 单独部署在集群中,你可以使用 extender
的 Service 。除此之外,你也看到了 filterVerb
等 EndPoint 的配置,此处不再赘述。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 apiVersion: v1 kind: ConfigMap metadata: name: my-scheduler-policy namespace: kube-system data: policy.cfg : | { "kind" : "Policy" , "apiVersion" : "v1" , "predicates" : [ {"name" : "PodFitsHostPorts" }, {"name" : "PodFitsResources" }, {"name" : "NoDiskConflict" }, {"name" : "MatchNodeSelector" }, {"name" : "HostName" } ], "priorities" : [ {"name" : "LeastRequestedPriority" , "weight" : 1 }, {"name" : "BalancedResourceAllocation" , "weight" : 1 }, {"name" : "ServiceSpreadingPriority" , "weight" : 1 }, {"name" : "EqualPriority" , "weight" : 1 } ], "extenders" : [{ "urlPrefix": "http://localhost/scheduler" , "filterVerb": "predicates/always_true" , "prioritizeVerb": "priorities/zero_score" , "preemptVerb": "preemption" , "bindVerb": "" , "weight": 1 , "enableHttps": false , "nodeCacheCapable": false }], "hardPodAffinitySymmetricWeight" : 10 }
最后,为了让 my-scheduler
在集群中运行,需要设定其 rbac
权限,这里直接给予了 cluster-admin
的权限:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 apiVersion: v1 kind: ServiceAccount metadata: name: my-scheduler namespace: kube-system --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: my-scheduler-cluster-admin namespace: kube-system roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: cluster-admin subjects: - kind: ServiceAccount namespace: kube-system name: my-scheduler
所有的这些都在 extender.yaml 可以找到,你可以将 extender 的 image 换成自己的 image,然后部署到集群。
启动之后,可以查看 extender
的日志如下:
1 2 3 4 5 6 7 $ kubectl -n kube-system logs deploy/my-scheduler -c my-scheduler-extender-ctr -f [ warn ] 2020/12/23 03:31:04 main.go:92: LOG_LEVEL="" is empty or invalid, fallling back to "INFO" . [ info ] 2020/12/23 03:31:04 main.go:106: Log level was set to INFO [ info ] 2020/12/23 03:31:04 main.go:124: server starting on the port :80 [ info ] 2020/12/23 03:31:51 routes.go:41: always_true ExtenderArgs = [ info ] 2020/12/23 03:31:51 routes.go:59: always_true extenderFilterResult = {"Nodes" :{"metadata" :{},"items" :[{"metadata" :{"name" :"10.0.1.4" ,"selfLink" :"/api/v1/nodes/10.0.1.4" ,"uid" :"37397636-06a7-49c9-8105-8790517de04d" ,"resourceVersion" :"274946975" ,"creationTimestamp" :"2020-12-23T02:37:42Z" ,"labels" :{"beta.kubernetes.io/arch" :"amd64" ,"beta.kubernetes.io/instance-type" :"S3.LARGE8" ,"beta.kubernetes.io/os" :"linux" ,"cloud.tencent.com/node-instance-id" :"ins-5a843lvl" ...
测试验证 下面对部署的 my-scheduler
进行验证,通过 test-pod
指定 schedulerName
如下:
1 2 3 4 5 6 7 8 9 10 11 apiVersion: v1 kind: Pod metadata: name: test-pod spec: schedulerName: my-scheduler containers: - name: nginx image: nginx ports: - containerPort: 80
部署 test-pod
如下:
1 2 3 4 5 6 7 8 9 10 11 12 $ kubectl create -f test -pod.yaml $ kubectl describe pod test -pod Name: test -pod Events: Type Reason Age From Message ---- ------ ---- ---- ------- Normal Scheduled <unknown> my-scheduler Successfully assigned default/test -pod to 10.0.1.4 Normal Pulling 7m34s kubelet, 10.0.1.4 Pulling image "nginx" Normal Pulled 7m27s kubelet, 10.0.1.4 Successfully pulled image "nginx" Normal Created 7m27s kubelet, 10.0.1.4 Created container nginx Normal Started 7m27s kubelet, 10.0.1.4 Started container nginx
可以看到 my-scheduler
部署成功,并且正常发挥作用。
参考资料