syncPod

kubelet源码分析(五)之 syncPod

以下代码分析基于 kubernetes v1.12.0 版本。

本文主要分析kubeletsyncPod的部分。

managePodLoop通过读取podUpdateschannel的信息,执行syncPodFn函数,而syncPodFn函数在newPodWorkers的时候赋值了,即kubelet.syncPod

managePodLoop完整代码如下:

此部分代码位于pkg/kubelet/pod_workers.go

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			// This is a blocking call that would return only if the cache
			// has an entry for the pod that is newer than minRuntimeCache
			// Time. This ensures the worker doesn't start syncing until
			// after the cache is at least newer than the finished time of
			// the previous sync.
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				// This is the legacy event thrown by manage pod loop
				// all other events are now dispatched from syncPodFn
				p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
				return err
			}
      // 该部分的syncPodFn实际上的实现函数是kubelet.syncPod
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		// notify the call-back function if the operation succeeded or not
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		if err != nil {
			// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
			glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
		}
		p.wrapUp(update.Pod.UID, err)
	}
}

以下分析syncPod相关逻辑。

syncPod可以理解为是一个单个pod进行同步任务的事务脚本。其中入参是syncPodOptionssyncPodOptions记录了需要同步的pod的相关信息。具体定义如下:

syncPod主要执行以下的工作流:

  • 如果是正在创建的pod,则记录pod worker的启动latency

  • 调用generateAPIPodStatus为pod提供v1.PodStatus信息。

  • 如果pod是第一次运行,记录pod的启动latency

  • 更新status manager中的pod状态。

  • 如果pod不应该被运行则杀死pod。

  • 如果pod是一个static pod,并且没有对应的mirror pod,则创建一个mirror pod

  • 如果没有pod的数据目录则给pod创建对应的数据目录。

  • 等待volume被attach或mount。

  • 获取pod的secret数据。

  • 调用container runtimeSyncPod函数,执行相关pod操作。

  • 更新pod的ingressegresstraffic limit

当以上任务流中有任何的error,则return error。在下一次执行syncPod的任务流会被再次执行。对于错误信息会被记录到event中,方便debug。

以下对syncPod的执行过程进行分析。

syncPod的代码位于pkg/kubelet/kubelet.go

2.1. SyncPodKill

首先,获取syncPodOptions的pod信息。

如果pod是需要被杀死的,则执行killPod,会在指定的宽限期内杀死pod。

2.2. SyncPodCreate

如果pod是需要被创建的,则记录pod的启动latencylatency与pod在apiserver中第一次被记录相关。

通过pod和pod status生成最终的api pod status并设置pod的IP。

记录pod到running状态的时间。

如果pod是不可运行的,则更新pod和container的状态和相应的原因。

并更新status manager中的状态信息,杀死不可运行的pod。

如果网络插件还没到Ready状态,则只有在使用host网络模式的情况下才启动pod。

2.3. Cgroups

给pod创建Cgroups,如果cgroups-per-qos参数开启,则申请相应的资源。对于terminated的pod不需要创建或更新pod的Cgroups

当重新启动kubelet并且启用cgroups-per-qos时,应该间歇性地终止所有pod的运行容器并在qos cgroup hierarchy下重新启动。

如果pod的cgroup已经存在或者pod第一次运行,不杀死pod中容器。

如果pod被杀死并且重启策略是Never,则不创建或更新对应的Cgroups,否则创建和更新pod的Cgroups

其中创建Cgroups是通过containerManagerUpdateQOSCgroups来执行。

2.4. Mirror Pod

如果pod是一个static pod,没有对应的mirror pod,则创建一个mirror pod;如果存在mirror pod则删除再重建一个mirror pod

2.5. makePodDataDirs

给pod创建数据目录。

其中数据目录包括

  • PodDir:{kubelet.rootDirectory}/pods/podUID

  • PodVolumesDir:{PodDir}/volumes

  • PodPluginsDir:{PodDir}/plugins

2.6. mount volumes

对非terminated状态的pod挂载volume

2.7. PullSecretsForPod

获取pod的secret数据。

getPullSecretsForPod具体实现函数如下:

2.8. containerRuntime.SyncPod

调用container runtimeSyncPod函数,执行相关pod操作,由此kubelet.syncPod的操作逻辑转入containerRuntime.SyncPod函数中。

SyncPod主要执行sync操作使得运行的pod达到期望状态的pod。主要执行以下操作:

  • 计算sandboxcontainer的变化。

  • 必要的时候杀死pod。

  • 杀死所有不需要运行的container

  • 必要时创建sandbox

  • 创建init container

  • 创建正常的container

Runtime.SyncPod部分代码位于pkg/kubelet/kuberuntime/kuberuntime_manager.go

3.1. computePodActions

计算sandboxcontainer的变化。

3.2. killPodWithSyncResult

必要的时候杀死pod。

3.3. killContainer

杀死所有不需要运行的container

3.4. createPodSandbox

必要时创建sandbox

3.5. start init container

创建init container

3.6. start containers

创建正常的container

startContainer启动一个容器并返回是否成功。

主要包括以下几个步骤:

  1. 拉取镜像

  2. 创建容器

  3. 启动容器

  4. 运行post start lifecycle hooks(如果有设置此项)

startContainer完整代码如下:

startContainer部分代码位于pkg/kubelet/kuberuntime/kuberuntime_container.go

以下对startContainer分段分析:

4.1. pull image

通过EnsureImageExists方法拉取拉取指定pod容器的镜像,并返回镜像信息和错误。

4.2. CreateContainer

首先生成container的*v1.ObjectReference对象,该对象包括container的相关信息。

统计container的重启次数,新的容器默认重启次数为0。

生成container的配置。

调用runtimeService,执行CreateContainer的操作。

4.3. StartContainer

执行runtimeServiceStartContainer方法,来启动容器。

4.4. execute post start hook

如果有指定Lifecycle.PostStart,则执行PostStart操作,PostStart如果执行失败,则容器会根据重启的规则进行重启。

5. 总结

kubelet的工作是管理pod在Node上的生命周期(包括增删改查),kubelet通过各种类型的manager异步工作各自执行各自的任务,其中使用到了多种的channel来控制状态信号变化的传递,例如比较重要的channel有podUpdates <-chan UpdatePodOptions,来传递pod的变化情况。

创建pod的调用逻辑

syncLoopIteration-->kubetypes.ADD-->HandlePodAdditions(u.Pods)-->dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)-->podWorkers.UpdatePod-->managePodLoop(podUpdates)-->syncPod(o syncPodOptions)-->containerRuntime.SyncPod-->startContainer

参考:

最后更新于

这有帮助吗?