【Kubernetes】Device Plugin

Kubernetes natively discovers CPU and memory resources, but there are many other devices the kubelet cannot handle out of the box: GPUs, FPGAs, RDMA NICs, storage devices, and similar heterogeneous compute hardware. Using such devices requires vendor-specific initialization and setup, and following Kubernetes' out-of-tree philosophy, vendor device-setup code should not live alongside the Kubernetes core. Instead, we need a mechanism that lets each device vendor report its device resources to the kubelet without modifying Kubernetes core code. This is where the Device Plugin mechanism comes from. This article explains how Device Plugins are implemented and how they are used.

How Device Plugins Work

A device plugin is effectively a gRPC server. It is usually deployed as a DaemonSet, with /var/lib/kubelet/device-plugins mounted into the container as a volume. It can also be run by hand, but then there is no automatic recovery on failure.

Two steps are generally needed to use a particular vendor's device:

  • kubectl create -f http://vendor.com/device-plugin-daemonset.yaml
  • After that, kubectl describe nodes shows the device in the node status as vendor-domain/vendor-device

Once the device plugin has registered with the kubelet, the kubelet interacts with it over two RPCs (a minimal server sketch follows the list):

  • ListAndWatch(): lets the kubelet discover the devices and their properties, and pushes notifications whenever the set of devices changes
  • Allocate(): called by the kubelet before creating a container, to claim the requested devices
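
To make the shape of these RPCs concrete, here is a minimal sketch of a device plugin server in Go, written against the v1beta1 API shown later in this article. The resource name, socket name, and the single fake device are illustrative assumptions, not taken from any real plugin; depending on the client library version, an additional GetPreferredAllocation method may also be required.

package main

import (
    "context"
    "net"

    "google.golang.org/grpc"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// stubPlugin advertises one fake device and performs no real device setup.
type stubPlugin struct {
    devices []*pluginapi.Device
}

func (p *stubPlugin) GetDevicePluginOptions(ctx context.Context, _ *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
    return &pluginapi.DevicePluginOptions{}, nil
}

// ListAndWatch sends the device list once and then holds the stream open;
// a real plugin would re-send the list whenever device health changes.
func (p *stubPlugin) ListAndWatch(_ *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil {
        return err
    }
    select {} // block; the kubelet keeps this stream open
}

// Allocate acknowledges each container request; a real plugin would fill in
// envs, mounts, and device specs here.
func (p *stubPlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    resp := &pluginapi.AllocateResponse{}
    for range reqs.ContainerRequests {
        resp.ContainerResponses = append(resp.ContainerResponses, &pluginapi.ContainerAllocateResponse{})
    }
    return resp, nil
}

func (p *stubPlugin) PreStartContainer(ctx context.Context, _ *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
    return &pluginapi.PreStartContainerResponse{}, nil
}

func main() {
    // Assumed socket name; it must live under /var/lib/kubelet/device-plugins/.
    lis, err := net.Listen("unix", pluginapi.DevicePluginPath+"example.sock")
    if err != nil {
        panic(err)
    }
    srv := grpc.NewServer()
    pluginapi.RegisterDevicePluginServer(srv, &stubPlugin{
        devices: []*pluginapi.Device{{ID: "dev-0", Health: pluginapi.Healthy}},
    })
    srv.Serve(lis) // registration over kubelet.sock would follow (see below)
}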

Process

Registration

To make its existence known, a device plugin must send a registration request to the kubelet; only after that does the kubelet interact with it over gRPC. The sequence is:

  • The device plugin sends a RegisterRequest to the kubelet
  • The kubelet replies with a RegisterResponse; if the kubelet hits any error, the error is attached to the response
  • If the device plugin receives no error, it starts its gRPC server

After startup, the plugin should keep monitoring the kubelet and re-register itself whenever the kubelet restarts. For example, the kubelet empties /var/lib/kubelet/device-plugins/ right after it starts, so a plugin author can watch for the deletion of the plugin's own Unix socket and use that event as the signal to re-register (see the sketch below).
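
One way to implement that watch, sketched here with the fsnotify library; the reRegister hook is a hypothetical callback, and error handling is minimal:

import (
    "log"
    "path/filepath"

    "github.com/fsnotify/fsnotify"
)

// watchForKubeletRestart blocks until the given socket file is removed
// (the kubelet wipes /var/lib/kubelet/device-plugins/ on startup), then
// invokes reRegister. A sketch: shutdown handling is elided.
func watchForKubeletRestart(socketPath string, reRegister func() error) error {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        return err
    }
    defer watcher.Close()

    // Watch the directory rather than the file: inotify watches on a
    // deleted file are silently dropped, while the directory survives.
    if err := watcher.Add(filepath.Dir(socketPath)); err != nil {
        return err
    }
    for {
        select {
        case ev := <-watcher.Events:
            if ev.Name == socketPath && ev.Op&fsnotify.Remove != 0 {
                log.Printf("%s removed; kubelet probably restarted, re-registering", socketPath)
                return reRegister()
            }
        case err := <-watcher.Errors:
            return err
        }
    }
}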

Unix Socket

The device plugin and the kubelet talk gRPC over a Unix socket. When it starts its gRPC server, the device plugin creates a Unix socket under the host path /var/lib/kubelet/device-plugins/, e.g. /var/lib/kubelet/device-plugins/nvidiaGPU.sock.

Things to note when implementing a device plugin:

  • On startup, the plugin registers with the kubelet through /var/lib/kubelet/device-plugins/kubelet.sock, providing its own Unix socket name, the API version, and the resource name (of the form vendor-domain/resource, e.g. nvidia.com/gpu). The kubelet then exposes these devices in the node status for the scheduler to use
  • After registration, the plugin reports its device list to the kubelet, allocates devices on demand, and keeps monitoring device health

Protocol Overview

API specification

// Registration is the service advertised by the Kubelet
// Only when Kubelet answers with a success code to a Register Request
// may Device Plugins start their service
// Registration may fail when device plugin version is not supported by
// Kubelet or the registered resourceName is already taken by another
// active device plugin. Device plugin is expected to terminate upon registration failure
service Registration {
    rpc Register(RegisterRequest) returns (Empty) {}
}

message DevicePluginOptions {
    // Indicates if PreStartContainer call is required before each container start
    bool pre_start_required = 1;
}

message RegisterRequest {
    // Version of the API the Device Plugin was built against
    string version = 1;
    // Name of the unix socket the device plugin is listening on
    // PATH = path.Join(DevicePluginPath, endpoint)
    string endpoint = 2;
    // Schedulable resource name. As of now it's expected to be a DNS Label
    string resource_name = 3;
    // Options to be communicated with Device Manager
    DevicePluginOptions options = 4;
}

message Empty {
}

// DevicePlugin is the service advertised by Device Plugins
service DevicePlugin {
    // GetDevicePluginOptions returns options to be communicated with Device
    // Manager
    rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}

    // ListAndWatch returns a stream of List of Devices
    // Whenever a Device state changes or a Device disappears, ListAndWatch
    // returns the new list
    rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}

    // Allocate is called during container creation so that the Device
    // Plugin can run device specific operations and instruct Kubelet
    // of the steps to make the Device available in the container
    rpc Allocate(AllocateRequest) returns (AllocateResponse) {}

    // PreStartContainer is called, if indicated by Device Plugin during registration phase,
    // before each container start. Device plugin can run device specific operations
    // such as resetting the device before making devices available to the container
    rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state changes or a Device disappears, ListAndWatch
// returns the new list
message ListAndWatchResponse {
    repeated Device devices = 1;
}

/* E.g:
 * struct Device {
 *     ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
 *     State: "Healthy",
 * } */
message Device {
    // A unique ID assigned by the device plugin used
    // to identify devices during the communication
    // Max length of this field is 63 characters
    string ID = 1;
    // Health of the device, can be healthy or unhealthy, see constants.go
    string health = 2;
}

// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
//   the Devices requested
message PreStartContainerRequest {
    repeated string devicesIDs = 1;
}

// PreStartContainerResponse will be sent by plugin in response to PreStartContainerRequest
message PreStartContainerResponse {
}

// - Allocate is expected to be called during pod creation since allocation
//   failures for any container would result in pod startup failure.
// - Allocate allows kubelet to expose additional artifacts in a pod's
//   environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
//   the Devices requested
message AllocateRequest {
    repeated ContainerAllocateRequest container_requests = 1;
}

message ContainerAllocateRequest {
    repeated string devicesIDs = 1;
}

// AllocateResponse includes the artifacts that need to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2,
// and allocation on dev1 succeeds but allocation on dev2 fails,
// the Device plugin should send a ListAndWatch update and fail the
// Allocation request
message AllocateResponse {
    repeated ContainerAllocateResponse container_responses = 1;
}

message ContainerAllocateResponse {
    // List of environment variables to be set in the container to access one or more devices.
    map<string, string> envs = 1;
    // Mounts for the container.
    repeated Mount mounts = 2;
    // Devices for the container.
    repeated DeviceSpec devices = 3;
    // Container annotations to pass to the container runtime
    map<string, string> annotations = 4;
}

// Mount specifies a host volume to mount into a container,
// where device library or tools are installed on host and container
message Mount {
    // Path of the mount within the container.
    string container_path = 1;
    // Path of the mount on the host.
    string host_path = 2;
    // If set, the mount is read-only.
    bool read_only = 3;
}

// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
    // Path of the device within the container.
    string container_path = 1;
    // Path of the device on the host.
    string host_path = 2;
    // Cgroups permissions of the device, candidates are one or more of
    // * r - allows container to read from the specified device.
    // * w - allows container to write to the specified device.
    // * m - allows container to create device files that do not yet exist.
    string permissions = 3;
}

Plugin lifecycle management

On startup, the plugin registers with the kubelet over gRPC through /var/lib/kubelet/device-plugins/kubelet.sock, passing the Unix socket it listens on, the API version, and the resource name (e.g. nvidia.com/gpu). The kubelet exposes these devices in the node status and reports them to the API server as Extended Resources, which the scheduler then uses for placement.

After startup, the kubelet keeps a long-lived ListAndWatch connection to the plugin, and the plugin proactively notifies the kubelet when it detects an unhealthy device. If that device is idle, the kubelet removes it from the allocatable list; if the device is already in use by a pod, the kubelet kills that pod.

The plugin can also use the kubelet's socket to keep checking the kubelet's state; if the kubelet restarts, the plugin restarts accordingly and re-registers itself with the kubelet.

NVIDIA Device Plugin

NVIDIA provides a GPU device plugin built on the Device Plugin interface: NVIDIA/k8s-device-plugin.

Deployment

kubectl apply -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/master/nvidia-device-plugin.yml

Requesting GPU resources when creating a Pod

apiVersion: v1
kind: Pod
metadata:
  name: pod1
spec:
  restartPolicy: OnFailure
  containers:
    - image: nvidia/cuda
      name: pod1-ctr
      command: ["sleep"]
      args: ["100000"]
      resources:
        limits:
          nvidia.com/gpu: 1

Note: using this plugin requires nvidia-docker 2.0 with nvidia configured as the default runtime (i.e. the docker daemon option --default-runtime=nvidia); see the nvidia-docker documentation for per-distribution installation instructions.

The end-to-end flow of scheduling a GPU in Kubernetes is:

  • The GPU device plugin is deployed on GPU nodes and reports the node's GPU information and the corresponding device IDs through the ListAndWatch interface.
  • When a Pod declaring nvidia.com/gpu is created, the scheduler takes GPU availability into account and places the Pod on a node with enough free GPU devices.
  • When the kubelet on that node starts the Pod, it calls each device plugin's Allocate interface according to the declared requests. Because the container declared GPUs, the kubelet picks suitable devices from the device information previously received via ListAndWatch and calls the GPU device plugin's Allocate interface with the device IDs as parameters.
  • The GPU device plugin receives the call, converts the device IDs into the NVIDIA_VISIBLE_DEVICES environment variable, and returns it to the kubelet.
  • The kubelet injects the environment variable into the Pod and starts the container.
  • When the container starts, gpu-container-runtime invokes gpu-containers-runtime-hook.
  • gpu-containers-runtime-hook converts the container's NVIDIA_VISIBLE_DEVICES environment variable into a --devices argument and calls nvidia-container-cli prestart.
  • nvidia-container-cli maps the GPU devices listed in --devices into the container, along with the host's NVIDIA driver library .so files, through which the container can then call the host's NVIDIA driver.

The API specification above defines, via Protobuf, the services a device plugin must provide. The kubelet uses DevicePluginClient to call those services; DevicePluginClient is code generated from the Protobuf definition.

k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.pb.go
type DevicePluginClient interface {
    GetDevicePluginOptions(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*DevicePluginOptions, error)
    ListAndWatch(ctx context.Context, in *Empty, opts ...grpc.CallOption) (DevicePlugin_ListAndWatchClient, error)
    Allocate(ctx context.Context, in *AllocateRequest, opts ...grpc.CallOption) (*AllocateResponse, error)
    PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption) (*PreStartContainerResponse, error)
}

In NVIDIA/k8s-device-plugin we can see the concrete implementations of these services:

func (m *NvidiaDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error)
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error)
func (m *NvidiaDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error)

In NVIDIA/k8s-device-plugin, the key data structure is NvidiaDevicePlugin, which implements the API defined by the Device Plugin framework:

type NvidiaDevicePlugin struct {
    ResourceManager
    resourceName     string
    deviceListEnvvar string
    allocatePolicy   gpuallocator.Policy
    socket           string

    server        *grpc.Server
    cachedDevices []*Device
    health        chan *Device
    stop          chan interface{}
}

Below, following the device plugin's lifecycle, we analyze how each part is implemented.

Starting the NVIDIA device plugin

After NVIDIA/k8s-device-plugin starts, its logic boils down to three things:

  • Serve: start the gRPC server
  • Register: register the given resourceName with the kubelet
  • CheckHealth: run device health checks and write any unhealthy device to the unhealthy channel
func (m *NvidiaDevicePlugin) Start() error {
    m.initialize()

    err := m.Serve()
    // ...

    err = m.Register()
    // ...

    go m.CheckHealth(m.stop, m.cachedDevices, m.health)

    return nil
}

Serve

Serve listens on the Unix socket /var/lib/kubelet/device-plugins/nvidia-gpu.sock and starts the gRPC server; the rest is retry logic for startup failures.

func (m *NvidiaDevicePlugin) Serve() error {
    os.Remove(m.socket)
    sock, err := net.Listen("unix", m.socket)
    if err != nil {
        return err
    }

    pluginapi.RegisterDevicePluginServer(m.server, m)

    go func() {
        lastCrashTime := time.Now()
        restartCount := 0
        for {
            log.Printf("Starting GRPC server for '%s'", m.resourceName)
            err := m.server.Serve(sock)
            if err == nil {
                break
            }

            log.Printf("GRPC server for '%s' crashed with error: %v", m.resourceName, err)

            // restart if it has not been too often
            // i.e. if server has crashed more than 5 times and it didn't last more than one hour each time
            if restartCount > 5 {
                // quit
                log.Fatalf("GRPC server for '%s' has repeatedly crashed recently. Quitting", m.resourceName)
            }
            timeSinceLastCrash := time.Since(lastCrashTime).Seconds()
            lastCrashTime = time.Now()
            if timeSinceLastCrash > 3600 {
                // it has been one hour since the last crash.. reset the count
                // to reflect on the frequency
                restartCount = 1
            } else {
                restartCount++
            }
        }
    }()

    // Wait for server to start by launching a blocking connection
    conn, err := m.dial(m.socket, 5*time.Second)
    if err != nil {
        return err
    }
    conn.Close()

    return nil
}

Register

Register registers with the kubelet through the /var/lib/kubelet/device-plugins/kubelet.sock Unix socket, passing the device plugin's Unix socket endpoint, the resource name, the API version, and so on.

func (m *NvidiaDevicePlugin) Register() error {
    conn, err := m.dial(pluginapi.KubeletSocket, 5*time.Second)
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)
    reqt := &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,
        Endpoint:     path.Base(m.socket),
        ResourceName: m.resourceName,
        Options: &pluginapi.DevicePluginOptions{
            GetPreferredAllocationAvailable: (m.allocatePolicy != nil),
        },
    }

    _, err = client.Register(context.Background(), reqt)
    if err != nil {
        return err
    }
    return nil
}

CheckHealth

CheckHealth calls nvml.NewEventSet to subscribe to GPU state-change events and pushes any unhealthy device onto the m.health channel.

func checkHealth(stop <-chan interface{}, devices []*Device, unhealthy chan<- *Device) {
    disableHealthChecks := strings.ToLower(os.Getenv(envDisableHealthChecks))
    if disableHealthChecks == "all" {
        disableHealthChecks = allHealthChecks
    }
    if strings.Contains(disableHealthChecks, "xids") {
        return
    }

    eventSet := nvml.NewEventSet()
    defer nvml.DeleteEventSet(eventSet)

    for _, d := range devices {
        gpu, _, _, err := nvml.ParseMigDeviceUUID(d.ID)
        if err != nil {
            gpu = d.ID
        }

        err = nvml.RegisterEventForDevice(eventSet, nvml.XidCriticalError, gpu)
        if err != nil && strings.HasSuffix(err.Error(), "Not Supported") {
            log.Printf("Warning: %s is too old to support healthchecking: %s. Marking it unhealthy.", d.ID, err)
            unhealthy <- d
            continue
        }
        check(err)
    }

    for {
        select {
        case <-stop:
            return
        default:
        }

        e, err := nvml.WaitForEvent(eventSet, 5000)
        if err != nil && e.Etype != nvml.XidCriticalError {
            continue
        }

        // FIXME: formalize the full list and document it.
        // http://docs.nvidia.com/deploy/xid-errors/index.html#topic_4
        // Application errors: the GPU should still be healthy
        if e.Edata == 31 || e.Edata == 43 || e.Edata == 45 {
            continue
        }

        if e.UUID == nil || len(*e.UUID) == 0 {
            // All devices are unhealthy
            log.Printf("XidCriticalError: Xid=%d, All devices will go unhealthy.", e.Edata)
            for _, d := range devices {
                unhealthy <- d
            }
            continue
        }

        for _, d := range devices {
            // Please see https://github.com/NVIDIA/gpu-monitoring-tools/blob/148415f505c96052cb3b7fdf443b34ac853139ec/bindings/go/nvml/nvml.h#L1424
            // for the rationale why gi and ci can be set as such when the UUID is a full GPU UUID and not a MIG device UUID.
            gpu, gi, ci, err := nvml.ParseMigDeviceUUID(d.ID)
            if err != nil {
                gpu = d.ID
                gi = 0xFFFFFFFF
                ci = 0xFFFFFFFF
            }

            if gpu == *e.UUID && gi == *e.GpuInstanceId && ci == *e.ComputeInstanceId {
                log.Printf("XidCriticalError: Xid=%d on Device=%s, the device will go unhealthy.", e.Edata, d.ID)
                unhealthy <- d
            }
        }
    }
}

Kubelet DeviceManager

Starting the DeviceManager

kubernetes/pkg/kubelet/cm/devicemanager/manager.go
type ManagerImpl struct {
    socketname string
    socketdir  string

    endpoints map[string]endpointInfo // Key is ResourceName
    mutex     sync.Mutex

    server *grpc.Server
    wg     sync.WaitGroup

    // activePods is a method for listing active pods on the node
    // so the amount of pluginResources requested by existing pods
    // could be counted when updating allocated devices
    activePods ActivePodsFunc

    // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
    // We use it to determine when we can purge inactive pods from checkpointed state.
    sourcesReady config.SourcesReady

    // callback is used for updating devices' states in one time call.
    // e.g. a new device is advertised, two old devices are deleted and a running device fails.
    callback monitorCallback

    // allDevices is a map by resource name of all the devices currently registered to the device manager
    allDevices map[string]map[string]pluginapi.Device

    // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
    healthyDevices map[string]sets.String

    // unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
    unhealthyDevices map[string]sets.String

    // allocatedDevices contains allocated deviceIds, keyed by resourceName.
    allocatedDevices map[string]sets.String

    // podDevices contains pod to allocated device mapping.
    podDevices        podDevices
    checkpointManager checkpointmanager.CheckpointManager

    // List of NUMA Nodes available on the underlying machine
    numaNodes []int

    // Store of Topology Affinities that the Device Manager can query.
    topologyAffinityStore topologymanager.Store

    // devicesToReuse contains devices that can be reused as they have been allocated to
    // init containers.
    devicesToReuse PodReusableDevices
}

The Device Manager is created in NewContainerManager when the kubelet starts; it is a submodule of the containerManager.

func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
    // ...

    klog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
    if devicePluginEnabled {
        cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager)
        cm.topologyManager.AddHintProvider(cm.deviceManager)
    } else {
        cm.deviceManager, err = devicemanager.NewManagerStub()
    }

    // ...
}

The code that actually creates the DeviceManager is:

func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
    klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

    if socketPath == "" || !filepath.IsAbs(socketPath) {
        return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
    }

    var numaNodes []int
    for node := range numaNodeInfo {
        numaNodes = append(numaNodes, node)
    }

    dir, file := filepath.Split(socketPath)
    manager := &ManagerImpl{
        endpoints: make(map[string]endpointInfo),

        socketname:            file,
        socketdir:             dir,
        allDevices:            make(map[string]map[string]pluginapi.Device),
        healthyDevices:        make(map[string]sets.String),
        unhealthyDevices:      make(map[string]sets.String),
        allocatedDevices:      make(map[string]sets.String),
        podDevices:            make(podDevices),
        numaNodes:             numaNodes,
        topologyAffinityStore: topologyAffinityStore,
        devicesToReuse:        make(PodReusableDevices),
    }
    manager.callback = manager.genericDeviceUpdateCallback

    // The following structures are populated with real implementations in manager.Start()
    // Before that, initializes them to perform no-op operations.
    manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
    manager.sourcesReady = &sourcesReadyStub{}
    checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
    }
    manager.checkpointManager = checkpointManager

    return manager, nil
}

Besides building the DeviceManager's data structures, this also registers a callback that handles add, delete, and update events for devices.

func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
    m.mutex.Lock()
    m.healthyDevices[resourceName] = sets.NewString()
    m.unhealthyDevices[resourceName] = sets.NewString()
    m.allDevices[resourceName] = make(map[string]pluginapi.Device)
    for _, dev := range devices {
        m.allDevices[resourceName][dev.ID] = dev
        if dev.Health == pluginapi.Healthy {
            m.healthyDevices[resourceName].Insert(dev.ID)
        } else {
            m.unhealthyDevices[resourceName].Insert(dev.ID)
        }
    }
    m.mutex.Unlock()
    if err := m.writeCheckpoint(); err != nil {
        klog.Errorf("writing checkpoint encountered %v", err)
    }
}

Next comes the DeviceManager's Start method. It reads the checkpoint file and restores the relevant state in ManagerImpl, including:

  • podDevices
  • allocatedDevices
  • healthyDevices
  • unhealthyDevices
  • endpoints

It then removes everything under /var/lib/kubelet/device-plugins/ except the checkpoint file, i.e. all socket files: its own kubelet.sock as well as the socket files of all previously registered device plugins. Finally, it creates kubelet.sock and starts the gRPC server; its Register() method is what device plugins call to register themselves.

func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
    klog.V(2).Infof("Starting Device Plugin manager")

    m.activePods = activePods
    m.sourcesReady = sourcesReady

    // Loads in allocatedDevices information from disk.
    err := m.readCheckpoint()
    if err != nil {
        klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
    }

    socketPath := filepath.Join(m.socketdir, m.socketname)
    if err = os.MkdirAll(m.socketdir, 0750); err != nil {
        return err
    }
    if selinux.SELinuxEnabled() {
        if err := selinux.SetFileLabel(m.socketdir, config.KubeletPluginsDirSELinuxLabel); err != nil {
            klog.Warningf("Unprivileged containerized plugins might not work. Could not set selinux context on %s: %v", m.socketdir, err)
        }
    }

    // Removes all stale sockets in m.socketdir. Device plugins can monitor
    // this and use it as a signal to re-register with the new Kubelet.
    if err := m.removeContents(m.socketdir); err != nil {
        klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
    }

    s, err := net.Listen("unix", socketPath)
    if err != nil {
        klog.Errorf(errListenSocket+" %v", err)
        return err
    }

    m.wg.Add(1)
    m.server = grpc.NewServer([]grpc.ServerOption{}...)

    pluginapi.RegisterRegistrationServer(m.server, m)
    go func() {
        defer m.wg.Done()
        m.server.Serve(s)
    }()

    klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)

    return nil
}

DeviceManager registration

The DeviceManager receives the device plugin's RegisterRequest, whose structure is:

type RegisterRequest struct {
    Version      string
    Endpoint     string
    ResourceName string
    Options      *DevicePluginOptions
}

It checks that the registered device name and API version follow the Extended Resource rules: the name must not live under the kubernetes.io domain and must carry the vendor's own domain, e.g. nvidia.com (a simplified sketch of such a check follows).
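
Below is a sketch of what such a check could look like. It is illustrative only; the real kubelet delegates this to helpers in its API machinery, and the function name here is made up:

import (
    "fmt"
    "strings"
)

// validResourceName is a simplified illustration of the extended-resource
// naming rule: the name must be domain-qualified (e.g. nvidia.com/gpu) and
// must not use the reserved kubernetes.io domain.
func validResourceName(name string) error {
    parts := strings.SplitN(name, "/", 2)
    if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
        return fmt.Errorf("resource name %q must be of the form <domain>/<resource>", name)
    }
    if parts[0] == "kubernetes.io" || strings.HasSuffix(parts[0], ".kubernetes.io") {
        return fmt.Errorf("resource name %q must not use the kubernetes.io domain", name)
    }
    return nil
}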

It then creates an endpointImpl object from the endpoint information, i.e. establishes the socket connection to the plugin:

func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
    klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)

    e, err := newEndpointImpl(endpoint, pluginName, m.callback)
    if err != nil {
        return fmt.Errorf("failed to dial device plugin with socketPath %s: %v", endpoint, err)
    }

    options, err := e.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
    if err != nil {
        return fmt.Errorf("failed to get device plugin options: %v", err)
    }

    m.registerEndpoint(pluginName, options, e)
    go m.runEndpoint(pluginName, e)

    return nil
}

Below is the concrete implementation of endpointImpl:

type endpointImpl struct {
    client     pluginapi.DevicePluginClient
    clientConn *grpc.ClientConn

    socketPath   string
    resourceName string
    stopTime     time.Time

    mutex sync.Mutex
    cb    monitorCallback
}

func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) {
    client, c, err := dial(socketPath)
    if err != nil {
        klog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
        return nil, err
    }

    return &endpointImpl{
        client:     client,
        clientConn: c,

        socketPath:   socketPath,
        resourceName: resourceName,

        cb: callback,
    }, nil
}

The manager then runs the endpoint's run() method; the bullets after the code summarize what it does:

func (e *endpointImpl) run() {
    stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
    if err != nil {
        klog.Errorf(errListAndWatch, e.resourceName, err)

        return
    }

    for {
        response, err := stream.Recv()
        if err != nil {
            klog.Errorf(errListAndWatch, e.resourceName, err)
            return
        }

        devs := response.Devices
        klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

        var newDevs []pluginapi.Device
        for _, d := range devs {
            newDevs = append(newDevs, *d)
        }

        e.callback(e.resourceName, newDevs)
    }
}
  • 调用 DevicePluginListAndWatch gRPC 接口,通过长连接持续获取 ListAndWatch gRPC stream
  • stream 流中获取的devices详情列表然后调用Endpoint的 callback,也就是 ManagerImpl 注册的callback方法genericDeviceUpdateCallback进行Device Manager的缓存更新并写到checkpoint文件中
  • run()是通过协程启动的,持续获取device server的ListAndWatch结果,持续更新device状态
  • 当获取异常时,deviceManager断开连接,将device设置为不健康的状态。

ListAndWatch

Now look at the device plugin's ListAndWatch implementation: it immediately returns the full device list, then loops; whenever it senses a change in a device's health, it sends the updated device list to the deviceManager again. Recall from the health-check section that the device plugin's CheckHealth pushes each device's health state onto the m.health channel.

func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})

    for {
        select {
        case <-m.stop:
            return nil
        case d := <-m.health:
            // FIXME: there is no way to recover from the Unhealthy state.
            d.Health = pluginapi.Unhealthy
            log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
            s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
        }
    }
}

This raises a question: how does the device plugin know which devices exist? Let's look at the apiDevices implementation:

func (m *NvidiaDevicePlugin) apiDevices() []*pluginapi.Device {
    var pdevs []*pluginapi.Device
    for _, d := range m.cachedDevices {
        pdevs = append(pdevs, &d.Device)
    }
    return pdevs
}

Here cachedDevices holds the device information obtained through ResourceManager, concretely implemented by the GpuDeviceManager struct; as the code shows, it is built on the nvml library. There is also a MigDeviceManager that works essentially the same way, so it is not covered here.

func (g *GpuDeviceManager) Devices() []*Device {
    n, err := nvml.GetDeviceCount()
    check(err)

    var devs []*Device
    for i := uint(0); i < n; i++ {
        d, err := nvml.NewDeviceLite(i)
        check(err)

        migEnabled, err := d.IsMigEnabled()
        check(err)

        if migEnabled && g.skipMigEnabledGPUs {
            continue
        }

        devs = append(devs, buildDevice(d))
    }

    return devs
}

Allocation

Allocation begins once the kubelet receives the Pods that have been scheduled onto its node.

HandlePodAdditions

When the kubelet on a node observes a new Pod being created, it calls HandlePodAdditions to handle the creation event.

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        // ...
        }
    case e := <-plegCh:
        // ...
    }
    return true
}

Looking further into HandlePodAdditions: for each incoming Pod that has not been terminated, canAdmitPod checks whether the Pod may be created.

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        kl.podManager.AddPod(pod)

        // ...
        if !kl.podIsTerminated(pod) {
            activePods := kl.filterOutTerminatedPods(existingPods)
            if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                kl.rejectPod(pod, reason, message)
                continue
            }
        }
        // ...
    }
}

Inside canAdmitPod, the kubelet runs every admit handler in turn to see whether the Pod passes.

// "pod" is new pod, while "pods" are all admitted pods
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}

return true, "", ""
}

admitHandlers is a slice of PodAdmitHandler, whose interface is:

type PodAdmitHandler interface {
    // Admit evaluates if a pod can be admitted.
    Admit(attrs *PodAdmitAttributes) PodAdmitResult
}

When the kubelet is created, it adds a series of PodAdmitHandlers that make admission decisions about a pod's resources, for example:

  • evictionAdmitHandler: rejects best-effort pods when the node is under memory pressure (other conditions omitted here)
  • TopologyPodAdmitHandler: rejects pods whose resources cannot be allocated because of Topology locality conflicts
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
// ...

The handler relevant to device plugins is the containerManager's resourceAllocator, which calls the Allocate functions of the DeviceManager and the CpuManager to see whether the requested resources can be obtained. It checks every InitContainer and every Container of the Pod.

func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
    pod := attrs.Pod

    for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
        err := m.deviceManager.Allocate(pod, &container)
        if err != nil {
            return lifecycle.PodAdmitResult{
                Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
                Reason:  "UnexpectedAdmissionError",
                Admit:   false,
            }
        }

        if m.cpuManager != nil {
            err = m.cpuManager.Allocate(pod, &container)
            if err != nil {
                return lifecycle.PodAdmitResult{
                    Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
                    Reason:  "UnexpectedAdmissionError",
                    Admit:   false,
                }
            }
        }
    }

    return lifecycle.PodAdmitResult{Admit: true}
}

接下来我们看 ManagerImplAllocate 函数实现。

ManagerImpl.Allocate

  • allocateContainerResources allocates devices for the Pod's init containers and updates the PodDevices cache in the deviceManager
  • allocateContainerResources then allocates devices for the Pod's regular containers, again updating the PodDevices cache
    • Before allocating devices for a Pod, it checks the currently active pods against the pods in the podDevices cache and removes the devices of already-terminated Pods from podDevices, i.e. it garbage-collects devices.
    • It allocates the requested number of devices from healthyDevices at random, taking care to update allocatedDevices; otherwise a device could end up assigned to multiple Pods.
    • With the devices picked, it calls the device plugin's Allocate method over gRPC; the device plugin returns a ContainerAllocateResponse (injected environment variables, mounts, annotations).
    • The deviceManager stores the response in the podDevices cache keyed by pod UID and container name, and flushes the deviceManager's cached state to the checkpoint file.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
    if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
        m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
    }
    // If pod entries to m.devicesToReuse other than the current pod exist, delete them.
    for podUID := range m.devicesToReuse {
        if podUID != string(pod.UID) {
            delete(m.devicesToReuse, podUID)
        }
    }
    // Allocate resources for init containers first as we know the caller always loops
    // through init containers before looping through app containers. Should the caller
    // ever change those semantics, this logic will need to be amended.
    for _, initContainer := range pod.Spec.InitContainers {
        if container.Name == initContainer.Name {
            if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
                return err
            }
            m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
            return nil
        }
    }
    if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
        return err
    }
    m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
    return nil
}

Next is the implementation of allocateContainerResources. Because extended resources are advertised by device plugins and cannot be overcommitted, Requests must equal Limits in the container spec, so iterating over all the Limits is sufficient to ensure the resources are available.

func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
    podUID := string(pod.UID)
    contName := container.Name
    allocatedDevicesUpdated := false
    // Extended resources are not allowed to be overcommitted.
    // Since device plugin advertises extended resources,
    // therefore Requests must be equal to Limits and iterating
    // over the Limits should be sufficient.
    for k, v := range container.Resources.Limits {
        resource := string(k)
        needed := int(v.Value())
        klog.V(3).Infof("needs %d %s", needed, resource)
        if !m.isDevicePluginResource(resource) {
            continue
        }
        // Updates allocatedDevices to garbage collect any stranded resources
        // before doing the device plugin allocation.
        if !allocatedDevicesUpdated {
            m.UpdateAllocatedDevices()
            allocatedDevicesUpdated = true
        }
        allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
        if err != nil {
            return err
        }
        if allocDevices == nil || len(allocDevices) <= 0 {
            continue
        }

        startRPCTime := time.Now()
        // Manager.Allocate involves RPC calls to device plugin, which
        // could be heavy-weight. Therefore we want to perform this operation outside
        // mutex lock. Note if Allocate call fails, we may leave container resources
        // partially allocated for the failed container. We rely on UpdateAllocatedDevices()
        // to garbage collect these resources later. Another side effect is that if
        // we have X resource A and Y resource B in total, and two containers, container1
        // and container2 both require X resource A and Y resource B. Both allocation
        // requests may fail if we serve them in mixed order.
        // TODO: may revisit this part later if we see inefficient resource allocation
        // in real use as the result of this. Should also consider to parallelize device
        // plugin Allocate grpc calls if it becomes common that a container may require
        // resources from multiple device plugins.
        m.mutex.Lock()
        eI, ok := m.endpoints[resource]
        m.mutex.Unlock()
        if !ok {
            m.mutex.Lock()
            m.allocatedDevices = m.podDevices.devices()
            m.mutex.Unlock()
            return fmt.Errorf("unknown Device Plugin %s", resource)
        }

        devs := allocDevices.UnsortedList()
        // TODO: refactor this part of code to just append a ContainerAllocationRequest
        // in a passed in AllocateRequest pointer, and issues a single Allocate call per pod.
        klog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
        resp, err := eI.e.allocate(devs)
        metrics.DevicePluginAllocationDuration.WithLabelValues(resource).Observe(metrics.SinceInSeconds(startRPCTime))
        if err != nil {
            // In case of allocation failure, we want to restore m.allocatedDevices
            // to the actual allocated state from m.podDevices.
            m.mutex.Lock()
            m.allocatedDevices = m.podDevices.devices()
            m.mutex.Unlock()
            return err
        }

        if len(resp.ContainerResponses) == 0 {
            return fmt.Errorf("no containers return in allocation response %v", resp)
        }

        // Update internal cached podDevices state.
        m.mutex.Lock()
        m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
        m.mutex.Unlock()
    }

    // Checkpoints device to container allocation information.
    return m.writeCheckpoint()
}

Here the RPC into the device plugin happens at resp, err := eI.e.allocate(devs). This raises a question: where do the deviceIDs passed in the RPC come from? The answer is the devicesToAllocate call, whose main logic is:

  • Fetch the device list already allocated to this container of this Pod; if only part of the request was previously allocated, fail
  • Take usable devices from the reusableDevices set first; if those are enough, return, otherwise keep looking in healthyDevices
  • Subtract the devices already in use from healthyDevices and check whether enough remain; if not, fail
  • If there are enough, pick the device list, honoring topology affinity when applicable
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    needed := required
    // Gets list of devices that have already been allocated.
    // This can happen if a container restarts for example.
    devices := m.podDevices.containerDevices(podUID, contName, resource)
    if devices != nil {
        klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
        needed = needed - devices.Len()
        // A pod's resource is not expected to change once admitted by the API server,
        // so just fail loudly here. We can revisit this part if this no longer holds.
        if needed != 0 {
            return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, devices.Len(), required)
        }
    }
    if needed == 0 {
        // No change, no work.
        return nil, nil
    }
    klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, podUID, contName)
    // Needs to allocate additional devices.
    if _, ok := m.healthyDevices[resource]; !ok {
        return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
    }
    devices = sets.NewString()
    // Allocates from reusableDevices list first.
    for device := range reusableDevices {
        devices.Insert(device)
        needed--
        if needed == 0 {
            return devices, nil
        }
    }
    // Needs to allocate additional devices.
    if m.allocatedDevices[resource] == nil {
        m.allocatedDevices[resource] = sets.NewString()
    }
    // Gets Devices in use.
    devicesInUse := m.allocatedDevices[resource]
    // Gets a list of available devices.
    available := m.healthyDevices[resource].Difference(devicesInUse)
    if available.Len() < needed {
        return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
    }
    // By default, pull devices from the unsorted list of available devices.
    allocated := available.UnsortedList()[:needed]
    // If topology alignment is desired, update allocated to the set of devices
    // with the best alignment.
    hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
    if m.deviceHasTopologyAlignment(resource) && hint.NUMANodeAffinity != nil {
        allocated = m.takeByTopology(resource, available, hint.NUMANodeAffinity, needed)
    }
    // Updates m.allocatedDevices with allocated devices to prevent them
    // from being allocated to other pods/containers, given that we are
    // not holding lock during the rpc call.
    for _, device := range allocated {
        m.allocatedDevices[resource].Insert(device)
        devices.Insert(device)
    }
    return devices, nil
}

Once the RPC call succeeds, the response is recorded in m.podDevices.

func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
    if _, podExists := pdev[podUID]; !podExists {
        pdev[podUID] = make(containerDevices)
    }
    if _, contExists := pdev[podUID][contName]; !contExists {
        pdev[podUID][contName] = make(resourceAllocateInfo)
    }
    pdev[podUID][contName][resource] = deviceAllocateInfo{
        deviceIds: devices,
        allocResp: resp,
    }
}

DevicePlugin.Allocate

The Allocate interface adds the NVIDIA_VISIBLE_DEVICES environment variable for the container, sets the relevant DeviceSpec parameters, and returns the response to the kubelet.

func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    responses := pluginapi.AllocateResponse{}
    for _, req := range reqs.ContainerRequests {
        for _, id := range req.DevicesIDs {
            if !m.deviceExists(id) {
                return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", m.resourceName, id)
            }
        }

        response := pluginapi.ContainerAllocateResponse{}

        if *deviceListStrategyFlag == DeviceListStrategyEnvvar {
            response.Envs = m.apiEnvs(m.deviceListEnvvar, req.DevicesIDs)
        }
        if *deviceListStrategyFlag == DeviceListStrategyVolumeMounts {
            response.Envs = m.apiEnvs(m.deviceListEnvvar, []string{deviceListAsVolumeMountsContainerPathRoot})
            response.Mounts = m.apiMounts(req.DevicesIDs)
        }
        if *passDeviceSpecs {
            response.Devices = m.apiDeviceSpecs(req.DevicesIDs)
        }

        responses.ContainerResponses = append(responses.ContainerResponses, &response)
    }

    return &responses, nil
}

As mentioned earlier, NVIDIA's gpu-container-runtime decides, based on the container's NVIDIA_VISIBLE_DEVICES environment variable, whether this is a GPU container and which GPU devices it may use. What the NVIDIA GPU device plugin does is convert the GPU device IDs in the kubelet's request into the NVIDIA_VISIBLE_DEVICES environment variable and return it to the kubelet, which then automatically injects the returned environment variables into the container. When the container carries this variable, gpu-container-runtime maps the devices declared in NVIDIA_VISIBLE_DEVICES into the container at startup, along with the corresponding NVIDIA driver libraries. (A sketch of the ID-to-variable conversion follows.)
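
The conversion itself is small. Here is a sketch of the idea; buildEnvs is an illustrative name, and the plugin's actual apiEnvs helper parameterizes the variable name (deviceListEnvvar) rather than hard-coding it:

import "strings"

// buildEnvs illustrates the DeviceID -> NVIDIA_VISIBLE_DEVICES conversion:
// the allocated device IDs are joined into one comma-separated value.
func buildEnvs(deviceIDs []string) map[string]string {
    return map[string]string{
        "NVIDIA_VISIBLE_DEVICES": strings.Join(deviceIDs, ","),
    }
}

For example, two allocated GPU UUIDs become a single environment variable of the form NVIDIA_VISIBLE_DEVICES=GPU-aaa...,GPU-bbb..., which gpu-container-runtime then interprets at container start.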

Using the devices

In the kubelet's GetResource, the DeviceManager's GetDeviceRunContainerOptions is called and the resulting options are added to kubecontainer.RunContainerOptions, which carries Envs, Mounts, Devices, PortMappings, Annotations, and so on. The kubelet calls GetResources() to obtain the launch parameters runtimeapi.ContainerConfig{Args...} for starting the container.

kubernetes/pkg/kubelet/cm/container_manager_linux.go
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
    opts := &kubecontainer.RunContainerOptions{}
    // Allocate should already be called during predicateAdmitHandler.Admit(),
    // just try to fetch device runtime information from cached state here
    devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
    if err != nil {
        return nil, err
    } else if devOpts == nil {
        return opts, nil
    }
    opts.Devices = append(opts.Devices, devOpts.Devices...)
    opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
    opts.Envs = append(opts.Envs, devOpts.Envs...)
    opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
    return opts, nil
}

GetDeviceRunContainerOptions() pulls the Envs, Mounts, Devices, PortMappings, Annotations, etc. for the given pod UID and container name out of the podDevices cache (populated during device allocation). Additionally, for device plugins with PreStartRequired set to true, the deviceManager must call the device plugin's PreStartContainer gRPC interface before the container starts to do device initialization, with a timeout of 30 seconds (a sketch of the timeout-guarded call follows the code).

func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
    podUID := string(pod.UID)
    contName := container.Name
    needsReAllocate := false
    for k := range container.Resources.Limits {
        resource := string(k)
        if !m.isDevicePluginResource(resource) {
            continue
        }
        err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
        if err != nil {
            return nil, err
        }
        // This is a device plugin resource yet we don't have cached
        // resource state. This is likely due to a race during node
        // restart. We re-issue allocate request to cover this race.
        if m.podDevices.containerDevices(podUID, contName, resource) == nil {
            needsReAllocate = true
        }
    }
    if needsReAllocate {
        klog.V(2).Infof("needs re-allocate device plugin resources for pod %s, container %s", podUID, container.Name)
        if err := m.Allocate(pod, container); err != nil {
            return nil, err
        }
    }
    m.mutex.Lock()
    defer m.mutex.Unlock()
    return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
}
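
For reference, the 30-second limit comes from bounding the PreStartContainer RPC with a context timeout; a minimal sketch of such a guarded call (the preStart function is illustrative, not the kubelet's actual helper):

import (
    "context"
    "time"

    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// preStart shows the shape of the guarded PreStartContainer call: the RPC
// gets a bounded context so a misbehaving plugin cannot stall pod startup.
func preStart(client pluginapi.DevicePluginClient, deviceIDs []string) error {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    _, err := client.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{
        DevicesIDs: deviceIDs,
    })
    return err
}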

Device state management

Device state management involves three parts:

  • Device state on the node: when the kubelet updates the node status, it calls GetCapacity to refresh the resource information for the device plugins.

kubelet_node_status.go calls the deviceManager's GetCapacity() to obtain device state, adds it to the node info, and persists it to etcd through the kube-apiserver. GetCapacity() returns all devices known to the device server, the devices already allocated to pods, and the devices pods can no longer use (i.e. those of inactive plugins); kubelet_node_status.go updates the node info from this data (see the sketch below).
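
A sketch of how the node-status code can consume the three return values; the function name and the choice to delete (rather than zero) inactive resources are illustrative simplifications, not the kubelet's exact bookkeeping:

import (
    v1 "k8s.io/api/core/v1"
)

// applyDeviceCapacity merges the device manager's view into the node status:
// capacity and allocatable per extended resource, plus the names of
// resources whose plugin has gone away ("inactive").
func applyDeviceCapacity(node *v1.Node, capacity, allocatable v1.ResourceList, inactive []string) {
    for name, quantity := range capacity {
        node.Status.Capacity[name] = quantity
    }
    for name, quantity := range allocatable {
        node.Status.Allocatable[name] = quantity
    }
    for _, name := range inactive {
        // An inactive plugin's resource must stop being advertised.
        delete(node.Status.Capacity, v1.ResourceName(name))
        delete(node.Status.Allocatable, v1.ResourceName(name))
    }
}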

  • Device state in the kubelet's deviceManager: as covered in the registration and allocation sections, a checkpoint mechanism by default persists podDevices as PodDevicesEntry records in /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint
type PodDevicesEntry struct {
    PodUID        string
    ContainerName string
    ResourceName  string
    DeviceIDs     []string
    AllocResp     []byte // the Envs, Mounts, Devices, PortMappings, Annotations, etc. used when starting the container
}

Whenever device state changes (a new device registers, a device is allocated, a device's health changes, a device is removed), podDevices is written to the kubelet_internal_checkpoint file. On startup or restart, the kubelet reads this file back and loads the data into the podDevices cache (see the sketch below).
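
A sketch of the idea behind the checkpoint write; the real implementation goes through the kubelet's checkpointmanager and wraps the entries with registered resource names and a checksum, while plain JSON is used here purely for illustration:

import (
    "encoding/json"
    "os"
)

// writeCheckpointSketch persists pod-to-device assignments so they can be
// restored after a kubelet restart. Illustrative only: no checksum, no
// atomic rename, unlike the real checkpoint manager.
func writeCheckpointSketch(entries []PodDevicesEntry) error {
    data, err := json.Marshal(entries)
    if err != nil {
        return err
    }
    const path = "/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint"
    return os.WriteFile(path, data, 0600)
}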

  • Device state reported by the device plugin: as covered in the registration section, in summary:
    • after registering a device plugin, the deviceManager keeps a long-lived connection to it and continuously consumes its ListAndWatch results to keep device state current;
    • on a stream error, the deviceManager drops the connection and marks the devices unhealthy;
    • by default the device plugin restarts, re-registers, and reports device state again.

References