syncLoopIteration

kubelet源码分析(四)之 syncLoopIteration

以下代码分析基于 kubernetes v1.12.0 版本。

本文主要分析kubelet中syncLoopIteration部分。syncLoopIteration通过几种channel来对不同类型的事件进行监听并做增删改查的处理。

1. syncLoop

syncLoop是处理变更的循环。 它监听来自三种channel(file,apiserver和http)的更改。 对于看到的任何新更改,将针对所需状态和运行状态运行同步。 如果没有看到配置的变化,将在每个同步频率秒同步最后已知的所需状态。

此部分代码位于pkg/kubelet/kubelet.go

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")
	// The resyncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for {
		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
			glog.Infof("skipping pod synchronization - %v", rs)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

其中调用了syncLoopIteration的函数来执行更具体的监控pod变化的循环。

syncLoopIteration主要通过几种channel来对不同类型的事件进行监听并处理。其中包括:configChplegChsyncChhouseKeepingChlivenessManager.Updates()

syncLoopIteration实际执行了pod的操作,此部分设置了几种不同的channel:

  • configCh:将配置更改的pod分派给事件类型的相应处理程序回调。

  • plegCh:更新runtime缓存,同步pod。

  • syncCh:同步所有等待同步的pod。

  • houseKeepingCh:触发清理pod。

  • livenessManager.Updates():对失败的pod或者liveness检查失败的pod进行sync操作。

syncLoopIteration部分代码位于pkg/kubelet/kubelet.go

2.1. configCh

configCh将配置更改的pod分派给事件类型的相应处理程序回调,该部分主要通过SyncHandler对pod的不同事件进行增删改查等操作。

可以看出syncLoopIteration根据podUpdate的值来执行不同的pod操作,具体如下:

  • ADD:HandlePodAdditions

  • UPDATE:HandlePodUpdates

  • REMOVE:HandlePodRemoves

  • RECONCILE:HandlePodReconcile

  • DELETE:HandlePodUpdates

  • RESTORE:HandlePodAdditions

  • podsToSync:HandlePodSyncs

其中执行pod的handler操作的是SyncHandler,该类型是一个接口,实现体为kubelet本身,具体见后续分析。

2.2. plegCh

plegCh:更新runtime缓存,同步pod。此处调用了HandlePodSyncs的函数。

2.3. syncCh

syncCh:同步所有等待同步的pod。此处调用了HandlePodSyncs的函数。

2.4. livenessManager.Update

livenessManager.Updates():对失败的pod或者liveness检查失败的pod进行sync操作。此处调用了HandlePodSyncs的函数。

2.5. housekeepingCh

houseKeepingCh:触发清理pod。此处调用了HandlePodCleanups的函数。

SyncHandler是一个定义Pod的不同Handler的接口,具体是实现者是kubelet,该接口的方法主要在syncLoopIteration中调用,接口定义如下:

SyncHandler部分代码位于pkg/kubelet/kubelet.go

HandlePodAdditions先根据pod创建时间对pod进行排序,然后遍历pod列表,来执行pod的相关操作。

将pod添加到pod manager中。

如果是mirror pod,则对mirror pod进行处理。

如果当前pod的状态不是Terminated状态,则判断是否接受该pod,如果不接受则将pod状态改为Failed

执行dispatchWork函数,该函数是syncHandler中调用到的核心函数,该函数在pod worker中启动一个异步循环,来分派pod的相关操作。该函数的具体操作待后续分析。

最后加pod添加到probe manager中。

HandlePodUpdates同样遍历pod列表,执行相应的操作。

将pod更新到pod manager中。

如果是mirror pod,则对mirror pod进行处理。

执行dispatchWork函数。

HandlePodRemoves遍历pod列表。

从pod manager中删除pod。

如果是mirror pod,则对mirror pod进行处理。

调用kubelet的deletePod函数来删除pod。

deletePod 函数将需要删除的pod加入podKillingCh的channel中,有podKiller监听这个channel去执行删除任务,实现如下:

从probe manager中移除pod。

遍历pod列表。

将pod更新到pod manager中。

必要时调整pod的Ready状态,执行dispatchWork函数。

如果pod被设定为需要被驱逐的,则删除pod中的容器。

HandlePodSyncssyncHandler接口回调函数,调用dispatchWork,通过pod worker来执行任务。

HandlePodCleanups主要用来执行pod的清理任务,其中包括terminating的pod,orphaned的pod等。

首先查看pod使用到的cgroup。

列出所有pod包括mirror pod。

pod worker停止不再存在的pod的任务,并从probe manager中清除pod。

将需要杀死的pod加入到podKillingCh的channel中,podKiller的任务会监听该channel并获取需要杀死的pod列表来执行杀死pod的操作。

当pod不再被绑定到该节点,移除podStatus,其中removeOrphanedPodStatuses最后调用的函数是statusManagerRemoveOrphanedStatuses方法。

移除所有的orphaned volume。

移除mirror pod。

删除不再运行的pod的cgroup。

执行垃圾回收(GC)操作。

dispatchWork通过pod worker启动一个异步的循环。

完整代码如下:

以下分段进行分析:

如果pod的状态是处于Terminated状态,则执行statusManagerTerminatePod操作。

执行pod worker的UpdatePod函数,该函数是pod worker的核心函数,来执行pod相关操作。具体逻辑待下文分析。

当创建类型是SyncPodCreate(即创建pod的时候),统计新pod中容器的数目。

PodWorkers是一个接口类型:

其中UpdatePod是一个核心方法,通过podUpdates的channel来传递需要处理的pod信息,对于新创建的pod每个pod都会由一个goroutine来执行managePodLoop

此部分代码位于pkg/kubelet/pod_workers.go

managePodLoop通过读取podUpdateschannel的信息,执行syncPodFn函数,而syncPodFn函数在newPodWorkers的时候赋值了,即kubelet.syncPodkubelet.syncPod具体代码逻辑待后续文章单独分析。

newPodWorkers函数参考:

managePodLoop函数参考:

此部分代码位于pkg/kubelet/pod_workers.go

7. 总结

syncLoopIteration基本流程如下:

  1. 通过几种channel来对不同类型的事件进行监听并处理。其中channel包括:configChplegChsyncChhouseKeepingChlivenessManager.Updates()

  2. 不同的SyncHandler执行不同的增删改查操作。

  3. 其中HandlePodAdditionsHandlePodUpdatesHandlePodReconcileHandlePodSyncs都调用到了dispatchWork来执行pod的相关操作。HandlePodCleanups的pod清理任务,通过channel的方式加需要清理的pod给podKiller来清理。

  4. dispatchWork调用podWorkers.UpdatePod执行异步操作。

  5. podWorkers.UpdatePod中调用managePodLoop来执行pod相关操作循环。

channel类型及作用:

  • configCh:将配置更改的pod分派给事件类型的相应处理程序回调。

  • plegCh:更新runtime缓存,同步pod。

  • syncCh:同步所有等待同步的pod。

  • houseKeepingCh:触发清理pod。

  • livenessManager.Updates():对失败的pod或者liveness检查失败的pod进行sync操作。

参考:

最后更新于

这有帮助吗?