// syncLoop is the main loop for processing changes. It watches for changes from// three channels (file, apiserver, and http) and creates a union of them. For// any new change seen, will run a sync against desired state and running state. If// no changes are seen to the configuration, will synchronize the last known desired// state every sync-frequency seconds. Never returns.func (kl *Kubelet) syncLoop(updates <-chankubetypes.PodUpdate, handler SyncHandler) { glog.Info("Starting kubelet main sync loop.")// The resyncTicker wakes up kubelet to checks if there are any pod workers// that need to be sync'd. A one-second period is sufficient because the// sync interval is defaulted to 10s. syncTicker := time.NewTicker(time.Second)defer syncTicker.Stop() housekeepingTicker := time.NewTicker(housekeepingPeriod)defer housekeepingTicker.Stop() plegCh := kl.pleg.Watch()const ( base =100* time.Millisecond max =5* time.Second factor =2 ) duration := basefor {if rs := kl.runtimeState.runtimeErrors(); len(rs) !=0 { glog.Infof("skipping pod synchronization - %v", rs)// exponential backoff time.Sleep(duration) duration = time.Duration(math.Min(float64(max), factor*float64(duration)))continue }// reset backoff if we have a success duration = base kl.syncLoopMonitor.Store(kl.clock.Now())if!kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {break } kl.syncLoopMonitor.Store(kl.clock.Now()) }}
func (kl *Kubelet) syncLoopIteration(configCh <-chankubetypes.PodUpdate, handler SyncHandler, syncCh <-chantime.Time, housekeepingCh <-chantime.Time, plegCh <-chan*pleg.PodLifecycleEvent) bool {select {case u, open :=<-configCh:// Update from a config source; dispatch it to the right handler// callback.if!open { glog.Errorf("Update channel is closed. Exiting the sync loop.")returnfalse }switch u.Op {case kubetypes.ADD: glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))// After restarting, kubelet will get all existing pods through// ADD as if they are new pods. These pods will then go through the// admission process and *may* be rejected. This can be resolved// once we have checkpointing. handler.HandlePodAdditions(u.Pods)case kubetypes.UPDATE: glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods)) handler.HandlePodUpdates(u.Pods)case kubetypes.REMOVE: glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodRemoves(u.Pods)case kubetypes.RECONCILE: glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodReconcile(u.Pods)case kubetypes.DELETE: glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))// DELETE is treated as a UPDATE because of graceful deletion. handler.HandlePodUpdates(u.Pods)case kubetypes.RESTORE: glog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))// These are pods restored from the checkpoint. Treat them as new// pods. handler.HandlePodAdditions(u.Pods)case kubetypes.SET:// TODO: Do we want to support this? glog.Errorf("Kubelet does not support snapshot update") }...}
case e := <-plegCh:
	// A pod lifecycle event arrived. Only events that isSyncPodWorthy accepts
	// trigger a sync.
	if isSyncPodWorthy(e) {
		// PLEG event for a pod; sync it.
		if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
			glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
			handler.HandlePodSyncs([]*v1.Pod{pod})
		} else {
			// If the pod no longer exists, ignore the event.
			glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
		}
	}

	// For ContainerDied events whose payload is a string, the payload is
	// passed to cleanUpContainersInPod as the container ID.
	if e.Type == pleg.ContainerDied {
		if containerID, ok := e.Data.(string); ok {
			kl.cleanUpContainersInPod(e.ID, containerID)
		}
	}
case update := <-kl.livenessManager.Updates():
	// Only failed liveness results lead to a sync; other results are ignored here.
	if update.Result == proberesults.Failure {
		// The liveness manager detected a failure; sync the pod.

		// We should not use the pod from livenessManager, because it is never updated after
		// initialization.
		pod, ok := kl.podManager.GetPodByUID(update.PodUID)
		if !ok {
			// If the pod no longer exists, ignore the update.
			glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
			// NOTE(review): this break is inside a select case, so it leaves the
			// select statement — confirm against the enclosing function.
			break
		}
		glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
		handler.HandlePodSyncs([]*v1.Pod{pod})
	}
2.5. housekeepingCh
housekeepingCh:定时触发pod的清理工作,此处调用了HandlePodCleanups函数。
case <-housekeepingCh:
	if !kl.sourcesReady.AllReady() {
		// If the sources aren't ready, skip housekeeping, as we may
		// accidentally delete pods from unready sources.
		glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
	} else {
		glog.V(4).Infof("SyncLoop (housekeeping)")
		// A cleanup failure is only logged; it does not stop the loop.
		if err := handler.HandlePodCleanups(); err != nil {
			glog.Errorf("Failed cleaning pods: %v", err)
		}
	}
// SyncHandler is an interface implemented by Kubelet, for testabilitytypeSyncHandlerinterface {HandlePodAdditions(pods []*v1.Pod)HandlePodUpdates(pods []*v1.Pod)HandlePodRemoves(pods []*v1.Pod)HandlePodReconcile(pods []*v1.Pod)HandlePodSyncs(pods []*v1.Pod)HandlePodCleanups() error}
// HandlePodAdditions is the callback in SyncHandler for pods being added from// a config source.func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { start := kl.clock.Now() sort.Sort(sliceutils.PodsByCreationTime(pods))for _, pod :=range pods {... }}
将pod添加到pod manager中。
for _, pod := range pods {
	// Responsible for checking limits in resolv.conf
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}
	// Snapshot the pod list before the new pod is added below.
	existingPods := kl.podManager.GetPods()
	// Always add the pod to the pod manager. Kubelet relies on the pod
	// manager as the source of truth for the desired state. If a pod does
	// not exist in the pod manager, it means that it has been deleted in
	// the apiserver and no action (other than cleanup) is required.
	kl.podManager.AddPod(pod)
	// ... (rest of the loop body elided in this excerpt)
}
如果是mirror pod,则对mirror pod进行处理。
// Mirror pods are handed to handleMirrorPod and skip the rest of the loop body.
if kubepod.IsMirrorPod(pod) {
	kl.handleMirrorPod(pod, start)
	continue
}
if !kl.podIsTerminated(pod) {
	// Only go through the admission process if the pod is not
	// terminated.

	// We failed pods that we rejected, so activePods include all admitted
	// pods that are alive.
	activePods := kl.filterOutTerminatedPods(existingPods)

	// Check if we can admit the pod; if not, reject it and move on to the
	// next pod.
	if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
		kl.rejectPod(pod, reason, message)
		continue
	}
}
// HandlePodUpdates is the callback in the SyncHandler interface for pods// being updated from a config source.func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { start := kl.clock.Now()for _, pod :=range pods {... }}
将pod更新到pod manager中。
for _, pod := range pods {
	// Responsible for checking limits in resolv.conf
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}
	// Record the updated pod in the pod manager.
	kl.podManager.UpdatePod(pod)
	// ... (rest of the loop body elided in this excerpt)
}
如果是mirror pod,则对mirror pod进行处理。
// Mirror pods are handed to handleMirrorPod and skip the rest of the loop body.
if kubepod.IsMirrorPod(pod) {
	kl.handleMirrorPod(pod, start)
	continue
}
执行dispatchWork函数。
// TODO: Evaluate if we need to validate and reject updates.mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
// HandlePodRemoves is the callback in the SyncHandler interface for pods// being removed from a config source.func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { start := kl.clock.Now()for _, pod :=range pods {... }}
从pod manager中删除pod。
for _, pod := range pods {
	// Drop the pod from the pod manager first.
	kl.podManager.DeletePod(pod)
	// ... (rest of the loop body elided in this excerpt)
}
如果是mirror pod,则对mirror pod进行处理。
// Mirror pods are handed to handleMirrorPod and skip the rest of the loop body.
if kubepod.IsMirrorPod(pod) {
	kl.handleMirrorPod(pod, start)
	continue
}
调用kubelet的deletePod函数来删除pod。
// Deletion is allowed to fail because the periodic cleanup routine// will trigger deletion again.if err := kl.deletePod(pod); err !=nil { glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)}
// deletePod deletes the pod from the internal state of the kubelet by:// 1. stopping the associated pod worker asynchronously// 2. signaling to kill the pod by sending on the podKillingCh channel//// deletePod returns an error if not all sources are ready or the pod is not// found in the runtime cache.func (kl *Kubelet) deletePod(pod *v1.Pod) error {if pod ==nil {return fmt.Errorf("deletePod does not allow nil pod") }if!kl.sourcesReady.AllReady() {// If the sources aren't ready, skip deletion, as we may accidentally delete pods// for sources that haven't reported yet.return fmt.Errorf("skipping delete because sources aren't ready yet") } kl.podWorkers.ForgetWorker(pod.UID)// Runtime cache may not have been updated to with the pod, but it's okay// because the periodic cleanup routine will attempt to delete again later. runningPods, err := kl.runtimeCache.GetPods()if err !=nil {return fmt.Errorf("error listing containers: %v", err) } runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)if runningPod.IsEmpty() {return fmt.Errorf("pod not found") } podPair :=kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod} kl.podKillingCh <-&podPair// TODO: delete the mirror pod here?// We leave the volume/directory cleanup to the periodic cleanup routine.returnnil}
// HandlePodReconcile is the callback in the SyncHandler interface for pods// that should be reconciled.func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { start := kl.clock.Now()for _, pod :=range pods {... }}
将pod更新到pod manager中。
for _, pod := range pods {
	// Update the pod in the pod manager; the status manager will periodically
	// reconcile according to the pod manager.
	kl.podManager.UpdatePod(pod)
	// ... (rest of the loop body elided in this excerpt)
}
必要时调整pod的Ready状态,执行dispatchWork函数。
// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.if status.NeedToReconcilePodReadiness(pod) { mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)}
如果pod已经被驱逐(evicted),则在同步之后删除该pod中已经退出的容器。
// After an evicted pod is synced, all dead containers in the pod can be removed.if eviction.PodIsEvicted(pod.Status) {if podStatus, err := kl.podCache.Get(pod.UID); err ==nil { kl.containerDeletor.deleteContainersInPod("", podStatus, true) }}
// HandlePodSyncs is the callback in the syncHandler interface for pods// that should be dispatched to pod workers for sync.func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { start := kl.clock.Now()for _, pod :=range pods { mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) }}
// HandlePodCleanups performs a series of cleanup work, including terminating// pod workers, killing unwanted pods, and removing orphaned volumes/pod// directories.// NOTE: This function is executed by the main sync loop, so it// should not contain any blocking calls.func (kl *Kubelet) HandlePodCleanups() error {// The kubelet lacks checkpointing, so we need to introspect the set of pods// in the cgroup tree prior to inspecting the set of pods in our pod manager.// this ensures our view of the cgroup tree does not mistakenly observe pods// that are added after the fact...var ( cgroupPods map[types.UID]cm.CgroupName err error )if kl.cgroupsPerQOS { pcm := kl.containerManager.NewPodContainerManager() cgroupPods, err = pcm.GetAllPodsFromCgroups()if err !=nil {return fmt.Errorf("failed to get list of pods that still exist on cgroup mounts: %v", err) } }...}
列出所有pod包括mirror pod。
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
//   1. If the containers were removed immediately after they died, kubelet
//      may fail to generate correct statuses, let alone filtering correctly.
//   2. If kubelet restarted before writing the terminated status for a pod
//      to the apiserver, it could still restart the terminated pod (even
//      though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
activePods := kl.filterOutTerminatedPods(allPods)

// Build the set of UIDs of the active pods.
desiredPods := make(map[types.UID]empty)
for _, pod := range activePods {
	desiredPods[pod.UID] = empty{}
}
pod worker停止不再存在的pod的任务,并从probe manager中清除pod。
// Stop the workers for no-longer existing pods.// TODO: is here the best place to forget pod workers?kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)kl.probeManager.CleanupPods(activePods)
// Remove any orphaned volumes.// Note that we pass all pods (including terminated pods) to the function,// so that we don't remove volumes associated with terminated but not yet// deleted pods.err = kl.cleanupOrphanedPodDirs(allPods, runningPods)if err !=nil {// We want all cleanup tasks to be run even if one of them failed. So// we just log an error here and continue other cleanup tasks.// This also applies to the other clean up tasks. glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)}
移除孤立的(orphaned)mirror pod。
// Remove any orphaned mirror pods.kl.podManager.DeleteOrphanedMirrorPods()
删除不再运行的pod的cgroup。
// Remove any cgroups in the hierarchy for pods that are no longer running.if kl.cgroupsPerQOS { kl.cleanupOrphanedPodCgroups(cgroupPods, activePods)}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.// If the pod is terminated, dispatchWorkfunc (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {if kl.podIsTerminated(pod) {if pod.DeletionTimestamp !=nil {// If the pod is in a terminated state, there is no pod worker to// handle the work item. Check if the DeletionTimestamp has been// set, and force a status update to trigger a pod deletion request// to the apiserver. kl.statusManager.TerminatePod(pod) }return }// Run the sync in an async worker. kl.podWorkers.UpdatePod(&UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: syncType, OnCompleteFunc: func(err error) {if err !=nil { metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start)) } }, })// Note the number of containers for new pods.if syncType == kubetypes.SyncPodCreate { metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) }}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.// If the pod is terminated, dispatchWorkfunc (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {if kl.podIsTerminated(pod) {if pod.DeletionTimestamp !=nil {// If the pod is in a terminated state, there is no pod worker to// handle the work item. Check if the DeletionTimestamp has been// set, and force a status update to trigger a pod deletion request// to the apiserver. kl.statusManager.TerminatePod(pod) }return }...}
// Run the sync in an async worker.kl.podWorkers.UpdatePod(&UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: syncType, OnCompleteFunc: func(err error) {if err !=nil { metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start)) } },})
当同步类型(syncType)是SyncPodCreate(即创建pod的时候),统计新pod中容器的数目。
// Note the number of containers for new pods.if syncType == kubetypes.SyncPodCreate { metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))}
// PodWorkers is an abstract interface for testability.typePodWorkersinterface {UpdatePod(options *UpdatePodOptions)ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)ForgetWorker(uid types.UID)}
// Apply the new setting to the specified pod.// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.// Update requests are ignored if a kill pod request is pending.func (p *podWorkers) UpdatePod(options *UpdatePodOptions) { pod := options.Pod uid := pod.UIDvar podUpdates chanUpdatePodOptionsvar exists bool p.podLock.Lock()defer p.podLock.Unlock()if podUpdates, exists = p.podUpdates[uid]; !exists {// We need to have a buffer here, because checkForUpdates() method that// puts an update into channel is called from the same goroutine where// the channel is consumed. However, it is guaranteed that in such case// the channel is empty, so buffer of size 1 is enough. podUpdates =make(chanUpdatePodOptions, 1) p.podUpdates[uid] = podUpdates// Creating a new pod worker either means this is a new pod, or that the// kubelet just restarted. In either case the kubelet is willing to believe// the status of the pod for the first pod worker sync. See corresponding// comment in syncPod.gofunc() {defer runtime.HandleCrash() p.managePodLoop(podUpdates) }() }if!p.isWorking[pod.UID] { p.isWorking[pod.UID] =true podUpdates <-*options } else {// if a request to kill a pod is pending, we do not let anything overwrite that request. update, found := p.lastUndeliveredWorkUpdate[pod.UID]if!found || update.UpdateType != kubetypes.SyncPodKill { p.lastUndeliveredWorkUpdate[pod.UID] =*options } }}
func (p *podWorkers) managePodLoop(podUpdates <-chanUpdatePodOptions) {var lastSyncTime time.Timefor update :=range podUpdates { err :=func() error { podUID := update.Pod.UID// This is a blocking call that would return only if the cache// has an entry for the pod that is newer than minRuntimeCache// Time. This ensures the worker doesn't start syncing until// after the cache is at least newer than the finished time of// the previous sync. status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)if err !=nil {// This is the legacy event thrown by manage pod loop// all other events are now dispatched from syncPodFn p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)return err } err = p.syncPodFn(syncPodOptions{ mirrorPod: update.MirrorPod, pod: update.Pod, podStatus: status, killPodOptions: update.KillPodOptions, updateType: update.UpdateType, }) lastSyncTime = time.Now()return err }()// notify the call-back function if the operation succeeded or notif update.OnCompleteFunc !=nil { update.OnCompleteFunc(err) }if err !=nil {// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err) } p.wrapUp(update.Pod.UID, err) }}