// makePodSourceConfig creates a config.PodConfig from the given// KubeletConfiguration or returns an error.func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
...// source of all configuration cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)// define file config sourceif kubeCfg.StaticPodPath !="" { glog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath) config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}// define url config sourceif kubeCfg.StaticPodURL !="" { glog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader) config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}// Restore from the checkpoint path// NOTE: This MUST happen before creating the apiserver source// below, or the checkpoint would override the source of truth....if kubeDeps.KubeClient !=nil { glog.Infof("Watching apiserver")if updatechannel ==nil { updatechannel = cfg.Channel(kubetypes.ApiserverSource) } config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel) }return cfg, nil}
1.1.2. NewPodConfig
// NewPodConfig creates an object that can merge many configuration sources into a stream// of normalized updates to a pod configuration.funcNewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig { updates :=make(chankubetypes.PodUpdate, 50) storage := newPodStorage(updates, mode, recorder) podConfig :=&PodConfig{ pods: storage, mux: config.NewMux(storage), updates: updates, sources: sets.String{}, }return podConfig}
1.1.3. NewSourceApiserver
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.funcNewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<-interface{}) { lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)}
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
if containerRuntime =="rkt" { glog.Fatalln("rktnetes has been deprecated in favor of rktlet. Please see https://github.com/kubernetes-incubator/rktlet for more information.")
}
当runtime是docker的时候,会执行docker相关操作。
// Dispatch on the configured container runtime. Lines consisting of "..."
// mark code that the excerpt omits.
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
	// Create and start the CRI shim running as a grpc server.
	...
	// The unix socket for kubelet <-> dockershim communication.
	...
	// Create dockerLegacyService when the logging driver is not supported.
	...
case kubetypes.RemoteContainerRuntime:
	// No-op.
	break
default:
	// Any other runtime name is reported as an error to the caller.
	return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
1.4.1. NewDockerService
// Create and start the CRI shim running as a grpc server.streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig, &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
if err !=nil {returnnil, err}if crOptions.RedirectContainerStreaming { klet.criHandler = ds}
1.4.2. NewDockerServer
// The unix socket for kubelet <-> dockershim communication.glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q", remoteRuntimeEndpoint, remoteImageEndpoint)glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)if err := server.Start(); err !=nil {returnnil, err}
1.4.3. DockerServer.Start
// Start starts the dockershim grpc server.func (s *DockerServer) Start() error {// Start the internal service.if err := s.service.Start(); err !=nil { glog.Errorf("Unable to start docker service")return err } glog.V(2).Infof("Start dockershim grpc server") l, err := util.CreateListener(s.endpoint)if err !=nil {return fmt.Errorf("failed to listen on %q: %v", s.endpoint, err) }// Create the grpc server and register runtime and image services. s.server = grpc.NewServer( grpc.MaxRecvMsgSize(maxMsgSize), grpc.MaxSendMsgSize(maxMsgSize), ) runtimeapi.RegisterRuntimeServiceServer(s.server, s.service) runtimeapi.RegisterImageServiceServer(s.server, s.service)gofunc() {if err := s.server.Serve(l); err !=nil { glog.Fatalf("Failed to serve connections: %v", err) } }()returnnil}
// PodWorkers is an abstract interface for testability.typePodWorkersinterface {UpdatePod(options *UpdatePodOptions)ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)ForgetWorker(uid types.UID)}