syncPod
kubelet源码分析(五)之 syncPod
以下代码分析基于
kubernetes v1.12.0版本。
本文主要分析kubelet中syncPod的部分。
managePodLoop通过读取podUpdateschannel的信息,执行syncPodFn函数,而syncPodFn函数在newPodWorkers的时候赋值了,即kubelet.syncPod。
managePodLoop完整代码如下:
此部分代码位于pkg/kubelet/pod_workers.go
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
err := func() error {
podUID := update.Pod.UID
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
// This is the legacy event thrown by manage pod loop
// all other events are now dispatched from syncPodFn
p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
return err
}
// 该部分的syncPodFn实际上的实现函数是kubelet.syncPod
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
return err
}()
// notify the call-back function if the operation succeeded or not
if update.OnCompleteFunc != nil {
update.OnCompleteFunc(err)
}
if err != nil {
// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
}
p.wrapUp(update.Pod.UID, err)
}
}以下分析syncPod相关逻辑。
2. syncPod
syncPod可以理解为是一个单个pod进行同步任务的事务脚本。其中入参是syncPodOptions,syncPodOptions记录了需要同步的pod的相关信息。具体定义如下:
syncPod主要执行以下的工作流:
如果是正在创建的pod,则记录pod worker的启动
latency。调用
generateAPIPodStatus为pod提供v1.PodStatus信息。如果pod是第一次运行,记录pod的启动
latency。更新
status manager中的pod状态。如果pod不应该被运行则杀死pod。
如果pod是一个
static pod,并且没有对应的mirror pod,则创建一个mirror pod。如果没有pod的数据目录则给pod创建对应的数据目录。
等待volume被attach或mount。
获取pod的secret数据。
调用
container runtime的SyncPod函数,执行相关pod操作。更新pod的
ingress和egress的traffic limit。
当以上任务流中有任何的error,则return error。在下一次执行syncPod的任务流会被再次执行。对于错误信息会被记录到event中,方便debug。
以下对syncPod的执行过程进行分析。
syncPod的代码位于pkg/kubelet/kubelet.go
2.1. SyncPodKill
首先,获取syncPodOptions的pod信息。
如果pod是需要被杀死的,则执行killPod,会在指定的宽限期内杀死pod。
2.2. SyncPodCreate
如果pod是需要被创建的,则记录pod的启动latency,latency与pod在apiserver中第一次被记录相关。
通过pod和pod status生成最终的api pod status并设置pod的IP。
记录pod到running状态的时间。
如果pod是不可运行的,则更新pod和container的状态和相应的原因。
并更新status manager中的状态信息,杀死不可运行的pod。
如果网络插件还没到Ready状态,则只有在使用host网络模式的情况下才启动pod。
2.3. Cgroups
给pod创建Cgroups,如果cgroups-per-qos参数开启,则申请相应的资源。对于terminated的pod不需要创建或更新pod的Cgroups。
当重新启动kubelet并且启用cgroups-per-qos时,应该间歇性地终止所有pod的运行容器并在qos cgroup hierarchy下重新启动。
如果pod的cgroup已经存在或者pod第一次运行,不杀死pod中容器。
如果pod被杀死并且重启策略是Never,则不创建或更新对应的Cgroups,否则创建和更新pod的Cgroups。
其中创建Cgroups是通过containerManager的UpdateQOSCgroups来执行。
2.4. Mirror Pod
如果pod是一个static pod,没有对应的mirror pod,则创建一个mirror pod;如果存在mirror pod则删除再重建一个mirror pod。
2.5. makePodDataDirs
给pod创建数据目录。
其中数据目录包括
PodDir:{kubelet.rootDirectory}/pods/podUIDPodVolumesDir:{PodDir}/volumesPodPluginsDir:{PodDir}/plugins
2.6. mount volumes
对非terminated状态的pod挂载volume。
2.7. PullSecretsForPod
获取pod的secret数据。
getPullSecretsForPod具体实现函数如下:
2.8. containerRuntime.SyncPod
调用container runtime的SyncPod函数,执行相关pod操作,由此kubelet.syncPod的操作逻辑转入containerRuntime.SyncPod函数中。
SyncPod主要执行sync操作使得运行的pod达到期望状态的pod。主要执行以下操作:
计算
sandbox和container的变化。必要的时候杀死pod。
杀死所有不需要运行的
container。必要时创建
sandbox。创建
init container。创建正常的
container。
Runtime.SyncPod部分代码位于pkg/kubelet/kuberuntime/kuberuntime_manager.go
3.1. computePodActions
计算sandbox和container的变化。
3.2. killPodWithSyncResult
必要的时候杀死pod。
3.3. killContainer
杀死所有不需要运行的container。
3.4. createPodSandbox
必要时创建sandbox。
3.5. start init container
创建init container。
3.6. start containers
创建正常的container。
startContainer启动一个容器并返回是否成功。
主要包括以下几个步骤:
拉取镜像
创建容器
启动容器
运行post start lifecycle hooks(如果有设置此项)
startContainer完整代码如下:
startContainer部分代码位于pkg/kubelet/kuberuntime/kuberuntime_container.go
以下对startContainer分段分析:
4.1. pull image
通过EnsureImageExists方法拉取拉取指定pod容器的镜像,并返回镜像信息和错误。
4.2. CreateContainer
首先生成container的*v1.ObjectReference对象,该对象包括container的相关信息。
统计container的重启次数,新的容器默认重启次数为0。
生成container的配置。
调用runtimeService,执行CreateContainer的操作。
4.3. StartContainer
执行runtimeService的StartContainer方法,来启动容器。
4.4. execute post start hook
如果有指定Lifecycle.PostStart,则执行PostStart操作,PostStart如果执行失败,则容器会根据重启的规则进行重启。
5. 总结
kubelet的工作是管理pod在Node上的生命周期(包括增删改查),kubelet通过各种类型的manager异步工作各自执行各自的任务,其中使用到了多种的channel来控制状态信号变化的传递,例如比较重要的channel有podUpdates <-chan UpdatePodOptions,来传递pod的变化情况。
创建pod的调用逻辑
syncLoopIteration-->kubetypes.ADD-->HandlePodAdditions(u.Pods)-->dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)-->podWorkers.UpdatePod-->managePodLoop(podUpdates)-->syncPod(o syncPodOptions)-->containerRuntime.SyncPod-->startContainer
参考:
https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go
最后更新于
这有帮助吗?