// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error {
	// Prometheus metrics.
	metrics.Register(kl.runtimeCache, collectors.NewVolumeStatsCollector(kl))

	// Setup filesystem directories.
	if err := kl.setupDataDirs(); err != nil {
		return err
	}

	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(ContainerLogsDir); err != nil {
		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
			glog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
		}
	}

	// Start the image manager.
	kl.imageManager.Start()

	// Start the certificate manager if it was enabled.
	if kl.serverCertificateManager != nil {
		kl.serverCertificateManager.Start()
	}

	// Start out of memory watcher.
	if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
		return fmt.Errorf("Failed to start OOM watcher %v", err)
	}

	// Start resource analyzer
	kl.resourceAnalyzer.Start()

	return nil
}
3.1. setupDataDirs
initializeModules first creates the kubelet's data directories.
The directories are:
ContainerLogsDir: /var/log/containers.
rootDirectory: passed in via a flag, typically /var/lib/kubelet.
PodsDir: {rootDirectory}/pods.
PluginsDir: {rootDirectory}/plugins.
The setupDataDirs-related code in initializeModules is as follows:
// Setup filesystem directories.
if err := kl.setupDataDirs(); err != nil {
	return err
}

// If the container logs directory does not exist, create it.
if _, err := os.Stat(ContainerLogsDir); err != nil {
	if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
		glog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
	}
}
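The body of setupDataDirs itself is not shown above. As a simplified sketch, it creates the root, pods, and plugins directories listed earlier; the real method may also create additional directories and handle more error cases depending on the Kubernetes version:

func (kl *Kubelet) setupDataDirs() error {
	kl.rootDirectory = path.Clean(kl.rootDirectory)
	// Create the root directory (typically /var/lib/kubelet).
	if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
		return fmt.Errorf("error creating root directory: %v", err)
	}
	// Create {rootDirectory}/pods.
	if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
		return fmt.Errorf("error creating pods directory: %v", err)
	}
	// Create {rootDirectory}/plugins.
	if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
		return fmt.Errorf("error creating plugins directory: %v", err)
	}
	return nil
}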
initializeModules then starts the image manager, the certificate manager (if enabled), the OOM watcher, and the resource analyzer:

// Start the image manager.
kl.imageManager.Start()

// Start the certificate manager if it was enabled.
if kl.serverCertificateManager != nil {
	kl.serverCertificateManager.Start()
}

// Start out of memory watcher.
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
	return fmt.Errorf("Failed to start OOM watcher %v", err)
}

// Start resource analyzer
kl.resourceAnalyzer.Start()
If the kubelet has an apiserver client, node status syncing is started as well, along with node lease syncing when the NodeLease feature gate is enabled:

if kl.kubeClient != nil {
	// Start syncing node status immediately, this may set up things the runtime needs to run.
	go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)

	go kl.fastStatusUpdateOnce()

	// start syncing lease
	if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
}
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
The podKiller code, located in pkg/kubelet/kubelet_pods.go, is as follows:
// podKiller launches a goroutine to kill a pod received from the channel if
// another goroutine isn't already in action.
func (kl *Kubelet) podKiller() {
	killing := sets.NewString()
	// guard for the killing set
	lock := sync.Mutex{}
	for podPair := range kl.podKillingCh {
		runningPod := podPair.RunningPod
		apiPod := podPair.APIPod

		lock.Lock()
		exists := killing.Has(string(runningPod.ID))
		if !exists {
			killing.Insert(string(runningPod.ID))
		}
		lock.Unlock()

		if !exists {
			go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
				glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
				err := kl.killPod(apiPod, runningPod, nil, nil)
				if err != nil {
					glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
				}
				lock.Lock()
				killing.Delete(string(runningPod.ID))
				lock.Unlock()
			}(apiPod, runningPod)
		}
	}
}
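The detail worth noting here is the mutex-guarded killing set: it guarantees at most one kill goroutine per pod ID, even if the same pod is sent to podKillingCh repeatedly. A minimal, self-contained sketch of that pattern (illustrative names only, not kubelet code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker mirrors podKiller's structure: it reads IDs from a channel and
// spawns at most one goroutine per ID, tracked in a mutex-guarded set.
func worker(ids <-chan string) {
	inFlight := map[string]bool{}
	var mu sync.Mutex
	for id := range ids {
		mu.Lock()
		exists := inFlight[id]
		if !exists {
			inFlight[id] = true
		}
		mu.Unlock()
		if !exists {
			go func(id string) {
				fmt.Println("processing", id) // killPod in the real code
				time.Sleep(10 * time.Millisecond)
				mu.Lock()
				delete(inFlight, id)
				mu.Unlock()
			}(id)
		}
	}
}

func main() {
	ch := make(chan string)
	go worker(ch)
	ch <- "pod-a"
	ch <- "pod-a" // duplicate while the first is still in flight: skipped
	ch <- "pod-b"
	time.Sleep(50 * time.Millisecond)
}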
The status manager is started next; its Start method runs a single goroutine that syncs pod status to the apiserver, either on demand from podStatusChannel or periodically via syncBatch:

func (m *manager) Start() {
	// Don't start the status manager if we don't have a client. This will happen
	// on the master, where the kubelet is responsible for bootstrapping the pods
	// of the master components.
	if m.kubeClient == nil {
		glog.Infof("Kubernetes client is nil, not starting status manager.")
		return
	}

	glog.Info("Starting to sync pod status with apiserver")
	syncTicker := time.Tick(syncPeriod)
	// syncPod and syncBatch share the same go routine to avoid sync races.
	go wait.Forever(func() {
		select {
		case syncRequest := <-m.podStatusChannel:
			glog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
				syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
			m.syncPod(syncRequest.podUID, syncRequest.status)
		case <-syncTicker:
			m.syncBatch()
		}
	}, 0)
}
4.7. probeManager
The probe manager handles container probes.
kl.probeManager.Start()
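The probe manager's internals are not covered here. As a rough, illustrative sketch (hypothetical names, not the kubelet's actual prober), a liveness-style probe worker conceptually polls a target on a fixed period and reports it unhealthy after a threshold of consecutive failures:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// probeLoop polls url every period; failureThreshold consecutive failures
// mark the target unhealthy. A conceptual stand-in for a probe worker only.
func probeLoop(url string, period time.Duration, failureThreshold int, stop <-chan struct{}) {
	failures := 0
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			resp, err := http.Get(url)
			if err != nil || resp.StatusCode >= 400 {
				failures++
			} else {
				failures = 0
			}
			if resp != nil {
				resp.Body.Close()
			}
			if failures >= failureThreshold {
				fmt.Println("unhealthy:", url)
			} else {
				fmt.Println("healthy:", url)
			}
		}
	}
}

func main() {
	// A local test server stands in for a container's health endpoint.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	stop := make(chan struct{})
	go probeLoop(srv.URL, 200*time.Millisecond, 3, stop)
	time.Sleep(time.Second)
	close(stop)
}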
4.8. runtimeClassManager
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
	go kl.runtimeClassManager.Run(wait.NeverStop)
}
4.9. PodLifecycleEventGenerator
// Start the pod lifecycle event generator.
kl.pleg.Start()
PodLifecycleEventGenerator is the interface for generating pod lifecycle events, defined as follows:
// PodLifecycleEventGenerator contains functions for generating pod life cycle events.
type PodLifecycleEventGenerator interface {
	Start()
	Watch() chan *PodLifecycleEvent
	Healthy() (bool, error)
}
The Start method is implemented as follows:
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
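relist itself is not shown above; conceptually it lists the current containers from the runtime, diffs them against the previous snapshot, and emits a PodLifecycleEvent for each change onto the channel returned by Watch(). A simplified, self-contained sketch of that diffing idea (hypothetical types, not the real GenericPLEG):

package main

import "fmt"

// containerState and event are hypothetical stand-ins for the runtime's
// container state and the PLEG's PodLifecycleEvent.
type containerState string

type event struct {
	podID string
	what  string
}

// diffStates compares the previous and current container states per pod and
// emits one event per observed change, mirroring what relist does on each tick.
func diffStates(old, cur map[string]containerState, out chan<- event) {
	for id, state := range cur {
		prev, ok := old[id]
		switch {
		case !ok:
			out <- event{podID: id, what: fmt.Sprintf("started (%s)", state)}
		case prev != state:
			out <- event{podID: id, what: fmt.Sprintf("%s -> %s", prev, state)}
		}
	}
	for id := range old {
		if _, ok := cur[id]; !ok {
			out <- event{podID: id, what: "removed"}
		}
	}
}

func main() {
	out := make(chan event, 10)
	old := map[string]containerState{"pod-a": "running"}
	cur := map[string]containerState{"pod-a": "exited", "pod-b": "running"}
	diffStates(old, cur, out)
	close(out)
	for e := range out {
		fmt.Println(e.podID, e.what)
	}
}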
With these modules running, the kubelet enters its main loop, syncLoop:

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")
	// The resyncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for {
		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
			glog.Infof("skipping pod synchronization - %v", rs)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
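One detail worth calling out: when runtimeErrors reports problems, syncLoop sleeps with an exponential backoff starting at 100ms, doubling each iteration, and capped at 5s (100ms, 200ms, 400ms, ..., 3.2s, 5s, 5s, ...), then resets to 100ms on the next success. A tiny standalone illustration of that sequence:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Same constants as syncLoop: base 100ms, factor 2, capped at 5s.
	base := 100 * time.Millisecond
	max := 5 * time.Second
	factor := 2.0
	d := base
	for i := 0; i < 8; i++ {
		fmt.Println(d) // 100ms 200ms 400ms 800ms 1.6s 3.2s 5s 5s
		d = time.Duration(math.Min(float64(max), factor*float64(d)))
	}
}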