funcmain() { rand.Seed(time.Now().UTC().UnixNano()) command := app.NewSchedulerCommand()// TODO: once we switch everything over to Cobra commands, we can go back to calling// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the// normalize func and add the go flag set by hand. pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc) pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)// utilflag.InitFlags() logs.InitLogs()defer logs.FlushLogs()if err := command.Execute(); err !=nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) }}
// AddFlags adds flags for the scheduler options.func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
o.SecureServing.AddFlags(fs) o.CombinedInsecureServing.AddFlags(fs) o.Authentication.AddFlags(fs) o.Authorization.AddFlags(fs) o.Deprecated.AddFlags(fs, &o.ComponentConfig) leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, fs) utilfeature.DefaultFeatureGate.AddFlag(fs)}
// Start all informers.go c.PodInformer.Informer().Run(stopCh)c.InformerFactory.Start(stopCh)
3.3. WaitForCacheSync
在调度前等待cache同步。
// Wait for all caches to sync before scheduling.c.InformerFactory.WaitForCacheSync(stopCh)controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages// indicating that the controller identified by controllerName is waiting for syncs, followed by// either a successful or failed sync.funcWaitForCacheSync(controllerName string, stopCh <-chanstruct{}, cacheSyncs ...cache.InformerSynced) bool { glog.Infof("Waiting for caches to sync for %s controller", controllerName)if!cache.WaitForCacheSync(stopCh, cacheSyncs...) { utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))returnfalse } glog.Infof("Caches are synced for %s controller", controllerName)returntrue}
// If leader election is enabled, run via LeaderElector until done and exit.if c.LeaderElection !=nil { c.LeaderElection.Callbacks =leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { utilruntime.HandleError(fmt.Errorf("lost master")) }, } leaderElector, err := leaderelection.NewLeaderElector(*c.LeaderElection)if err !=nil {return fmt.Errorf("couldn't create leader elector: %v", err) } leaderElector.Run(ctx)return fmt.Errorf("lost lease")}
3.5. Scheduler.Run
// Prepare a reusable run function.run :=func(ctx context.Context) { sched.Run()<-ctx.Done()}ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used heredefer cancel()gofunc() {select {case<-stopCh: cancel()case<-ctx.Done(): }}()...run(ctx)
Scheduler.Run 先等待 cache 同步，然后启动运行调度逻辑的 goroutine 并立即返回。
Scheduler.Run的具体代码如下:
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {if!sched.config.WaitForCacheSync() {return }go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)}