syncLoopIteration
kubelet源码分析(四)之 syncLoopIteration
以下代码分析基于
kubernetes v1.12.0版本。
本文主要分析kubelet中syncLoopIteration部分。syncLoopIteration通过几种channel来对不同类型的事件进行监听并做增删改查的处理。
1. syncLoop
syncLoop是处理变更的循环。 它监听来自三种channel(file,apiserver和http)的更改。 对于看到的任何新更改,将针对所需状态和运行状态运行同步。 如果没有看到配置的变化,将在每个同步频率秒同步最后已知的所需状态。
此部分代码位于pkg/kubelet/kubelet.go
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
// The resyncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
for {
if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
glog.Infof("skipping pod synchronization - %v", rs)
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}其中调用了syncLoopIteration的函数来执行更具体的监控pod变化的循环。
syncLoopIteration主要通过几种channel来对不同类型的事件进行监听并处理。其中包括:configCh、plegCh、syncCh、houseKeepingCh、livenessManager.Updates()。
syncLoopIteration实际执行了pod的操作,此部分设置了几种不同的channel:
configCh:将配置更改的pod分派给事件类型的相应处理程序回调。plegCh:更新runtime缓存,同步pod。syncCh:同步所有等待同步的pod。houseKeepingCh:触发清理pod。livenessManager.Updates():对失败的pod或者liveness检查失败的pod进行sync操作。
syncLoopIteration部分代码位于pkg/kubelet/kubelet.go
2.1. configCh
configCh将配置更改的pod分派给事件类型的相应处理程序回调,该部分主要通过SyncHandler对pod的不同事件进行增删改查等操作。
可以看出syncLoopIteration根据podUpdate的值来执行不同的pod操作,具体如下:
ADD:HandlePodAdditionsUPDATE:HandlePodUpdatesREMOVE:HandlePodRemovesRECONCILE:HandlePodReconcileDELETE:HandlePodUpdatesRESTORE:HandlePodAdditionspodsToSync:HandlePodSyncs
其中执行pod的handler操作的是SyncHandler,该类型是一个接口,实现体为kubelet本身,具体见后续分析。
2.2. plegCh
plegCh:更新runtime缓存,同步pod。此处调用了HandlePodSyncs的函数。
2.3. syncCh
syncCh:同步所有等待同步的pod。此处调用了HandlePodSyncs的函数。
2.4. livenessManager.Update
livenessManager.Updates():对失败的pod或者liveness检查失败的pod进行sync操作。此处调用了HandlePodSyncs的函数。
2.5. housekeepingCh
houseKeepingCh:触发清理pod。此处调用了HandlePodCleanups的函数。
3. SyncHandler
SyncHandler是一个定义Pod的不同Handler的接口,具体是实现者是kubelet,该接口的方法主要在syncLoopIteration中调用,接口定义如下:
SyncHandler部分代码位于pkg/kubelet/kubelet.go
3.1. HandlePodAdditions
HandlePodAdditions先根据pod创建时间对pod进行排序,然后遍历pod列表,来执行pod的相关操作。
将pod添加到pod manager中。
如果是mirror pod,则对mirror pod进行处理。
如果当前pod的状态不是Terminated状态,则判断是否接受该pod,如果不接受则将pod状态改为Failed。
执行dispatchWork函数,该函数是syncHandler中调用到的核心函数,该函数在pod worker中启动一个异步循环,来分派pod的相关操作。该函数的具体操作待后续分析。
最后加pod添加到probe manager中。
3.2. HandlePodUpdates
HandlePodUpdates同样遍历pod列表,执行相应的操作。
将pod更新到pod manager中。
如果是mirror pod,则对mirror pod进行处理。
执行dispatchWork函数。
3.3. HandlePodRemoves
HandlePodRemoves遍历pod列表。
从pod manager中删除pod。
如果是mirror pod,则对mirror pod进行处理。
调用kubelet的deletePod函数来删除pod。
deletePod 函数将需要删除的pod加入podKillingCh的channel中,有podKiller监听这个channel去执行删除任务,实现如下:
从probe manager中移除pod。
3.4. HandlePodReconcile
遍历pod列表。
将pod更新到pod manager中。
必要时调整pod的Ready状态,执行dispatchWork函数。
如果pod被设定为需要被驱逐的,则删除pod中的容器。
3.5. HandlePodSyncs
HandlePodSyncs是syncHandler接口回调函数,调用dispatchWork,通过pod worker来执行任务。
3.6. HandlePodCleanups
HandlePodCleanups主要用来执行pod的清理任务,其中包括terminating的pod,orphaned的pod等。
首先查看pod使用到的cgroup。
列出所有pod包括mirror pod。
pod worker停止不再存在的pod的任务,并从probe manager中清除pod。
将需要杀死的pod加入到podKillingCh的channel中,podKiller的任务会监听该channel并获取需要杀死的pod列表来执行杀死pod的操作。
当pod不再被绑定到该节点,移除podStatus,其中removeOrphanedPodStatuses最后调用的函数是statusManager的RemoveOrphanedStatuses方法。
移除所有的orphaned volume。
移除mirror pod。
删除不再运行的pod的cgroup。
执行垃圾回收(GC)操作。
4. dispatchWork
dispatchWork通过pod worker启动一个异步的循环。
完整代码如下:
以下分段进行分析:
如果pod的状态是处于Terminated状态,则执行statusManager的TerminatePod操作。
执行pod worker的UpdatePod函数,该函数是pod worker的核心函数,来执行pod相关操作。具体逻辑待下文分析。
当创建类型是SyncPodCreate(即创建pod的时候),统计新pod中容器的数目。
PodWorkers是一个接口类型:
其中UpdatePod是一个核心方法,通过podUpdates的channel来传递需要处理的pod信息,对于新创建的pod每个pod都会由一个goroutine来执行managePodLoop。
此部分代码位于pkg/kubelet/pod_workers.go
managePodLoop通过读取podUpdateschannel的信息,执行syncPodFn函数,而syncPodFn函数在newPodWorkers的时候赋值了,即kubelet.syncPod。kubelet.syncPod具体代码逻辑待后续文章单独分析。
newPodWorkers函数参考:
managePodLoop函数参考:
此部分代码位于pkg/kubelet/pod_workers.go
7. 总结
syncLoopIteration基本流程如下:
通过几种
channel来对不同类型的事件进行监听并处理。其中channel包括:configCh、plegCh、syncCh、houseKeepingCh、livenessManager.Updates()。不同的SyncHandler执行不同的增删改查操作。
其中
HandlePodAdditions、HandlePodUpdates、HandlePodReconcile、HandlePodSyncs都调用到了dispatchWork来执行pod的相关操作。HandlePodCleanups的pod清理任务,通过channel的方式加需要清理的pod给podKiller来清理。dispatchWork调用podWorkers.UpdatePod执行异步操作。podWorkers.UpdatePod中调用managePodLoop来执行pod相关操作循环。
channel类型及作用:
configCh:将配置更改的pod分派给事件类型的相应处理程序回调。plegCh:更新runtime缓存,同步pod。syncCh:同步所有等待同步的pod。houseKeepingCh:触发清理pod。livenessManager.Updates():对失败的pod或者liveness检查失败的pod进行sync操作。
参考:
最后更新于
这有帮助吗?