0%

【Kubernetes】Kubelet

Kubelet 作为 k8s 集群中 Node 上的关键组件,在每个 Node 上以 Agent 进程的形式运行,负责管理 Pod 和其中容器的生命周期。Kubelet 主要功能是定时从某个地方获取节点上 pod/container 的期望状态,并调用容器平台接口达到这个状态。本文将作为 Kubelet 分析的开篇,介绍 Kubelet 的主要功能和实现原理。

功能分析

下图展示了 kubelet 内部组件结构,Kubelet 由许多内部组件构成

  • Kubelet API,包括 10250 端口的认证 API、4194 端口的 cAdvisor API、10255 端口的只读 API 以及 10248 端口的健康检查 API
  • syncLoop:从 API 或者 manifest 目录接收 Pod 更新,发送到 podWorkers 处理,大量使用 channel 处理来处理异步请求
  • 辅助的 manager,如 cAdvisor、PLEG、Volume Manager 等,处理 syncLoop 以外的其他工作
  • CRI:容器执行引擎接口,负责与 container runtime shim 通信,关于 CRI 的更多内容可以参考 CRI
  • 容器执行引擎,如 dockershim、rkt 等
  • 网络插件,目前支持 CNI 和 kubenet

pod 管理

Kubelet 基于 PodSpec 来工作,它需要确保这些 PodSpec 中描述的容器处于运行状态且运行状况良好,因此 Kubelet 不管理那些不是由 k8s 创建的容器。PodSpec 的来源有以下四种:

  • API Server:通过 API Server 监听 etcd 目录获取数据,这也是最主要的来源
  • File:利用命令行参数传递路径,kubelet 周期性地监视此路径下的文件是否有更新,监视周期默认为 20s
  • HTTP Endpoint:利用命令行参数指定 HTTP 端点。 此端点的监视周期默认为 20 秒
  • HTTP Server:kubelet 还可以侦听 HTTP 并响应简单的 API (目前没有完整规范)来提交新的清单

监听 API Server

Kubelet 通过 API Server Client 使用 List/Watch 的方式监听 /registry/nodes/<NodeName>/registry/pods 路径,将获取的信息同步到本地缓存中。Kubelet 监听 etcd,所有针对 Pod 的操作都将会被 Kubelet 监听到:

  • 如果发现有新的绑定到本节点的 Pod,则按照 PodSpec 的要求创建该 Pod。
  • 如果发现本地的 Pod 被修改,则 Kubelet 会做出相应的修改,比如删除 Pod 中某个容器时,则通过 Docker Client 删除该容器。
  • 如果发现删除本节点的 Pod,则删除相应的 Pod,并通过 Docker Client 删除 Pod 中的容器。

Kubelet 读取监听到的信息,如果是创建和修改 Pod 任务,则执行如下处理:

  • 为该 Pod 创建一个数据目录
  • 从 API Server 读取该 PodSpec
  • 为该 Pod 挂载外部卷
  • 下载 Pod 用到的 Secret
  • 检查已经在节点上运行的 Pod,如果该 Pod 没有容器或 Pause 容器没有启动,则先停止 Pod 里所有容器的进程。如果在 Pod 中有需要删除的容器,则删除这些容器
  • kubernetes/pause 镜像为每个 Pod 创建一个容器。Pause 容器用于接管 Pod 中所有其他容器的网络。每创建一个新的 Pod,Kubelet 都会先创建一个 Pause 容器,然后创建其他容器。
  • 为 Pod 中的每个容器做如下处理:
    1. 为容器计算一个 hash 值,然后用容器的名字去 Docker 查询对应容器的 hash 值。若查找到容器,且两者 hash 值不同,则停止 Docker 中容器的进程,并停止与之关联的 Pause 容器的进程;若两者相同,则不做任何处理;
    2. 如果容器被终止了,且容器没有指定的 restartPolicy,则不做任何处理;
    3. 调用 Docker Client 下载容器镜像,调用 Docker Client 运行容器。

Static Pod

所有以非 API Server 方式创建的 Pod 都叫 Static Pod。Kubelet 将 Static Pod 的状态汇报给 API Server,API Server 为该 Static Pod 创建一个 Mirror Pod 和其相匹配。Mirror Pod 的状态将真实反映 Static Pod 的状态。当 Static Pod 被删除时,与之相对应的 Mirror Pod 也会被删除。

容器健康检查

容器健康检查通过 LivenessProbeReadinessProbe 两类探针来判断容器是否健康。

  • LivenessProbe :用于判断容器是否健康,告诉 Kubelet 一个容器什么时候处于不健康的状态。如果 LivenessProbe 探针探测到容器不健康,则 Kubelet 将删除该容器,并根据容器的重启策略做相应的处理。如果一个容器不包含 LivenessProbe 探针,那么 Kubelet 认为该容器的 LivenessProbe 探针返回的值永远是 Success
  • ReadinessProbe:用于判断容器是否启动完成且准备接收请求。如果 ReadinessProbe 探针探测到失败,则 Pod 的状态将被修改。Endpoint Controller 将从 Service 的 Endpoint 中删除包含该容器所在 Pod 的 IP 地址的 Endpoint 条目。

Kubelet 定期调用容器中的 LivenessProbe 探针来诊断容器的健康状况。LivenessProbe 包含如下三种实现方式:

  • ExecAction:在容器内部执行一个命令,如果该命令的退出状态码为 0,则表明容器健康;
  • TCPSocketAction:通过容器的 IP 地址和端口号执行 TCP 检查,如果端口能被访问,则表明容器健康;
  • HTTPGetAction:通过容器的 IP 地址和端口号及路径调用 HTTP GET 方法,如果响应的状态码大于等于 200 且小于 400,则认为容器状态健康。

cAdvisor 容器监控

Kubernetes 集群中,应用程序的执行情况可以在不同的级别上监测到,这些级别包括:容器、Pod、Service 和整个集群。Kubelet 通过 cAdvisor 获取其所在节点及容器的数据。

cAdvisor 是一个开源的分析容器资源使用率和性能特性的代理工具,集成到 Kubelet中,当Kubelet启动时会同时启动cAdvisor,且一个cAdvisor只监控一个Node节点的信息。cAdvisor 自动查找所有在其所在节点上的容器,自动采集 CPU、内存、文件系统和网络使用的统计信息。cAdvisor 通过它所在节点机的 Root 容器,采集并分析该节点机的全面使用情况。

关于 cAdvisor 更多的内容,可以参考 cAdvisor

Kubelet Eviction

Kubelet 会监控资源的使用情况,并使用驱逐机制防止计算和存储资源耗尽。在驱逐时,Kubelet 将 Pod 的所有容器停止,并将 PodPhase 设置为 Failed。Kubelet 定期(housekeeping-interval)检查系统的资源是否达到了预先配置的驱逐阈值,包括

Eviction Signal Condition Description
memory.available MemoryPressue memory.available := node.status.capacity[memory] - node.stats.memory.workingSet (计算方法参考这里
nodefs.available DiskPressure nodefs.available := node.stats.fs.available(Kubelet Volume以及日志等)
nodefs.inodesFree DiskPressure nodefs.inodesFree := node.stats.fs.inodesFree
imagefs.available DiskPressure imagefs.available := node.stats.runtime.imagefs.available(镜像以及容器可写层等)
imagefs.inodesFree DiskPressure imagefs.inodesFree := node.stats.runtime.imagefs.inodesFree

这些驱逐阈值可以使用百分比,也可以使用绝对值,如

1
2
3
--eviction-hard=memory.available<500Mi,nodefs.available<1Gi,imagefs.available<100Gi
--eviction-minimum-reclaim="memory.available=0Mi,nodefs.available=500Mi,imagefs.available=2Gi"`
--system-reserved=memory=1.5Gi

这些驱逐信号可以分为软驱逐和硬驱逐

  • 软驱逐(Soft Eviction):配合驱逐宽限期(eviction-soft-grace-period和eviction-max-pod-grace-period)一起使用。系统资源达到软驱逐阈值并在超过宽限期之后才会执行驱逐动作。
  • 硬驱逐(Hard Eviction ):系统资源达到硬驱逐阈值时立即执行驱逐动作。

驱逐动作包括回收节点资源和驱逐用户 Pod 两种:

  • 回收节点资源
    • 配置了 imagefs 阈值时
      • 达到 nodefs 阈值:删除已停止的 Pod
      • 达到 imagefs 阈值:删除未使用的镜像
    • 未配置 imagefs 阈值时
      • 达到 nodefs阈值时,按照删除已停止的 Pod 和删除未使用镜像的顺序清理资源
  • 驱逐用户 Pod
    • 驱逐顺序为:BestEffort、Burstable、Guaranteed
    • 配置了 imagefs 阈值时
      • 达到 nodefs 阈值,基于 nodefs 用量驱逐(local volume + logs)
      • 达到 imagefs 阈值,基于 imagefs 用量驱逐(容器可写层)
    • 未配置 imagefs 阈值时
      • 达到 nodefs阈值时,按照总磁盘使用驱逐(local volume + logs + 容器可写层)

工作原理

Kubelet 的工作核心是 SyncLoop 这个控制循环,驱动整个控制循环的事件包括:

  • kubetypes.PodUpdate:pod更新事件
  • pleg.PodLifeEvent:pod生命周期变化
  • periodic sync events:kubelet本身设置的执行周期
  • housekeeping events:定时清理事件

SyncLoop

在SyncLoop循环上还有很多 Manager

  • statusManager: 负责维护状态信息,并把 pod 状态更新到 apiserver,但是它并不负责监控 pod 状态的变化,而是提供对应的接口供其他组件调用,比如 probeManager
  • PLEG(Pod Lifecycle Event Generator) PLEG 是 kubelet 的核心模块,PLEG 会一直调用 container runtime 获取本节点 containers/sandboxes 的信息,并与自身维护的 pods cache 信息进行对比,生成对应的 PodLifecycleEvent,然后输出到 eventChannel 中,通过 eventChannel 发送到 kubelet syncLoop 进行消费,然后由 kubelet syncPod 来触发 pod 同步处理过程,最终达到用户的期望状态
  • imageManager: 调用 kubecontainer 提供的PullImage/GetImageRef/ListImages/RemoveImage/ImageStates 方法来保证pod 运行所需要的镜像。
  • volumeManager: 负责 node 节点上 pod 所使用 volume 的管理,volume 与 pod 的生命周期关联,负责 pod 创建删除过程中 volume 的 mount/umount/attach/detach 流程,kubernetes 采用 volume Plugins 的方式,实现存储卷的挂载等操作,内置几十种存储插件。
  • containerManager: 负责 node 节点上运行的容器的 cgroup 配置信息,kubelet 启动参数如果指定 --cgroups-per-qos 的时候,kubelet 会启动 goroutine 来周期性的更新 pod 的 cgroup 信息,维护其正确性,该参数默认为 true,实现了 pod 的Guaranteed/BestEffort/Burstable 三种级别的 Qos。
  • runtimeManager: containerRuntime 负责 kubelet 与不同的 runtime 实现进行对接,实现对于底层 container 的操作,初始化之后得到的 runtime 实例将会被之前描述的组件所使用
  • podManager:提供了接口来存储和访问 pod 的信息,维持 static pod 和 mirror pods 的关系,podManager 会被statusManager/volumeManager/runtimeManager 所调用,podManager 的接口处理流程里面会调用 secretManager 以及 configMapManager
  • probeManager :依赖于 statusManagerlivenessManagercontainerRefManager,会定时去监控 pod 中容器的健康状况
  • evictionManager: 当节点的内存、磁盘或 inode 等资源不足时,达到了配置的 evict 策略, node 会变为 pressure 状态,此时 kubelet 会按照 qosClass 顺序来驱赶 pod,以此来保证节点的稳定性。
  • imageGC: 负责 node 节点的镜像回收,当本地的存放镜像的本地磁盘空间达到某阈值的时候,会触发镜像的回收,删除掉不被 pod 所使用的镜像
  • containerGC:负责清理 node 节点上已消亡的 container,具体的 GC 操作由runtime 来实现
  • cAdvisor 是 google 开发的容器监控工具,集成在 kubelet 中,起到收集本节点和容器的监控信息,cAvisor 模块对外提供了 interface 接口,该接口也被 imageManager,OOMWatcher,containerManager 等所使用。
  • OOMWatcher 系统 OOM 的监听器,会与 cadvisor 模块之间建立 SystemOOM,通过 Watch方式从 cadvisor 那里收到的 OOM 信号,并产生相关事件。
  • containerRefManager 容器引用的管理,相对简单的Manager,用来报告容器的创建,失败等事件,通过定义 map 来实现了 containerID 与 v1.ObjectReferece 容器引用的映射

源码分析

kubernetes\pkg\kubelet\kubelet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
(kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
//注册 logServer
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.Warning("No api server defined - no node status update will be sent.")
}

if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
//调用 kl.initializeModules 首先启动不依赖 container runtime 的一些模块
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatal(err)
}

//启动 volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

if kl.kubeClient != nil {
//执行 kl.syncNodeStatus 定时同步 Node 状态
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
//调用 kl.fastStatusUpdateOnce 更新容器运行时启动时间以及执行首次状态同步
go kl.fastStatusUpdateOnce()

// start syncing lease
//NodeLease 机制
go kl.nodeLeaseController.Run(wait.NeverStop)
}
//执行 kl.updateRuntimeUp 定时更新 Runtime 状态
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

// Set up iptables util rules
//执行 kl.syncNetworkUtil 定时同步 iptables 规则
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}

//获取 pk.podKillingCh异常pod, 并定时清理异常 pod
go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

// Start component sync loops.
//启动 statusManager、probeManager、runtimeClassManager
kl.statusManager.Start()
kl.probeManager.Start()

// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}

// Start the pod lifecycle event generator.
//启动 pleg 该模块主要用于周期性地向 container runtime 刷新当前所有容器的状态
//https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/pod-lifecycle-event-generator.md
kl.pleg.Start()
kl.syncLoop(updates, kl)
}

这个方法会做以下事情:

  1. 注册logServer;
  2. 如果设置了Cloud Provider,那么会启动云资源管理器,具体的可以查看文章:cloud-provider
  3. 调用kl.initializeModules启动不依赖 container runtime 的一些模块,这个方法我们下面再分析;
  4. 启动 volume manager;
  5. 执行 kl.syncNodeStatus 定时同步 Node 状态;
  6. 调用kl.fastStatusUpdateOnce启动一个循环更新pod CIDR、runtime状态以及node状态;
  7. 调用 kl.nodeLeaseController.Run 启动NodeLease机制,NodeLease机制是一种上报心跳的方式,可以通过更加轻量化节约资源的方式,并提升性能上报node的心跳信息,具体看: Lease object
  8. 执行 kl.updateRuntimeUp 定时更新 Runtime 状态;
  9. 执行kl.syncNetworkUtil 定时同步 iptables 规则;
  10. 获取 pk.podKillingCh 异常pod, 并定时清理异常 pod;
  11. 然后启动 statusManager、probeManager、runtimeClassManager;
  12. 启动 pleg模块,该模块主要用于周期性地向 container runtime 上报当前所有容器的状态,具体可以看:Pod Lifecycle Event Generator (PLEG)
  13. 调用 kl.syncLoop 启动kublet事件循环;

initializeModules

下面我们看看initializeModules方法做了些什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
(kl *Kubelet) initializeModules() error {
...
//创建文件目录
if err := kl.setupDataDirs(); err != nil {
return err
}

//创建 ContainerLogsDir
if _, err := os.Stat(ContainerLogsDir); err != nil {
if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
}
}

//启动 imageManager
kl.imageManager.Start()

//启动 certificate manager ,证书相关
if kl.serverCertificateManager != nil {
kl.serverCertificateManager.Start()
}

//启动 oomWatcher.
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("failed to start OOM watcher %v", err)
}

//启动 resource analyzer,刷新volume stats到缓存中
kl.resourceAnalyzer.Start()

return nil
}

initializeModules方法主要做了以下几件事:

  1. 创建创建文件目录、Container的log目录;
  2. 启动 imageManager,这个管理器实际上是realImageGCManager,我们待会看;
  3. 启动 certificate manager ,证书相关;
  4. 启动 oomWatcher监视器;
  5. 启动 resource analyzer,定时刷新volume stats到缓存中;

realImageGCManager#Start

文件路径:pkg/kubelet/images/image_gc_manager.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
(im *realImageGCManager) Start() {
go wait.Until(func() {
var ts time.Time
if im.initialized {
ts = time.Now()
}
//找出所有的image,并删除不再使用的image
_, err := im.detectImages(ts)
if err != nil {
klog.Warningf("[imageGCManager] Failed to monitor images: %v", err)
} else {
im.initialized = true
}
}, 5*time.Minute, wait.NeverStop)

//更新image的缓存
go wait.Until(func() {
//调用容器接口,获取最新的image
images, err := im.runtime.ListImages()
if err != nil {
klog.Warningf("[imageGCManager] Failed to update image list: %v", err)
} else {
im.imageCache.set(images)
}
}, 30*time.Second, wait.NeverStop)

}

realImageGCManagerstart 方法会启动两个协程,然后分别定时调用 detectImages 方法与 imageCacheset 方法。detectImages 方法里面主要就是调用 ImageServiceRuntimeService 的方法找出所有正在使用的image,然后删除不再使用的image。

这里 ListImagesdetectImages 里面用到的GetPods方法都是调用了CRI的方法,

fastStatusUpdateOnce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
(kl *Kubelet) fastStatusUpdateOnce() {
for {
time.Sleep(100 * time.Millisecond)
node, err := kl.GetNode()
if err != nil {
klog.Errorf(err.Error())
continue
}
if len(node.Spec.PodCIDRs) != 0 {
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
continue
}
//更新 Runtime 状态
kl.updateRuntimeUp()
//更新 节点 状态
kl.syncNodeStatus()
return
}
}
}

FastStatusUpdateOnce 函数启动一个循环,尝试立即更新POD CIDR。更新pod CIDR后,它会触发运行时更新和节点状态更新。函数在一次成功的节点状态更新后直接返回。该功能仅在 kubelet 启动期间执行,通过尽快更新 pod cidr、运行时状态和节点状态来提高准备就绪节点的延迟。

updateRuntimeUp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//首次执行的时候会初始化runtime依赖模块,然后更新runtimeState
func (kl *Kubelet) updateRuntimeUp() {
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()
//获取 containerRuntime Status
s, err := kl.containerRuntime.Status()
if err != nil {
klog.Errorf("Container runtime sanity check failed: %v", err)
return
}
if s == nil {
klog.Errorf("Container runtime status is nil")
return
}
klog.V(4).Infof("Container runtime status: %v", s)
//检查 network 和 runtime 是否处于 ready 状态
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
klog.Errorf("Container runtime network not ready: %v", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
kl.runtimeState.setNetworkState(nil)
}
// information in RuntimeReady condition will be propagated to NodeReady condition.
//获取运行时状态
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
err := fmt.Errorf("Container runtime not ready: %v", runtimeReady)
klog.Error(err)
kl.runtimeState.setRuntimeState(err)
return
}
kl.runtimeState.setRuntimeState(nil)
//调用 kl.initializeRuntimeDependentModules 启动依赖模块
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

updateRuntimeUp会获取container运行状态信息,然后根据返回RuntimeStatus检查网络、runtime是不是已经处于ready状态;接着调用kl.initializeRuntimeDependentModules初始化依赖模块,这里会启动cadvisor、containerManager、evictionManager、containerLogManager、pluginManager;最后设置Runtime同步时间。

最后看看syncLoop方法

syncLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
for {
...
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}

syncLoop方法在一个循环中不断的调用 syncLoopIteration 方法执行主要逻辑。

syncLoopIteration

syncLoopIteration方法比较长,拆开来看。

syncCh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
//方法会监听多个 channel,当发现任何一个 channel 有数据就交给 handler 去处理,在 handler 中通过调用 dispatchWork 分发任务
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
//该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),
//一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作;
case u, open := <-configCh:
if !open {
klog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}

switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
klog.Errorf("Kubelet does not support snapshot update")
default:
klog.Errorf("Invalid event type received: %d.", u.Op)
}

kl.sourcesReady.AddSource(u.Source)

...
}

configCh 读取配置事件的管道,该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作。这里对于pod的操作我们下一篇再讲。

plegCh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
//方法会监听多个 channel,当发现任何一个 channel 有数据就交给 handler 去处理,在 handler 中通过调用 dispatchWork 分发任务
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
...
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
...
}

PLEG.Start 的时候会每秒钟启动调用一次relist,根据最新的 PodStatus 生成PodLiftCycleEvent,然后存入到 PLEG Channel 中。

syncLoop 会调用 pleg.Watch 方法获取 PLEG Channel 管道,然后传给syncLoopIteration方法,在syncLoopIteration方法中也就是plegCh这个管道,syncLoopIteration会消费plegCh中的数据,在 handler 中通过调用 dispatchWork 分发任务。

syncCh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
...
// 每秒钟会执行到一次
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
//同步最新保存的 pod 状态
handler.HandlePodSyncs(podsToSync)
...
}

syncCh是由syncLoop方法里面创建的一个定时任务,每秒钟会向syncCh添加一个数据,然后就会执行到这里。这个方法会同步所有等待同步的pod。

livenessManager.Updates

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
...
case update := <-kl.livenessManager.Updates():
//如果探针检测失败,需要更新pod的状态
if update.Result == proberesults.Failure {
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}
...
}

对失败的pod或者liveness检查失败的pod进行sync操作。

housekeepingCh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
...
// 每两秒钟执行一次
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
klog.V(4).Infof("SyncLoop (housekeeping)")
//执行一些清理工作,包括终止pod workers、删除不想要的pod,移除volumes、pod目录
if err := handler.HandlePodCleanups(); err != nil {
klog.Errorf("Failed cleaning pods: %v", err)
}
}
...
}

housekeepingCh 这个管道也是由 syncLoop 创建,每两秒钟会触发清理。

总结

kubelet.Run 部分主要执行kubelet包含的各种manager的运行,大部分会以一部线程的方式定时运行。接下来看了syncLoop主函数,这个函数主要对pod的生命周期进行管理,包括对pod进行add 、update、remove、delete等操作,这些具体的代码执行过程留到下一篇,pod的初始化时再讲,syncLoop还需要更新根据不同的channel触发不同的操作,如更新runtime缓存、同步pod、触发清理pod、liveness检查失败的pod进行sync操作等。

参考资料