# syncPod

## kubelet Source Code Analysis (Part 5): syncPod

> The following analysis is based on `kubernetes v1.12.0`.

This article analyzes the `syncPod` portion of the `kubelet`.

## 1. [managePodLoop](https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/pod_workers.go#L153)

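`managePodLoop` reads updates from the `podUpdates` channel and calls `syncPodFn` for each one. `syncPodFn` is assigned in `newPodWorkers` and points to `kubelet.syncPod`.

The complete code of `managePodLoop` is as follows:

> This code is located in pkg/kubelet/pod\_workers.go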

```go
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			podUID := update.Pod.UID
			// This is a blocking call that would return only if the cache
			// has an entry for the pod that is newer than minRuntimeCache
			// Time. This ensures the worker doesn't start syncing until
			// after the cache is at least newer than the finished time of
			// the previous sync.
			status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
			if err != nil {
				// This is the legacy event thrown by manage pod loop
				// all other events are now dispatched from syncPodFn
				p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
				return err
			}
			// syncPodFn here is implemented by kubelet.syncPod
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		// notify the call-back function if the operation succeeded or not
		if update.OnCompleteFunc != nil {
			update.OnCompleteFunc(err)
		}
		if err != nil {
			// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
			glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
		}
		p.wrapUp(update.Pod.UID, err)
	}
}
```
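The shape of this loop, one worker draining a channel and calling a sync function per update, can be illustrated with a minimal sketch. The `update` type, `manageLoop`, and `errSyncFailed` are hypothetical names made up for this example; the pod cache wait and event recording from the real code are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// update is a hypothetical stand-in for UpdatePodOptions.
type update struct {
	podName    string
	onComplete func(error) // optional completion callback
}

// errSyncFailed is a made-up error used only by this sketch.
var errSyncFailed = errors.New("sync failed")

// manageLoop drains the channel and calls syncFn once per update,
// then invokes the optional callback with the result.
func manageLoop(updates <-chan update, syncFn func(string) error) []error {
	var results []error
	for u := range updates {
		err := syncFn(u.podName)
		if u.onComplete != nil {
			u.onComplete(err)
		}
		results = append(results, err)
	}
	return results
}

func main() {
	updates := make(chan update, 2)
	updates <- update{podName: "web"}
	updates <- update{podName: "bad"}
	close(updates) // closing the channel ends the range loop

	syncFn := func(name string) error {
		if name == "bad" {
			return errSyncFailed
		}
		return nil
	}
	for i, err := range manageLoop(updates, syncFn) {
		fmt.Printf("update %d: err=%v\n", i, err)
	}
}
```

Because the loop ranges over the channel, updates for one pod are processed strictly one at a time and in order.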

The following sections analyze the `syncPod` logic.

## 2. [syncPod](https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go#L1465)

`syncPod` can be understood as the transaction script for syncing a single pod. Its argument is `syncPodOptions`, which records the information about the pod to sync. It is defined as follows:

```go
// syncPodOptions provides the arguments to a SyncPod operation.
type syncPodOptions struct {
	// the mirror pod for the pod to sync, if it is a static pod
	mirrorPod *v1.Pod
	// pod to sync
	pod *v1.Pod
	// the type of update (create, update, sync)
	updateType kubetypes.SyncPodType
	// the current status
	podStatus *kubecontainer.PodStatus
	// if update type is kill, use the specified options to kill the pod.
	killPodOptions *KillPodOptions
}
```

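`syncPod` executes the following workflow:

* If the pod is being created, record the pod worker start `latency`.
* Call `generateAPIPodStatus` to prepare a `v1.PodStatus` for the pod.
* If the pod is seen as running for the first time, record the pod start `latency`.
* Update the pod status in the `status manager`.
* Kill the pod if it should not be running.
* If the pod is a `static pod` and has no `mirror pod`, create a `mirror pod`.
* Create the data directories for the pod if they do not exist.
* Wait for volumes to be attached and mounted.
* Fetch the pull secrets for the pod.
* Call the `SyncPod` function of the `container runtime` to perform the pod operations.
* Update the `traffic limit` for the pod's `ingress` and `egress`.

If any step in this workflow fails, `syncPod` returns the error, and the whole workflow runs again on the next `syncPod` call. Errors are also recorded as events to make debugging easier.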
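The "return on first error, retry the whole workflow next time" behaviour can be sketched as follows. This is only an illustration: the `step` type, `runSteps`, and `errNotMounted` are hypothetical, and the step names merely echo the list above.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical named unit of work inside one sync pass.
type step struct {
	name string
	run  func() error
}

// errNotMounted is a made-up error used only by this sketch.
var errNotMounted = errors.New("volume not mounted yet")

// runSteps executes steps in order and stops at the first error,
// returning the names of the steps that were attempted.
func runSteps(steps []step) ([]string, error) {
	var attempted []string
	for _, s := range steps {
		attempted = append(attempted, s.name)
		if err := s.run(); err != nil {
			return attempted, fmt.Errorf("%s: %w", s.name, err)
		}
	}
	return attempted, nil
}

func main() {
	mounted := false
	steps := []step{
		{"makePodDataDirs", func() error { return nil }},
		{"waitForVolumes", func() error {
			if !mounted {
				return errNotMounted
			}
			return nil
		}},
		{"runtimeSyncPod", func() error { return nil }},
	}
	_, err := runSteps(steps) // the first pass stops at the volume step
	fmt.Println("first pass:", err)

	mounted = true
	attempted, err := runSteps(steps) // the retry starts again from the first step
	fmt.Println("second pass:", attempted, err)
}
```

Since every pass starts from the top, each step has to be safe to repeat.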

The following sections walk through the execution of `syncPod`.

> The code of syncPod is located in pkg/kubelet/kubelet.go

### 2.1. SyncPodKill

First, the pod information is extracted from `syncPodOptions`.

```go
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	// pull out the required options
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType
	...
}
```

If the update type is a kill request, `killPod` is called to terminate the pod within the specified grace period.

```go
// if we want to kill a pod, do it now!
if updateType == kubetypes.SyncPodKill {
	killPodOptions := o.killPodOptions
	if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
		return fmt.Errorf("kill pod options are required if update type is kill")
	}
	apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
	kl.statusManager.SetPodStatus(pod, apiPodStatus)
	// we kill the pod with the specified grace period since this is a termination
	if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
		// there was an error killing the pod, so we return that error directly
		utilruntime.HandleError(err)
		return err
	}
	return nil
}
```

### 2.2. SyncPodCreate

If the pod is being created, the pod worker start `latency` is recorded. The `latency` is measured relative to the first time the pod was seen by the API server.

```go
// Latency measurements for the main workflow are relative to the
// first time the pod was seen by the API server.
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
	firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}

// Record pod worker start latency if being created
// TODO: make pod workers record their own latencies
if updateType == kubetypes.SyncPodCreate {
	if !firstSeenTime.IsZero() {
		// This is the first time we are syncing the pod. Record the latency
		// since kubelet first saw the pod if firstSeenTime is set.
		metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
	} else {
		glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
	}
}
```

The final API pod status is generated from the pod and its runtime pod status, and the pod IP is set.

```go
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
podStatus.IP = apiPodStatus.PodIP
```

Record the time it takes for the pod to reach the running state.

```go
// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
	!firstSeenTime.IsZero() {
	metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
```
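In Go, `&&` binds more tightly than `||`, so the condition in the excerpt groups as `!ok || (pending && running && firstSeenSet)`. The sketch below only demonstrates that grouping with plain booleans; `shouldObserve` and its parameter names are hypothetical, and the source does not say whether this grouping is what upstream intended.

```go
package main

import "fmt"

// shouldObserve has the same shape as the condition in the excerpt:
// !ok || wasPending && nowRunning && firstSeenSet.
// Go parses it as !ok || (wasPending && nowRunning && firstSeenSet).
func shouldObserve(ok, wasPending, nowRunning, firstSeenSet bool) bool {
	return !ok || wasPending && nowRunning && firstSeenSet
}

func main() {
	// No existing status: true even when firstSeenTime is unset.
	fmt.Println(shouldObserve(false, false, false, false)) // true
	// Existing status, Pending to Running, firstSeenTime set.
	fmt.Println(shouldObserve(true, true, true, true)) // true
	// Existing status, but the pod was not Pending before.
	fmt.Println(shouldObserve(true, false, true, true)) // false
}
```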

If the pod is not runnable, update the pod and container statuses with the reason.

```go
runnable := kl.canRunPod(pod)
if !runnable.Admit {
	// Pod is not runnable; update the Pod and Container statuses to why.
	apiPodStatus.Reason = runnable.Reason
	apiPodStatus.Message = runnable.Message
	// Waiting containers are not creating.
	const waitingReason = "Blocked"
	for _, cs := range apiPodStatus.InitContainerStatuses {
		if cs.State.Waiting != nil {
			cs.State.Waiting.Reason = waitingReason
		}
	}
	for _, cs := range apiPodStatus.ContainerStatuses {
		if cs.State.Waiting != nil {
			cs.State.Waiting.Reason = waitingReason
		}
	}
}
```

Then update the status in the status manager, and kill the pod if it was not admitted, is being deleted, or has failed.

```go
// Update status in the status manager
kl.statusManager.SetPodStatus(pod, apiPodStatus)

// Kill pod if it should not be running
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
	var syncErr error
	if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
		syncErr = fmt.Errorf("error killing pod: %v", err)
		utilruntime.HandleError(syncErr)
	} else {
		if !runnable.Admit {
			// There was no error killing the pod, but the pod cannot be run.
			// Return an error to signal that the sync loop should back off.
			syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
		}
	}
	return syncErr
}
```
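The decision made by this block can be condensed into a small pure function. This is a sketch under assumptions: `podView`, `decide`, and `errKillFailed` are hypothetical names, and the kill itself is modelled by passing in its result.

```go
package main

import (
	"errors"
	"fmt"
)

// podView is a hypothetical, flattened view of the inputs the check reads.
type podView struct {
	admitted      bool   // runnable.Admit
	deleting      bool   // pod.DeletionTimestamp != nil
	failed        bool   // apiPodStatus.Phase == v1.PodFailed
	rejectMessage string // runnable.Message
}

// errKillFailed is a made-up error used only by this sketch.
var errKillFailed = errors.New("runtime refused to stop the pod")

// decide reports whether the pod must be killed and which error the sync
// pass would return, given the outcome of the kill attempt (killErr).
func decide(p podView, killErr error) (bool, error) {
	if p.admitted && !p.deleting && !p.failed {
		return false, nil // the pod may keep running
	}
	if killErr != nil {
		return true, fmt.Errorf("error killing pod: %v", killErr)
	}
	if !p.admitted {
		// The kill worked, but an error still signals the caller to back off.
		return true, fmt.Errorf("pod cannot be run: %s", p.rejectMessage)
	}
	return true, nil
}

func main() {
	cases := []podView{
		{admitted: true},
		{admitted: true, deleting: true},
		{rejectMessage: "node is out of disk"},
	}
	for _, c := range cases {
		kill, err := decide(c, nil)
		fmt.Printf("kill=%v err=%v\n", kill, err)
	}
}
```

Note that a rejected pod yields an error even after a successful kill, while a deleted or failed pod does not.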

If the network plugin is not yet `Ready`, the pod is started only if it uses the `host` network.

```go
// If the network plugin is not ready, only start the pod if it uses the host network
if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
	kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, rs)
	return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, rs)
}
```

### 2.3. Cgroups

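Create `Cgroups` for the pod and, if the `cgroups-per-qos` flag is enabled, apply the resource parameters to them. Pods that are already `terminated` do not need their `Cgroups` created or updated.

When the `kubelet` is restarted with `cgroups-per-qos` enabled, the running containers of every pod should be killed intermittently and brought back up under the `qos cgroup hierarchy`.

Containers are not killed if the pod's cgroup already exists or if the pod is running for the first time.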

```go
// Create Cgroups for the pod and apply resource parameters
// to them if cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
if !kl.podIsTerminated(pod) {
	// When the kubelet is restarted with the cgroups-per-qos
	// flag enabled, all the pod's running containers
	// should be killed intermittently and brought back up
	// under the qos cgroup hierarchy.
	// Check if this is the pod's first sync
	firstSync := true
	for _, containerStatus := range apiPodStatus.ContainerStatuses {
		if containerStatus.State.Running != nil {
			firstSync = false
			break
		}
	}
	// Don't kill containers in pod if pod's cgroups already
	// exists or the pod is running for the first time
	podKilled := false
	if !pcm.Exists(pod) && !firstSync {
		if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
			podKilled = true
		}
	}
    ...
```

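If the pod was killed above and its restart policy is `Never`, its `Cgroups` are neither created nor updated. Otherwise the pod's `Cgroups` are created if they do not exist yet.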

```go
// Create and Update pod's Cgroups
// Don't create cgroups for run once pod if it was killed above
// The current policy is not to restart the run once pods when
// the kubelet is restarted with the new flag as run once pods are
// expected to run only once and if the kubelet is restarted then
// they are not expected to run again.
// We don't create and apply updates to cgroup if its a run once pod and was killed above
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
	if !pcm.Exists(pod) {
		if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
			glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
		}
		if err := pcm.EnsureExists(pod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
			return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
		}
	}
}
```

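Before the pod cgroup is created with `pcm.EnsureExists`, the QoS-level cgroups are updated through `UpdateQOSCgroups` on `containerManager`. A failure there is only logged and does not abort the sync.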

```go
if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
	glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
}
```
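The cgroup decisions in this section reduce to three small predicates. The sketch below is a simplification with hypothetical function names; it takes plain booleans instead of the real pod and container manager types.

```go
package main

import "fmt"

// isFirstSync reports whether no container is running yet,
// given one "running" flag per container status.
func isFirstSync(running []bool) bool {
	for _, r := range running {
		if r {
			return false
		}
	}
	return true
}

// shouldKill: containers are killed only when the pod cgroup is missing
// and at least one container is already running.
func shouldKill(cgroupExists, firstSync bool) bool {
	return !cgroupExists && !firstSync
}

// shouldEnsureCgroup: a pod that was killed above and has restart policy
// Never gets no cgroup; otherwise one is created if it does not exist.
func shouldEnsureCgroup(podKilled, restartNever, cgroupExists bool) bool {
	if podKilled && restartNever {
		return false
	}
	return !cgroupExists
}

func main() {
	first := isFirstSync([]bool{false, false})
	fmt.Println("first sync:", first)
	fmt.Println("kill new pod:", shouldKill(false, first))
	fmt.Println("kill running pod without cgroup:", shouldKill(false, false))
	fmt.Println("ensure cgroup for killed run-once pod:", shouldEnsureCgroup(true, true, false))
}
```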

### 2.4. Mirror Pod

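If the pod is a `static pod` with no `mirror pod`, a `mirror pod` is created. If a `mirror pod` exists but is being deleted or no longer matches the static pod, it is deleted and then recreated.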

```go
// Create Mirror Pod for Static Pod if it doesn't already exist
if kubepod.IsStaticPod(pod) {
	podFullName := kubecontainer.GetPodFullName(pod)
	deleted := false
	if mirrorPod != nil {
		if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
			// The mirror pod is semantically different from the static pod. Remove
			// it. The mirror pod will get recreated later.
			glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
			if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
				glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
			} else {
				deleted = true
			}
		}
	}
	if mirrorPod == nil || deleted {
		node, err := kl.GetNode()
		if err != nil || node.DeletionTimestamp != nil {
			glog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
		} else {
			glog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
			if err := kl.podManager.CreateMirrorPod(pod); err != nil {
				glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
			}
		}
	}
}
```
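The delete and create decisions for the mirror pod can be written as a small function. This is a sketch only: `mirrorState` and `plan` are hypothetical names, and the node check that guards the create call in the excerpt is omitted.

```go
package main

import "fmt"

// mirrorState is a hypothetical summary of what is known about the mirror pod.
type mirrorState struct {
	exists   bool // mirrorPod != nil
	deleting bool // mirrorPod.DeletionTimestamp != nil
	matches  bool // IsMirrorPodOf(mirrorPod, pod)
}

// plan reports whether the mirror pod should be deleted and whether a new
// one should be created. deleteOK models whether the delete call succeeded.
func plan(m mirrorState, deleteOK bool) (del, create bool) {
	deleted := false
	if m.exists && (m.deleting || !m.matches) {
		del = true
		deleted = deleteOK
	}
	// Create only when there is no mirror pod or the stale one is gone.
	create = !m.exists || deleted
	return del, create
}

func main() {
	del, create := plan(mirrorState{}, true)
	fmt.Println("no mirror pod:", del, create)
	del, create = plan(mirrorState{exists: true, matches: true}, true)
	fmt.Println("up-to-date mirror pod:", del, create)
	del, create = plan(mirrorState{exists: true}, true)
	fmt.Println("outdated mirror pod:", del, create)
}
```

If the delete call fails, nothing is created in that pass; the next sync tries again.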

### 2.5. makePodDataDirs

Create the data directories for the pod.

```go
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
	kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
	glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
	return err
}
```

The data directories are:

* `PodDir`: {kubelet.rootDirectory}/pods/podUID
* `PodVolumesDir`: {PodDir}/volumes
* `PodPluginsDir`: {PodDir}/plugins

```go
// makePodDataDirs creates the dirs for the pod datas.
func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error {
	uid := pod.UID
	if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	return nil
}
```
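To see the resulting layout, the following sketch builds the same three paths under a temporary root and creates them. `podDirs` and `makeDirs` are hypothetical helpers written for this example; only the directory layout is taken from the list above.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// podDirs returns the pod directory, its volumes directory, and its
// plugins directory for a given root directory and pod UID.
func podDirs(root, uid string) []string {
	podDir := filepath.Join(root, "pods", uid)
	return []string{
		podDir,
		filepath.Join(podDir, "volumes"),
		filepath.Join(podDir, "plugins"),
	}
}

// makeDirs creates every directory; MkdirAll returns nil for a
// directory that already exists, so repeated calls are harmless.
func makeDirs(dirs []string) error {
	for _, d := range dirs {
		if err := os.MkdirAll(d, 0750); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	root, err := os.MkdirTemp("", "kubelet-root")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(root)

	dirs := podDirs(root, "1234-abcd")
	if err := makeDirs(dirs); err != nil {
		panic(err)
	}
	for _, d := range dirs {
		rel, _ := filepath.Rel(root, d)
		fmt.Println(rel)
	}
}
```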

### 2.6. mount volumes

For pods that are not `terminated`, wait until their `volume`s are attached and mounted.

```go
// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
	// Wait for volumes to attach/mount
	if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
		return err
	}
}
```

### 2.7. PullSecretsForPod

Fetch the image pull secrets for the pod.

```go
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
```

`getPullSecretsForPod` is implemented as follows:

```go
// getPullSecretsForPod inspects the Pod and retrieves the referenced pull
// secrets.
func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
	pullSecrets := []v1.Secret{}

	for _, secretRef := range pod.Spec.ImagePullSecrets {
		secret, err := kl.secretManager.GetSecret(pod.Namespace, secretRef.Name)
		if err != nil {
			glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v.  The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
			continue
		}

		pullSecrets = append(pullSecrets, *secret)
	}

	return pullSecrets
}
```

### 2.8. containerRuntime.SyncPod

Call the `SyncPod` function of the `container runtime` to perform the pod operations. From this point the logic of `kubelet.syncPod` moves into `containerRuntime.SyncPod`.

```go
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
	// Do not return error if the only failures were pods in backoff
	for _, r := range result.SyncResults {
		if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
			// Do not record an event here, as we keep all event logging for sync pod failures
			// local to container runtime so we get better errors
			return err
		}
	}

	return nil
}
```
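The error filtering at the end of this block can be sketched in isolation. The three sentinel errors below are hypothetical stand-ins for `kubecontainer.ErrCrashLoopBackOff`, `images.ErrImagePullBackOff`, and some other failure; `filterBackOff` is a made-up helper that takes the per-step errors as a slice.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors for this sketch.
var (
	errCrashLoopBackOff = errors.New("CrashLoopBackOff")
	errImagePullBackOff = errors.New("ImagePullBackOff")
	errCreateContainer  = errors.New("CreateContainerError")
)

// filterBackOff returns the aggregate error unless every per-step error
// is one of the two back-off errors. As in the excerpt, the comparison is
// a plain inequality, so a nil entry (a step that succeeded) also counts
// as "not a back-off error".
func filterBackOff(aggregate error, stepErrs []error) error {
	if aggregate == nil {
		return nil
	}
	for _, e := range stepErrs {
		if e != errCrashLoopBackOff && e != errImagePullBackOff {
			return aggregate
		}
	}
	return nil
}

func main() {
	agg := errors.New("failed to sync pod")
	fmt.Println(filterBackOff(agg, []error{errCrashLoopBackOff, errImagePullBackOff}))
	fmt.Println(filterBackOff(agg, []error{errCrashLoopBackOff, errCreateContainer}))
}
```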

## 3. [Runtime.SyncPod](https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L578)

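`SyncPod` syncs the running pod into the desired pod. It performs the following steps:

* Compute the `sandbox` and `container` changes.
* Kill the pod if necessary.
* Kill any `container` that should not be running.
* Create a `sandbox` if necessary.
* Start the next `init container`.
* Start the regular `container`s.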

> The Runtime.SyncPod code is located in pkg/kubelet/kuberuntime/kuberuntime\_manager.go
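As a rough mental model, the computed changes can be seen as a plan that is executed in the order of the steps listed above. The sketch below is a simplification: `podActions` is a trimmed, hypothetical version of the value returned by `computePodActions`, and `plan` only lists the actions instead of performing them.

```go
package main

import "fmt"

// podActions is a trimmed, hypothetical version of the computed changes.
type podActions struct {
	killPod           bool
	createSandbox     bool
	containersToKill  []string
	nextInitContainer string // empty means there is none to start
	containersToStart []string
}

// plan lists the actions in the order of the steps described above.
func plan(a podActions) []string {
	var steps []string
	if a.killPod {
		steps = append(steps, "kill pod")
	}
	for _, c := range a.containersToKill {
		steps = append(steps, "kill container "+c)
	}
	if a.createSandbox {
		steps = append(steps, "create sandbox")
	}
	if a.nextInitContainer != "" {
		steps = append(steps, "start init container "+a.nextInitContainer)
	}
	for _, c := range a.containersToStart {
		steps = append(steps, "start container "+c)
	}
	return steps
}

func main() {
	// A brand new pod: a sandbox is needed, then the first init container.
	newPod := podActions{createSandbox: true, nextInitContainer: "init-db"}
	for _, s := range plan(newPod) {
		fmt.Println(s)
	}
}
```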

### 3.1. computePodActions

Compute the `sandbox` and `container` changes.

```go
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
glog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
if podContainerChanges.CreateSandbox {
	ref, err := ref.GetReference(legacyscheme.Scheme, pod)
	if err != nil {
		glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
	}
	if podContainerChanges.SandboxID != "" {
		m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
	} else {
		glog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
	}
}
```

### 3.2. killPodWithSyncResult

Kill the pod if necessary.

```go
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
	if !podContainerChanges.CreateSandbox {
		glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
	} else {
		glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
	}

	killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
	result.AddPodSyncResult(killResult)
	if killResult.Error() != nil {
		glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
		return
	}

	if podContainerChanges.CreateSandbox {
		m.purgeInitContainers(pod, podStatus)
	}
}
```

### 3.3. killContainer

Kill any `container` that should not be running.

```go
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
	glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
	killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
	result.AddSyncResult(killContainerResult)
	if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
		killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
		glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
		return
	}
}
```

### 3.4. createPodSandbox

Create a `sandbox` if necessary.

```go
// Step 4: Create a sandbox for the pod if necessary.
...
glog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
result.AddSyncResult(createSandboxResult)
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
if err != nil {
	createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
	glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
	ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
	if referr != nil {
		glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
	}
	m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed create pod sandbox: %v", err)
	return
}
glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))
```

### 3.5. start init container

Start the next `init container`.

```go
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
	// Start the next init container.
	startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
	result.AddSyncResult(startContainerResult)
	isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
	if isInBackOff {
		startContainerResult.Fail(err, msg)
		glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
		return
	}

	glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
	if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
		startContainerResult.Fail(err, msg)
		utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
		return
	}

	// Successfully started the container; clear the entry in the failure
	glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}
```

### 3.6. start containers

Start the regular `container`s.

```go
// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
	container := &pod.Spec.Containers[idx]
	startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
	result.AddSyncResult(startContainerResult)

	isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
	if isInBackOff {
		startContainerResult.Fail(err, msg)
		glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
		continue
	}

	glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
	// startContainer runs the container
	if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
		startContainerResult.Fail(err, msg)
		// known errors that are logged in other places are logged at higher levels here to avoid
		// repetitive log spam
		switch {
		case err == images.ErrImagePullBackOff:
			glog.V(3).Infof("container start failed: %v: %s", err, msg)
		default:
			utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
		}
		continue
	}
}
```
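One difference between Step 5 and Step 6 is easy to miss: a failed init container ends the sync pass with `return`, while a failed regular container only triggers `continue`. The sketch below isolates that control flow; `startAll` and `errStart` are hypothetical, and back-off handling is left out.

```go
package main

import (
	"errors"
	"fmt"
)

// errStart is a made-up error used only by this sketch.
var errStart = errors.New("start failed")

// startAll tries the init container first (if any), then every regular
// container. An init container failure aborts the pass; a regular
// container failure is recorded and the loop moves on.
func startAll(initName string, regular []string, start func(string) error) (started, failed []string) {
	if initName != "" {
		if err := start(initName); err != nil {
			return nil, []string{initName}
		}
		started = append(started, initName)
	}
	for _, name := range regular {
		if err := start(name); err != nil {
			failed = append(failed, name)
			continue
		}
		started = append(started, name)
	}
	return started, failed
}

func main() {
	start := func(name string) error {
		if name == "broken" {
			return errStart
		}
		return nil
	}
	started, failed := startAll("", []string{"web", "broken", "sidecar"}, start)
	fmt.Println("regular failure:", started, failed)

	started, failed = startAll("broken", []string{"web"}, start)
	fmt.Println("init failure:", started, failed)
}
```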

## 4. [startContainer](https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kuberuntime/kuberuntime_container.go#L89)

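`startContainer` starts a container and, on failure, returns a message explaining why together with the error.

It consists of the following steps:

1. Pull the image
2. Create the container
3. Start the container
4. Run the post start lifecycle hooks (if configured)

The complete code of `startContainer` is as follows:

> The startContainer code is located in pkg/kubelet/kuberuntime/kuberuntime\_container.go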

```go
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return msg, err
	}

	// Step 2: create the container.
	ref, err := kubecontainer.GenerateContainerRef(pod, container)
	if err != nil {
		glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
	}
	glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)

	// For a new container, the RestartCount should be 0
	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	if containerStatus != nil {
		restartCount = containerStatus.RestartCount + 1
	}

	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
	if cleanupAction != nil {
		defer cleanupAction()
	}
	if err != nil {
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainerConfig
	}

	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrCreateContainer
	}
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), ErrPreStartHook
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")

	if ref != nil {
		m.containerRefManager.SetRef(kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}, ref)
	}

	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)
	if err != nil {
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
		return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")

	// Symlink container logs to the legacy container log location for cluster logging
	// support.
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
		sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
	// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
	// Because if containerLog path does not exist, only dandling legacySymlink is created.
	// This dangling legacySymlink is later removed by container gc, so it does not make sense
	// to create it in the first place. it happens when journald logging driver is used with docker.
	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
		if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
			glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
				legacySymlink, containerID, containerLog, err)
		}
	}

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
		if handlerErr != nil {
			m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
			if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
				glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
					container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
			}
			return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
		}
	}

	return "", nil
}
```

The following sections analyze `startContainer` piece by piece:

### 4.1. pull image

The `EnsureImageExists` method pulls the image for the given container of the pod and returns the image reference, a message, and an error.

```go
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
if err != nil {
	m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
	return msg, err
}
```

### 4.2. CreateContainer

First, a `*v1.ObjectReference` is generated for the container. This object carries the information that identifies the container.

```go
// Step 2: create the container.
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
	glog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
}
glog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
```

Compute the container's restart count. A new container starts with a restart count of 0.

```go
// For a new container, the RestartCount should be 0
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
	restartCount = containerStatus.RestartCount + 1
}
```
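The restart count rule can be tried out with a small self-contained sketch. `containerStatus`, `find`, and `nextRestartCount` are hypothetical and only mimic the lookup by container name shown in the excerpt.

```go
package main

import "fmt"

// containerStatus is a hypothetical, trimmed status record.
type containerStatus struct {
	name         string
	restartCount int
}

// find returns a pointer to the status with the given name, or nil.
func find(statuses []containerStatus, name string) *containerStatus {
	for i := range statuses {
		if statuses[i].name == name {
			return &statuses[i]
		}
	}
	return nil
}

// nextRestartCount is 0 for a container without a previous status,
// otherwise the previous count plus one.
func nextRestartCount(statuses []containerStatus, name string) int {
	if s := find(statuses, name); s != nil {
		return s.restartCount + 1
	}
	return 0
}

func main() {
	statuses := []containerStatus{{name: "web", restartCount: 2}}
	fmt.Println(nextRestartCount(statuses, "web"))     // 3
	fmt.Println(nextRestartCount(statuses, "sidecar")) // 0
}
```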

Generate the container configuration.

```go
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
if cleanupAction != nil {
	defer cleanupAction()
}
if err != nil {
	m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
	return grpc.ErrorDesc(err), ErrCreateContainerConfig
}
```

Call `CreateContainer` on `runtimeService`, then run the internal `PreStartContainer` hook and register the container reference.

```go
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
	return grpc.ErrorDesc(err), ErrCreateContainer
}
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
	return grpc.ErrorDesc(err), ErrPreStartHook
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")

if ref != nil {
	m.containerRefManager.SetRef(kubecontainer.ContainerID{
		Type: m.runtimeName,
		ID:   containerID,
	}, ref)
}
```

### 4.3. StartContainer

Call `StartContainer` on `runtimeService` to start the container, then symlink the container log to the legacy log location.

```go
// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)
if err != nil {
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
	return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")

// Symlink container logs to the legacy container log location for cluster logging
// support.
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
	sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
// Because if containerLog path does not exist, only dandling legacySymlink is created.
// This dangling legacySymlink is later removed by container gc, so it does not make sense
// to create it in the first place. it happens when journald logging driver is used with docker.
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
	if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
		glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
			legacySymlink, containerID, containerLog, err)
	}
}
```

### 4.4. execute post start hook

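If `Lifecycle.PostStart` is set, the `PostStart` hook is executed. If the hook fails, the container is killed and an error is returned; the container is then restarted according to the restart policy.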

```go
// Step 4: execute the post start hook.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
	kubeContainerID := kubecontainer.ContainerID{
		Type: m.runtimeName,
		ID:   containerID,
	}
	msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
	if handlerErr != nil {
		m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
		if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
			glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
				container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
		}
		return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
	}
}
```
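The failure path of the hook can be sketched with two injected functions. Everything here is hypothetical (`runPostStart`, `errHook`, the error text); the sketch only shows that a failed hook leads to a kill attempt and that the hook error is returned regardless of whether the kill succeeds.

```go
package main

import (
	"errors"
	"fmt"
)

// errHook is a made-up error used only by this sketch.
var errHook = errors.New("hook exited with status 1")

// runPostStart runs the hook if one is set. On failure it asks kill to
// stop the container and returns the hook error; a kill failure is only
// reported, it does not replace the hook error.
func runPostStart(hook func() error, kill func() error) (killed bool, err error) {
	if hook == nil {
		return false, nil
	}
	if hookErr := hook(); hookErr != nil {
		if killErr := kill(); killErr != nil {
			fmt.Println("failed to kill container:", killErr)
		} else {
			killed = true
		}
		return killed, fmt.Errorf("post start hook failed: %v", hookErr)
	}
	return false, nil
}

func main() {
	failing := func() error { return errHook }
	kill := func() error { return nil }

	killed, err := runPostStart(failing, kill)
	fmt.Println(killed, err)

	killed, err = runPostStart(nil, kill)
	fmt.Println(killed, err)
}
```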

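## 5. Summary

The kubelet manages the lifecycle of pods on a node (create, delete, update, and query). It runs several kinds of managers that work asynchronously, each on its own task, and uses a number of channels to pass state changes between them. One of the more important channels is `podUpdates <-chan UpdatePodOptions`, which carries pod changes.

**Call chain for creating a pod**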

`syncLoopIteration-->kubetypes.ADD-->HandlePodAdditions(u.Pods)-->dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)-->podWorkers.UpdatePod-->managePodLoop(podUpdates)-->syncPod(o syncPodOptions)-->containerRuntime.SyncPod-->startContainer`

References:

* <https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go>
* <https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/pod_workers.go>
* <https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kuberuntime/kuberuntime_container.go>


