```go
// HostPriority represents the priority of scheduling to a particular host, higher priority is better.
type HostPriority struct {
    // Name of the host
    Host string
    // Score associated with the host
    Score int
}
```
The complete code of PrioritizeNodes is shown below; it lives in pkg/scheduler/core/generic_scheduler.go.
```go
// PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
// Each priority function is expected to set a score of 0-10
// 0 is the lowest priority score (least preferred node) and 10 is the highest
// Each priority function can also have its own weight
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    // If no priority configs are provided, then the EqualPriority function is applied
    // This is required to generate the priority list in the required format
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }

    var (
        mu   = sync.Mutex{}
        wg   = sync.WaitGroup{}
        errs []error
    )
    appendError := func(err error) {
        mu.Lock()
        defer mu.Unlock()
        errs = append(errs, err)
    }

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    for i, priorityConfig := range priorityConfigs {
        if priorityConfig.Function != nil {
            // DEPRECATED
            wg.Add(1)
            go func(index int, config algorithm.PriorityConfig) {
                defer wg.Done()
                var err error
                results[index], err = config.Function(pod, nodeNameToInfo, nodes)
                if err != nil {
                    appendError(err)
                }
            }(i, priorityConfig)
        } else {
            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
        }
    }

    processNode := func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        var err error
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                return
            }
        }
    }
    workqueue.Parallelize(16, len(nodes), processNode)

    for i, priorityConfig := range priorityConfigs {
        if priorityConfig.Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int, config algorithm.PriorityConfig) {
            defer wg.Done()
            if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
            if glog.V(10) {
                for _, hostPriority := range results[index] {
                    glog.Infof("%v -> %v: %v, Score: (%d)", pod.Name, hostPriority.Host, config.Name, hostPriority.Score)
                }
            }
        }(i, priorityConfig)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for _, extender := range extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(ext algorithm.SchedulerExtender) {
                defer wg.Done()
                prioritizedList, weight, err := ext.Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(extender)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }

    if glog.V(10) {
        for i := range result {
            glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
        }
    }
    return result, nil
}
```
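To make the "Summarize all scores" step concrete, here is a minimal standalone sketch of the weighted sum; the hostPriority type, the priority kinds and all numbers are hypothetical stand-ins rather than the scheduler's own types:

```go
package main

import "fmt"

// hostPriority is a stand-in for schedulerapi.HostPriority in this sketch.
type hostPriority struct {
    Host  string
    Score int
}

func main() {
    // Hypothetical per-priority results for two nodes, as produced by the Map/Reduce phase.
    // results[j][i] is the score (0-10) that priority j gave to node i.
    results := [][]hostPriority{
        {{Host: "node-1", Score: 10}, {Host: "node-2", Score: 4}}, // e.g. a resource-based priority
        {{Host: "node-1", Score: 3}, {Host: "node-2", Score: 8}},  // e.g. a spreading priority
    }
    weights := []int{1, 2} // hypothetical PriorityConfig.Weight values

    // Weighted sum, mirroring the "Summarize all scores" loop in PrioritizeNodes.
    result := []hostPriority{{Host: "node-1"}, {Host: "node-2"}}
    for i := range result {
        for j := range results {
            result[i].Score += results[j][i].Score * weights[j]
        }
    }
    fmt.Println(result) // [{node-1 16} {node-2 20}]
}
```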
Breaking the function down: when no priority configs and no extenders are configured, every node is given an equal score via EqualPriorityMap.

```go
// If no priority configs are provided, then the EqualPriority function is applied
// This is required to generate the priority list in the required format
if len(priorityConfigs) == 0 && len(extenders) == 0 {
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    for i := range nodes {
        hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
        if err != nil {
            return nil, err
        }
        result = append(result, hostPriority)
    }
    return result, nil
}
```
The implementation of EqualPriorityMap is as follows:
```go
// EqualPriorityMap is a prioritizer function that gives an equal weight of one to all nodes
func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    node := nodeInfo.Node()
    if node == nil {
        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    }
    return schedulerapi.HostPriority{
        Host:  node.Name,
        Score: 1,
    }, nil
}
```
Each priority algorithm that PrioritizeNodes runs is described by a PriorityConfig:

```go
// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
    Name   string
    Map    PriorityMapFunction
    Reduce PriorityReduceFunction
    // TODO: Remove it after migrating all functions to
    // Map-Reduce pattern.
    Function PriorityFunction
    Weight int
}
```
The Map and Reduce function types referenced by PriorityConfig are defined as:

```go
// PriorityMapFunction is a function that computes per-node results for a given node.
// TODO: Figure out the exact API of this method.
// TODO: Change interface{} to a specific type.
type PriorityMapFunction func(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error)

// PriorityReduceFunction is a function that aggregated per-node results and computes
// final scores for all nodes.
// TODO: Figure out the exact API of this method.
// TODO: Change interface{} to a specific type.
type PriorityReduceFunction func(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error
```
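To illustrate the Map-Reduce pattern these types describe, here is a simplified standalone sketch using local stand-in types and a made-up per-node signal (not the real scheduler API): Map produces a raw value per node, and Reduce then sees all results at once and rescales them onto the 0-10 range.

```go
package main

import "fmt"

// Simplified stand-ins for schedulerapi.HostPriority / HostPriorityList.
type hostPriority struct {
    Host  string
    Score int
}
type hostPriorityList []hostPriority

const maxPriority = 10 // the 0-10 scale used by the scheduler

// mapPhase computes a raw, unbounded value per node; the made-up signal here is
// simply the number of matching pods already running on the node.
func mapPhase(host string, matchingPods int) hostPriority {
    return hostPriority{Host: host, Score: matchingPods}
}

// reducePhase rescales the raw values so that the best node (fewest matching pods)
// gets maxPriority and the worst gets 0.
func reducePhase(result hostPriorityList) {
    maxCount := 0
    for _, hp := range result {
        if hp.Score > maxCount {
            maxCount = hp.Score
        }
    }
    for i := range result {
        score := maxPriority
        if maxCount > 0 {
            score = maxPriority * (maxCount - result[i].Score) / maxCount
        }
        result[i].Score = score
    }
}

func main() {
    result := hostPriorityList{mapPhase("node-1", 0), mapPhase("node-2", 3), mapPhase("node-3", 6)}
    reducePhase(result)
    fmt.Println(result) // [{node-1 10} {node-2 5} {node-3 0}]
}
```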
ServiceSpreadingPriority, for example, is registered with such a Map-Reduce function pair:

```go
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
// the number of pods (belonging to the same service) on the same node.
// Register the factory so that it's available, but do not include it as part of the default priorities
// Largely replaced by "SelectorSpreadPriority", but registered for backward compatibility with 1.0
factory.RegisterPriorityConfigFactory(
    "ServiceSpreadingPriority",
    factory.PriorityConfigFactory{
        MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
            return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
        },
        Weight: 1,
    },
)
```
NewSelectorSpreadPriority returns this Map/Reduce pair. The Map function, CalculateSpreadPriorityMap, counts the existing matching pods on each node:

```go
// CalculateSpreadPriorityMap spreads pods across hosts, considering pods
// belonging to the same service, RC, RS or StatefulSet.
// When a pod is scheduled, it looks for services, RCs, RSs and StatefulSets that match the pod,
// then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service, RC, RSs or StatefulSets selectors as the pod being scheduled.
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    var selectors []labels.Selector
    node := nodeInfo.Node()
    if node == nil {
        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
    }

    priorityMeta, ok := meta.(*priorityMetadata)
    if ok {
        selectors = priorityMeta.podSelectors
    } else {
        selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
    }

    if len(selectors) == 0 {
        return schedulerapi.HostPriority{
            Host:  node.Name,
            Score: int(0),
        }, nil
    }

    count := int(0)
    for _, nodePod := range nodeInfo.Pods() {
        if pod.Namespace != nodePod.Namespace {
            continue
        }
        // When we are replacing a failed pod, we often see the previous
        // deleted version while scheduling the replacement.
        // Ignore the previous deleted version for spreading purposes
        // (it can still be considered for resource restrictions etc.)
        if nodePod.DeletionTimestamp != nil {
            glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
            continue
        }
        for _, selector := range selectors {
            if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
                count++
                break
            }
        }
    }
    return schedulerapi.HostPriority{
        Host:  node.Name,
        Score: int(count),
    }, nil
}
```
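The per-pod check at the heart of this function is an ordinary label-selector match. As a small standalone illustration with hypothetical labels (using only the k8s.io/apimachinery module):

```go
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // A hypothetical selector taken from a Service that owns the pod being scheduled.
    selector := labels.SelectorFromSet(labels.Set{"app": "web"})

    // Labels of a pod already running on the candidate node.
    existingPodLabels := labels.Set{"app": "web", "version": "v2"}

    // CalculateSpreadPriorityMap performs this check for every non-terminating pod in the
    // same namespace on the node; each match increments the node's raw count.
    fmt.Println(selector.Matches(existingPodLabels)) // true
}
```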
The Reduce function, CalculateSpreadPriorityReduce, then normalizes these per-node counts into final 0-10 scores, folding in zone-level spreading where zone information is available:

```go
// CalculateSpreadPriorityReduce calculates the source of each node
// based on the number of existing matching pods on the node
// where zone information is included on the nodes, it favors nodes
// in zones with fewer existing matching pods.
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
    countsByZone := make(map[string]int, 10)
    maxCountByZone := int(0)
    maxCountByNodeName := int(0)

    for i := range result {
        if result[i].Score > maxCountByNodeName {
            maxCountByNodeName = result[i].Score
        }
        zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
        if zoneID == "" {
            continue
        }
        countsByZone[zoneID] += result[i].Score
    }

    for zoneID := range countsByZone {
        if countsByZone[zoneID] > maxCountByZone {
            maxCountByZone = countsByZone[zoneID]
        }
    }

    haveZones := len(countsByZone) != 0

    maxCountByNodeNameFloat64 := float64(maxCountByNodeName)
    maxCountByZoneFloat64 := float64(maxCountByZone)
    MaxPriorityFloat64 := float64(schedulerapi.MaxPriority)

    for i := range result {
        // initializing to the default/max node score of maxPriority
        fScore := MaxPriorityFloat64
        if maxCountByNodeName > 0 {
            fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
        }
        // If there is zone information present, incorporate it
        if haveZones {
            zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
            if zoneID != "" {
                zoneScore := MaxPriorityFloat64
                if maxCountByZone > 0 {
                    zoneScore = MaxPriorityFloat64 * (float64(maxCountByZone-countsByZone[zoneID]) / maxCountByZoneFloat64)
                }
                fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
            }
        }
        result[i].Score = int(fScore)
        if glog.V(10) {
            glog.Infof(
                "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore),
            )
        }
    }
    return nil
}
```
The key parts of this Reduce function are analyzed piece by piece below.
First, find the largest number of matching pods on any single node, and at the same time accumulate the matching-pod count for each zone:
```go
for i := range result {
    if result[i].Score > maxCountByNodeName {
        maxCountByNodeName = result[i].Score
    }
    zoneID := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
    if zoneID == "" {
        continue
    }
    countsByZone[zoneID] += result[i].Score
}
```
Then iterate over all nodes and convert each count into a proportional score on the 0-10 scale, so the node with the most matching pods scores 0 and a node with none scores 10:
```go
for i := range result {
    // initializing to the default/max node score of maxPriority
    fScore := MaxPriorityFloat64
    if maxCountByNodeName > 0 {
        fScore = MaxPriorityFloat64 * (float64(maxCountByNodeName-result[i].Score) / maxCountByNodeNameFloat64)
    }
    ...
}
```
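As a worked example with hypothetical counts (zoneWeighting is defined as 2.0/3.0 in the selector-spreading code, so the zone term carries twice the weight of the node term):

```go
package main

import "fmt"

const (
    maxPriority   = 10.0
    zoneWeighting = 2.0 / 3.0 // zone spreading counts for 2/3 of the final score
)

// finalScore reproduces the scoring formula above for a single node:
// count/zoneCount are the matching-pod counts for the node and its zone,
// maxCountByNodeName/maxCountByZone are the corresponding maxima.
func finalScore(count, maxCountByNodeName, zoneCount, maxCountByZone int) int {
    fScore := maxPriority
    if maxCountByNodeName > 0 {
        fScore = maxPriority * float64(maxCountByNodeName-count) / float64(maxCountByNodeName)
    }
    zoneScore := maxPriority
    if maxCountByZone > 0 {
        zoneScore = maxPriority * float64(maxCountByZone-zoneCount) / float64(maxCountByZone)
    }
    return int(fScore*(1.0-zoneWeighting) + zoneWeighting*zoneScore)
}

func main() {
    // A hypothetical node holding 2 of the at-most-5 matching pods, in a zone holding 4 of the at-most-8:
    // node part 10*(5-2)/5 = 6, zone part 10*(8-4)/8 = 5, final 6*(1/3) + 5*(2/3) ≈ 5.33, truncated to 5.
    fmt.Println(finalScore(2, 5, 4, 8))
}
```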