NewKubeletCommand
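The following analysis is based on Kubernetes v1.12.0.
This article covers the code under https://github.com/kubernetes/kubernetes/tree/v1.12.0/cmd/kubelet (kubernetes/cmd/kubelet). This part mainly handles the kubelet's argument parsing and the initialization and construction of its dependent components (mostly held in the kubeDeps struct). It does not contain the detailed runtime logic of the kubelet, which lives in the kubernetes/pkg/kubelet module and will be analyzed in a later article.
The directory structure of the kubelet's cmd code is as follows:
kubelet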
├── app
│ ├── auth.go
│ ├── init_others.go
│ ├── init_windows.go
│ ├── options # the options used by the kubelet
│ │ ├── container_runtime.go
│ │ ├── globalflags.go
│ │ ├── globalflags_linux.go
│ │ ├── globalflags_other.go
│ │ ├── options.go # includes KubeletFlags, AddFlags, AddKubeletConfigFlags, etc.
│ │ ├── osflags_others.go
│ │ └── osflags_windows.go
│ ├── plugins.go
│ ├── server.go # includes NewKubeletCommand, Run, RunKubelet, CreateAndInitKubelet, startKubelet, etc.
│ ├── server_linux.go
│ └── server_unsupported.go
└── kubelet.go # the kubelet's main entry function
The kubelet's entry point is the main function; for the full code see https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kubelet/kubelet.go.
func main() {
rand.Seed(time.Now().UTC().UnixNano())
​
command := app.NewKubeletCommand(server.SetupSignalHandler())
logs.InitLogs()
defer logs.FlushLogs()
​
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
// build the command-line object
command := app.NewKubeletCommand(server.SetupSignalHandler())
// run Execute
err := command.Execute()
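The stop channel returned by server.SetupSignalHandler() is what NewKubeletCommand receives as stopCh. As a rough illustration of that pattern, here is a minimal sketch using only the standard library. The names stopOnSignal and simulateInterrupt are hypothetical, and this is not the Kubernetes implementation.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"time"
)

// stopOnSignal returns a channel that is closed once a value arrives on sigs.
// Hypothetical helper: it only illustrates the "stop channel" idea.
func stopOnSignal(sigs <-chan os.Signal) <-chan struct{} {
	stop := make(chan struct{})
	go func() {
		<-sigs
		close(stop)
	}()
	return stop
}

// simulateInterrupt feeds a fake interrupt into stopOnSignal and reports
// whether the stop channel was closed within one second.
func simulateInterrupt() bool {
	sigs := make(chan os.Signal, 1)
	stop := stopOnSignal(sigs)
	sigs <- os.Interrupt
	select {
	case <-stop:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	// Real wiring to the operating system.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)
	defer signal.Stop(sigs)
	_ = stopOnSignal(sigs) // would be handed to long-running components

	fmt.Println("simulated interrupt closes stop channel:", simulateInterrupt())
}
```

Passing a receive-only channel lets every long-running component watch the same shutdown signal without being able to close it.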
NewKubeletCommand creates a *cobra.Command object from its arguments. Its core consists of the flag-parsing section and the Run function.
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
...
cmd := &cobra.Command{
Use: componentKubelet,
Long: `...`,
// The Kubelet has special flag parsing requirements to enforce flag precedence rules,
// so we do all our parsing manually in Run, below.
// DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
// `args` arg to Run, without Cobra's interference.
DisableFlagParsing: true,
Run: func(cmd *cobra.Command, args []string) {
...
// run the kubelet
glog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
glog.Fatal(err)
}
},
}
...
return cmd
}
The kubelet sets DisableFlagParsing, so it does not use the Cobra framework's default flag parsing and parses its flags itself. Parsing begins by initializing cleanFlagSet, kubeletFlags, and kubeletConfig.
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags()
kubeletConfig, err := options.NewKubeletConfiguration()
If an invalid argument is passed, the usage message is printed and the process exits.
// initial flag parse, since we disable cobra's flag parsing
if err := cleanFlagSet.Parse(args); err != nil {
cmd.Usage()
glog.Fatal(err)
}
​
// check if there are non-flag arguments in the command line
cmds := cleanFlagSet.Args()
if len(cmds) > 0 {
cmd.Usage()
glog.Fatalf("unknown command: %s", cmds[0])
}
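The two checks above (fail on unknown flags, then reject leftover non-flag arguments) can be reproduced with the standard library's flag package. This is a minimal sketch that uses flag instead of pflag; parseArgs and the "config" flag are illustrative names, not kubelet code.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"io"
)

// parseArgs parses args by hand with a dedicated flag set.
// It returns an error for unknown flags and for positional arguments.
func parseArgs(args []string) (string, error) {
	fs := flag.NewFlagSet("kubelet", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep the sketch quiet; errors are returned instead
	config := fs.String("config", "", "path to a config file")

	// ContinueOnError makes Parse return the error instead of exiting.
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	// Anything left after the flags is treated as an unknown command.
	if rest := fs.Args(); len(rest) > 0 {
		return "", errors.New("unknown command: " + rest[0])
	}
	return *config, nil
}

func main() {
	fmt.Println(parseArgs([]string{"--config=/tmp/kubelet.yaml"}))
	fmt.Println(parseArgs([]string{"start"}))
	fmt.Println(parseArgs([]string{"--bogus"}))
}
```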
When the help or version flag is present, the corresponding output is printed and the process exits.
// short-circuit on help
help, err := cleanFlagSet.GetBool("help")
if err != nil {
glog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
}
if help {
cmd.Help()
return
}
​
// short-circuit on verflag
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cleanFlagSet)
Load and validate the kubelet config. This includes validating the initial kubeletFlags and loading the kubelet config contents from the file named by the KubeletConfigFile field of kubeletFlags.
// set feature gates from initial flags-based config
if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
glog.Fatal(err)
}
​
// validate the initial KubeletFlags
if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
glog.Fatal(err)
}
​
if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
glog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
}
​
// load kubelet config file, if provided
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
if err != nil {
glog.Fatal(err)
}
// We must enforce flag precedence by re-parsing the command line into the new object.
// This is necessary to preserve backwards-compatibility across binary upgrades.
// See issue #56171 for more details.
if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
glog.Fatal(err)
}
// update feature gates based on new config
if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
glog.Fatal(err)
}
}
​
// We always validate the local configuration (command line + config file).
// This is the default "last-known-good" config for dynamic config, and must always remain valid.
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
glog.Fatal(err)
}
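The flag-precedence step above (load the file, then re-parse the command line into the new object) can be sketched as follows. This is a simplified illustration under stated assumptions: the config struct, loadConfigFile, applyFlagPrecedence, and resolve are hypothetical stand-ins, and the standard flag package replaces pflag.

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

// config is a tiny stand-in for a kubelet configuration object.
type config struct {
	Port         int
	CgroupDriver string
}

// loadConfigFile stands in for reading a config file from disk.
func loadConfigFile() *config {
	return &config{Port: 10250, CgroupDriver: "systemd"}
}

// applyFlagPrecedence re-parses args onto cfg. Each flag's default is the
// value already in cfg, so only flags present in args change anything.
func applyFlagPrecedence(cfg *config, args []string) error {
	fs := flag.NewFlagSet("kubelet", flag.ContinueOnError)
	fs.SetOutput(io.Discard)
	fs.IntVar(&cfg.Port, "port", cfg.Port, "serving port")
	fs.StringVar(&cfg.CgroupDriver, "cgroup-driver", cfg.CgroupDriver, "cgroup driver")
	return fs.Parse(args)
}

// resolve loads the file-based config and lets command-line flags win.
func resolve(args []string) (*config, error) {
	cfg := loadConfigFile()
	if err := applyFlagPrecedence(cfg, args); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := resolve([]string{"--port=1234"})
	fmt.Printf("%+v %v\n", cfg, err)
}
```

Values set explicitly on the command line override the file, while fields the command line does not mention keep the values from the file.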
如果开å¯ä½¿ç”¨åЍæ€kubeletçš„é…置,则由动æ€é…置文件替æ¢kubeleté…置文件。
// use dynamic kubelet config, if enabled
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
// so that we get a complete validation at the same point where we can decide to reject dynamic config.
// This fixes the flag-precedence component of issue #63305.
// See issue #56171 for general details on flag precedence.
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
glog.Fatal(err)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
glog.Fatal(err)
}
}
}
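Two ideas in the code above are worth isolating: a transform callback is applied before validation, and a nil result means "keep the local config". The sketch below illustrates only those two ideas. chooseConfig and its port check are hypothetical, and returning an error on invalid input is an assumption of the sketch, not a description of what the real controller does.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a tiny stand-in for a kubelet configuration object.
type config struct{ Port int }

// chooseConfig returns the dynamic config after transform and validation,
// or nil to signal that the caller should keep its local config.
func chooseConfig(dynamic *config, transform func(*config) error) (*config, error) {
	if dynamic == nil {
		return nil, nil // nothing dynamic to use
	}
	candidate := *dynamic // work on a copy
	// Apply the transform first, so validation sees the final values.
	if err := transform(&candidate); err != nil {
		return nil, err
	}
	if candidate.Port <= 0 || candidate.Port > 65535 {
		return nil, errors.New("invalid port")
	}
	return &candidate, nil
}

func main() {
	local := &config{Port: 10250}
	flagOverride := func(c *config) error { c.Port = 1234; return nil }

	effective := local
	if dyn, err := chooseConfig(&config{Port: 9999}, flagOverride); err == nil && dyn != nil {
		effective = dyn
	}
	fmt.Printf("%+v\n", *effective)
}
```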
Summary: parsing the various flags above ultimately produces two important objects, kubeletFlags and kubeletConfig, which are used to construct kubeletServer and the other dependencies.
// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer)
if err != nil {
glog.Fatal(err)
}
​
// add the kubelet config controller to kubeletDeps
kubeletDeps.KubeletConfigController = kubeletConfigController
如果开å¯äº†docker shim傿•°ï¼Œåˆ™æ‰§è¡Œ
RunDockershim
。// start the experimental docker shim, if enabled
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
glog.Fatal(err)
}
return
}
// keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
kubeletFlags.AddFlags(cleanFlagSet)
options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
options.AddGlobalFlags(cleanFlagSet)
cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
​
// ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
const usageFmt = "Usage:\n %s\n\nFlags:\n%s"
cmd.SetUsageFunc(func(cmd *cobra.Command) error {
fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
return nil
})
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
})
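The code above renders usage and help text from cleanFlagSet alone, so global flags do not leak into the output. A rough standard-library analogue is sketched below. renderUsage and demoUsage are hypothetical helpers, and the output layout is simplified compared with FlagUsagesWrapped.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// renderUsage formats a usage message from the given flag set only.
func renderUsage(useLine string, fs *flag.FlagSet) string {
	var b strings.Builder
	fmt.Fprintf(&b, "Usage:\n  %s\n\nFlags:\n", useLine)
	// VisitAll walks the flags of this set in lexicographical order.
	fs.VisitAll(func(f *flag.Flag) {
		fmt.Fprintf(&b, "  --%s\t%s\n", f.Name, f.Usage)
	})
	return b.String()
}

// demoUsage builds a small dedicated flag set and renders its usage text.
func demoUsage() string {
	fs := flag.NewFlagSet("kubelet", flag.ContinueOnError)
	fs.String("config", "", "path to the config file")
	fs.Bool("help", false, "help for kubelet")
	return renderUsage("kubelet [flags]", fs)
}

func main() {
	fmt.Print(demoUsage())
}
```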
The last step runs the kubelet, which should never exit. The Run function is the entry to the subsequent steps.
// run the kubelet
glog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
glog.Fatal(err)
}
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
if err := run(s, kubeDeps, stopCh); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
}
当è¿è¡ŒçŽ¯å¢ƒæ˜¯Windows的时候,åˆå§‹åŒ–æ“作,但是该æ“ä½œä¸ºç©ºï¼Œåªæ˜¯é¢„留。具体执行
run(s, kubeDeps, stopCh)
函数。创建
clientConfig
,该对象用æ¥åˆ›å»ºå„ç§çš„kubeDeps
属性ä¸åŒ…å«çš„client
。clientConfig, err := createAPIServerClientConfig(s)
if err != nil {
return fmt.Errorf("invalid kubeconfig: %v", err)
}
kubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
glog.Warningf("New kubeClient from clientConfig error: %v", err)
} else if kubeClient.CertificatesV1beta1() != nil && clientCertificateManager != nil {
glog.V(2).Info("Starting client certificate rotation.")
clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.CertificatesV1beta1().CertificateSigningRequests())
clientCertificateManager.Start()
}
dynamicKubeClient, err = dynamic.NewForConfig(clientConfig)
if err != nil {
glog.Warningf("Failed to initialize dynamic KubeClient: %v", err)
}
// make a separate client for events
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
eventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
glog.Warningf("Failed to create API Server client for Events: %v", err)
}
// make a separate client for heartbeat with throttling disabled and a timeout attached
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
// if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequency
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
}
heartbeatClientConfig.QPS = float32(-1)
heartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
glog.Warningf("Failed to create API Server client for heartbeat: %v", err)
}
// csiClient works with CRDs that support json only
clientConfig.ContentType = "application/json"
csiClient, err := csiclientset.NewForConfig(clientConfig)
if err != nil {
glog.Warningf("Failed to create CSI API client: %v", err)
}
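The event and heartbeat clients above are built from value copies of clientConfig, so tuning one client never affects the others. The sketch below illustrates that copy pattern and the heartbeat timeout rule. clientConfig here is a simplified hypothetical struct, not the client-go type, and heartbeatTimeout and deriveHeartbeat are illustrative helpers.

```go
package main

import (
	"fmt"
	"time"
)

// clientConfig is a simplified stand-in for a REST client configuration.
type clientConfig struct {
	QPS         float32
	Burst       int
	Timeout     time.Duration
	ContentType string
}

// heartbeatTimeout returns the status update frequency, capped by the
// lease duration when the lease feature is enabled.
func heartbeatTimeout(statusFreq time.Duration, leaseSeconds int32, leaseEnabled bool) time.Duration {
	timeout := statusFreq
	if leaseEnabled {
		if lease := time.Duration(leaseSeconds) * time.Second; timeout > lease {
			timeout = lease
		}
	}
	return timeout
}

// deriveHeartbeat copies base by value and adjusts only the copy.
func deriveHeartbeat(base *clientConfig, timeout time.Duration) *clientConfig {
	hb := *base
	hb.Timeout = timeout
	hb.QPS = -1 // mirrors the source: throttling disabled for heartbeats
	return &hb
}

func main() {
	base := &clientConfig{QPS: 5, Burst: 10}
	hb := deriveHeartbeat(base, heartbeatTimeout(10*time.Second, 40, true))
	base.ContentType = "application/json" // a later change to base does not reach the copy
	fmt.Printf("base=%+v\nheartbeat=%+v\n", *base, *hb)
}
```

This also shows why the source sets ContentType for the CSI client last: by then the other configs have already been copied.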
Assign the clients to kubeDeps:
kubeDeps.KubeClient = kubeClient
kubeDeps.DynamicKubeClient = dynamicKubeClient
if heartbeatClient != nil {
kubeDeps.HeartbeatClient = heartbeatClient
kubeDeps.OnHeartbeatFailure = closeAllConns
}
if eventClient != nil {
kubeDeps.EventClient = eventClient
}
kubeDeps.CSIClient = csiClient
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
if err != nil {
return err
}
}
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
glog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}
​
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
​
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
ContainerRuntime: s.ContainerRuntime,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
SystemReservedCgroupName: s.SystemReservedCgroup,
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
​
if err != nil {
return err
}
}
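The nil checks on kubeDeps.CAdvisorInterface and kubeDeps.ContainerManager follow a common dependency-injection pattern: build a default only when the caller did not supply one. A minimal sketch of the pattern follows; every type and name in it is hypothetical.

```go
package main

import "fmt"

// statsProvider is a hypothetical dependency interface.
type statsProvider interface{ Name() string }

type realStats struct{}

func (realStats) Name() string { return "real" }

type fakeStats struct{}

func (fakeStats) Name() string { return "fake" }

// dependencies mirrors the idea of an injectable dependency bundle.
type dependencies struct{ Stats statsProvider }

// fillDefaults constructs only the dependencies the caller left nil.
func fillDefaults(d *dependencies) {
	if d.Stats == nil {
		d.Stats = realStats{}
	}
}

func main() {
	production := &dependencies{}
	fillDefaults(production)

	underTest := &dependencies{Stats: fakeStats{}}
	fillDefaults(underTest)

	fmt.Println(production.Stats.Name(), underTest.Stats.Name())
}
```

Tests can inject fakes this way, while the production binary gets real implementations without extra wiring.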
// TODO(vmarmol): Do this through container config.
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
glog.Warning(err)
}
if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, wait.NeverStop)
}
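The health server above is wrapped in wait.Until so that it is started again after 5 seconds if ListenAndServe returns. The loop below is a simplified stand-in that shows the retry behavior; it is not the wait.Until implementation, and the until name is illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// until calls f, waits for period, and repeats until stop is closed.
func until(f func(), period time.Duration, stop <-chan struct{}) {
	for {
		// Check stop before each call so a closed channel ends the loop.
		select {
		case <-stop:
			return
		default:
		}
		f()
		select {
		case <-stop:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stop := make(chan struct{})
	attempts := 0
	until(func() {
		attempts++
		fmt.Println("attempt", attempts) // a real caller would start the server here
		if attempts == 3 {
			close(stop)
		}
	}, 10*time.Millisecond, stop)
}
```

In the kubelet the stop channel is wait.NeverStop, so the loop keeps retrying for the lifetime of the process.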
These assignments build up the complete kubeDeps struct. Finally, RunKubelet is called to enter the rest of the kubelet startup flow.
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
k, err := CreateAndInitKubelet(&kubeServer.KubeletConfiguration,
...
kubeServer.NodeStatusMaxImages)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
​
// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
if kubeDeps.PodConfig == nil {
return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig
​
rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
​
// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
glog.Infof("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
glog.Infof("Started kubelet")
}
return nil
}
The core of RunKubelet is its calls to CreateAndInitKubelet and startKubelet; both are analyzed below.
CreateAndInitKubelet is called with kubeDeps to initialize the Kubelet.
k, err := CreateAndInitKubelet(&kubeServer.KubeletConfiguration,
kubeDeps,
&kubeServer.ContainerRuntimeOptions,
kubeServer.ContainerRuntime,
kubeServer.RuntimeCgroups,
kubeServer.HostnameOverride,
kubeServer.NodeIP,
kubeServer.ProviderID,
kubeServer.CloudProvider,
kubeServer.CertDirectory,
kubeServer.RootDirectory,
kubeServer.RegisterNode,
kubeServer.RegisterWithTaints,
kubeServer.AllowedUnsafeSysctls,
kubeServer.RemoteRuntimeEndpoint,
kubeServer.RemoteImageEndpoint,
kubeServer.ExperimentalMounterPath,
kubeServer.ExperimentalKernelMemcgNotification,
kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
kubeServer.MinimumGCAge,
kubeServer.MaxPerPodContainerCount,
kubeServer.MaxContainerCount,
kubeServer.MasterServiceNamespace,
kubeServer.RegisterSchedulable,
kubeServer.NonMasqueradeCIDR,
kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels,
kubeServer.SeccompProfileRoot,
kubeServer.BootstrapCheckpointPath,
kubeServer.NodeStatusMaxImages)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
The core function called by CreateAndInitKubelet is NewMainKubelet, which instantiates a kubelet object. Its code lives in kubernetes/pkg/kubelet; see kubernetes/pkg/kubelet/kubelet.go#L325.
func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
...
nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
​
k, err = kubelet.NewMainKubelet(kubeCfg,
kubeDeps,
crOptions,
containerRuntime,
runtimeCgroups,
hostnameOverride,
nodeIP,
providerID,
cloudProvider,
certDirectory,
rootDirectory,
registerNode,
registerWithTaints,
allowedUnsafeSysctls,
remoteRuntimeEndpoint,
remoteImageEndpoint,
experimentalMounterPath,
experimentalKernelMemcgNotification,
experimentalCheckNodeCapabilitiesBeforeMount,
experimentalNodeAllocatableIgnoreEvictionThreshold,
minimumGCAge,
maxPerPodContainerCount,
maxContainerCount,
masterServiceNamespace,
registerSchedulable,
nonMasqueradeCIDR,
keepTerminatedPodVolumes,
nodeLabels,
seccompProfileRoot,
bootstrapCheckpointPath,
nodeStatusMaxImages)
if err != nil {
return nil, err
}
​
k.BirthCry()
​
k.StartGarbageCollection()
​
return k, nil
}
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}
The snippet above is from NewMainKubelet. The call path is NewMainKubelet-->PodConfig-->NewPodConfig-->kubetypes.PodUpdate: a channel of PodUpdate values is created to watch for pod changes, and that channel is the key argument in k.Run(podCfg.Updates()).
// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
glog.Infof("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
glog.Infof("Started kubelet")
}
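The hand-off of the update channel from PodConfig to k.Run can be pictured as a producer that owns the channel and a consumer loop that only receives from it. The sketch below uses simplified hypothetical types (podUpdate, podConfig, run). Unlike the real kubelet, it closes the channel so that the example terminates.

```go
package main

import "fmt"

// podUpdate is a simplified stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Op   string
	Pods []string
}

// podConfig owns the channel and exposes it as receive-only.
type podConfig struct{ updates chan podUpdate }

func newPodConfig() *podConfig {
	return &podConfig{updates: make(chan podUpdate, 8)}
}

// Updates returns the channel that the consumer loop reads from.
func (c *podConfig) Updates() <-chan podUpdate { return c.updates }

// run consumes updates until the channel is closed and returns a log of
// what it saw, one entry per pod.
func run(updates <-chan podUpdate) []string {
	var seen []string
	for u := range updates {
		for _, p := range u.Pods {
			seen = append(seen, u.Op+" "+p)
		}
	}
	return seen
}

func main() {
	cfg := newPodConfig()
	cfg.updates <- podUpdate{Op: "ADD", Pods: []string{"nginx"}}
	cfg.updates <- podUpdate{Op: "REMOVE", Pods: []string{"nginx"}}
	close(cfg.updates) // only so the sketch terminates
	fmt.Println(run(cfg.Updates()))
}
```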
如果设置了åªè¿è¡Œä¸€æ¬¡çš„傿•°ï¼Œåˆ™æ‰§è¡Œ
k.RunOnce
,å¦åˆ™æ‰§è¡Œæ ¸å¿ƒå‡½æ•°startKubelet
。具体实现如下:func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
​
// start the kubelet server
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
​
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
}
// start the kubelet
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
k.Run is started as a long-running loop that never exits, which hands the kubelet's runtime logic over to kubernetes/pkg/kubelet/kubelet.go. The runtime logic in kubernetes/pkg/kubelet will be analyzed in a later article.
- 2. The kubernetes/cmd/kubelet part mainly defines and parses the runtime flags and initializes and constructs the dependent components (mostly held in the kubeDeps struct). It does not contain the detailed runtime logic of the kubelet, which lives in the kubernetes/pkg/kubelet module.
- 3. The call flow of the cmd part is as follows:
main-->NewKubeletCommand-->Run(kubeletServer, kubeletDeps, stopCh)-->run(s *options.KubeletServer, kubeDeps ..., stopCh ...)-->RunKubelet(s, kubeDeps, s.RunOnce)-->startKubelet-->k.Run(podCfg.Updates())-->pkg/kubelet
In parallel: RunKubelet(s, kubeDeps, s.RunOnce)-->CreateAndInitKubelet-->kubelet.NewMainKubelet-->pkg/kubelet
References:
- https://github.com/kubernetes/kubernetes/tree/v1.12.0