allNodes := int32(g.cache.NodeTree().NumNodes)
numNodesToFind := g.numFeasibleNodesToFind(allNodes)

// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered = make([]*v1.Node, numNodesToFind)
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
The implementation of ParallelizeUntil is as follows:
// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
    var stop <-chan struct{}
    if ctx != nil {
        stop = ctx.Done()
    }

    toProcess := make(chan int, pieces)
    for i := 0; i < pieces; i++ {
        toProcess <- i
    }
    close(toProcess)

    if pieces < workers {
        workers = pieces
    }

    wg := sync.WaitGroup{}
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer utilruntime.HandleCrash()
            defer wg.Done()
            for piece := range toProcess {
                select {
                case <-stop:
                    return
                default:
                    doWorkPiece(piece)
                }
            }
        }()
    }
    wg.Wait()
}
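To make the pattern concrete, here is a minimal, self-contained sketch of the same idea using only the standard library (it is not the actual workqueue package; runPieces and the timing values are made up for illustration): all work indices are pre-loaded into a buffered channel, a bounded number of workers drain it, and a canceled context stops the remaining work early.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// runPieces is a simplified stand-in for workqueue.ParallelizeUntil: it fans
// `pieces` indices out to `workers` goroutines and stops early once the
// context is canceled.
func runPieces(ctx context.Context, workers, pieces int, do func(int)) {
    toProcess := make(chan int, pieces)
    for i := 0; i < pieces; i++ {
        toProcess <- i
    }
    close(toProcess)

    if pieces < workers {
        workers = pieces
    }

    var wg sync.WaitGroup
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for piece := range toProcess {
                select {
                case <-ctx.Done():
                    return // context canceled: abandon remaining pieces
                default:
                    do(piece)
                }
            }
        }()
    }
    wg.Wait()
}

func main() {
    // Deadline chosen so that the work is cut off before all pieces finish.
    ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
    defer cancel()

    runPieces(ctx, 4, 100, func(i int) {
        time.Sleep(5 * time.Millisecond) // simulate checking one node
        fmt.Println("checked piece", i)
    })
}

The scheduler relies on the same mechanism: once enough feasible nodes have been found, canceling the context makes the remaining checkNode calls return quickly instead of scanning every node.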
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func podFitsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    info *schedulercache.NodeInfo,
    predicateFuncs map[string]algorithm.FitPredicate,
    cache schedulercache.Cache,
    nodeCache *equivalence.NodeCache,
    queue SchedulingQueue,
    alwaysCheckAllPredicates bool,
    equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
    var (
        eCacheAvailable  bool
        failedPredicates []algorithm.PredicateFailureReason
    )

    podsAdded := false
    // We run predicates twice in some cases. If the node has greater or equal priority
    // nominated pods, we run them when those pods are added to meta and nodeInfo.
    // If all predicates succeed in this pass, we run them again when these
    // nominated pods are not added. This second pass is necessary because some
    // predicates such as inter-pod affinity may not pass without the nominated pods.
    // If there are no nominated pods for the node or if the first run of the
    // predicates fail, we don't run the second pass.
    // We consider only equal or higher priority pods in the first pass, because
    // those are the current "pod" must yield to them and not take a space opened
    // for running them. It is ok if the current "pod" take resources freed for
    // lower priority pods.
    // Requiring that the new pod is schedulable in both circumstances ensures that
    // we are making a conservative decision: predicates like resources and inter-pod
    // anti-affinity are more likely to fail when the nominated pods are treated
    // as running, while predicates like pod affinity are more likely to fail when
    // the nominated pods are treated as not running. We can't just assume the
    // nominated pods are running because they are not running right now and in fact,
    // they may end up getting scheduled to a different node.
    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        if i == 0 {
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            break
        }
        // Bypass eCache if node has any nominated pods.
        // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
        // when pods are nominated or their nominations change.
        eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
        for _, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []algorithm.PredicateFailureReason
                err     error
            )
            //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                if eCacheAvailable {
                    fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
                } else {
                    fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                }
                if err != nil {
                    return false, []algorithm.PredicateFailureReason{}, err
                }

                if !fit {
                    // eCache is available and valid, and predicates result is unfit, record the fail reasons
                    failedPredicates = append(failedPredicates, reasons...)
                    // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
                    if !alwaysCheckAllPredicates {
                        glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                            "evaluation is short circuited and there are chances " +
                            "of other predicates failing as well.")
                        break
                    }
                }
            }
        }
    }

    return len(failedPredicates) == 0, failedPredicates, nil
}
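The long comment inside podFitsOnNode describes a two-pass evaluation: the pod must pass the predicates both with the nominated pods treated as running and without them. Below is a minimal, self-contained sketch of that idea with hypothetical types (predicate and fitsTwice are made up and much simpler than the scheduler's API); it only illustrates the control flow, not the real predicate semantics.

package main

import "fmt"

// predicate reports whether a pod would fit given the set of pods assumed to
// be on the node; a stand-in for algorithm.FitPredicate.
type predicate func(nodePods []string) bool

// fitsTwice mirrors the two-pass loop in podFitsOnNode: pass 1 adds the
// nominated pods to the node's pod list, pass 2 runs without them. The pod is
// schedulable only if both passes succeed; the second pass is skipped when
// there were no nominated pods, and a failure in the first pass returns early.
func fitsTwice(nodePods, nominated []string, preds []predicate) bool {
    for i := 0; i < 2; i++ {
        podsToUse := nodePods
        if i == 0 {
            // First pass: treat nominated pods as if they were already running.
            podsToUse = append(append([]string{}, nodePods...), nominated...)
        } else if len(nominated) == 0 {
            // Nothing was nominated, so the first pass already covered this case.
            break
        }
        for _, p := range preds {
            if !p(podsToUse) {
                return false
            }
        }
    }
    return true
}

func main() {
    // Toy predicate: the node can hold at most two pods.
    maxTwoPods := func(pods []string) bool { return len(pods) <= 2 }
    fmt.Println(fitsTwice([]string{"a"}, []string{"b"}, []predicate{maxTwoPods}))      // true
    fmt.Println(fitsTwice([]string{"a", "b"}, []string{"c"}, []predicate{maxTwoPods})) // false: fails once the nominated pod is added
}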
// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var predicateFails []algorithm.PredicateFailureReason
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

    // No extended resources should be ignored by default.
    ignoredExtendedResources := sets.NewString()

    var podRequest *schedulercache.Resource
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        podRequest = predicateMeta.podRequest
        if predicateMeta.ignoredExtendedResources != nil {
            ignoredExtendedResources = predicateMeta.ignoredExtendedResources
        }
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podRequest = GetResourceRequest(pod)
    }
    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return len(predicateFails) == 0, predicateFails, nil
    }

    allocatable := nodeInfo.AllocatableResource()
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
    }
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
    }
    if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
    }

    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be
            // ignored, we will skip checking it.
            if ignoredExtendedResources.Has(string(rName)) {
                continue
            }
        }
        if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
            predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
        }
    }

    if glog.V(10) {
        if len(predicateFails) == 0 {
            // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
            // not logged. There is visible performance gain from it.
            glog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
                podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
        }
    }
    return len(predicateFails) == 0, predicateFails, nil
}
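The core arithmetic of PodFitsResources is a per-dimension comparison: the pod fits only if, for every resource, the node's allocatable capacity covers what is already requested on the node plus what the new pod requests. A minimal sketch of that comparison with made-up numbers (the millicore and byte values are chosen purely for illustration):

package main

import "fmt"

// fits reports whether a pod request fits on a node with the given allocatable
// capacity and already-requested amount. It mirrors the check
// `allocatable < podRequest + requested` used in PodFitsResources.
func fits(allocatable, requested, podReq int64) bool {
    return allocatable >= requested+podReq
}

func main() {
    // Hypothetical node: 4000m CPU allocatable, 3500m already requested.
    fmt.Println(fits(4000, 3500, 500)) // true: exactly fits
    fmt.Println(fits(4000, 3500, 600)) // false: would exceed allocatable CPU

    // The same comparison is applied independently to memory, ephemeral
    // storage, and every scalar (extended) resource.
    fmt.Println(fits(8<<30, 6<<30, 1<<30)) // true: 1GiB request, 8GiB node, 6GiB requested
}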
// NodeInfo is node level aggregated information.
type NodeInfo struct {
    // Overall node information.
    node *v1.Node

    pods             []*v1.Pod
    podsWithAffinity []*v1.Pod
    usedPorts        util.HostPortInfo

    // Total requested resource of all pods on this node.
    // It includes assumed pods which scheduler sends binding to apiserver but
    // didn't get it as scheduled yet.
    requestedResource *Resource
    nonzeroRequest    *Resource
    // We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
    // as int64, to avoid conversions and accessing map.
    allocatableResource *Resource

    // Cached taints of the node for faster lookup.
    taints    []v1.Taint
    taintsErr error

    // imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
    // checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
    // state information.
    imageStates map[string]*ImageStateSummary

    // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
    // scheduling cycle.
    // TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
    TransientInfo *transientSchedulerInfo

    // Cached conditions of node for faster lookup.
    memoryPressureCondition v1.ConditionStatus
    diskPressureCondition   v1.ConditionStatus
    pidPressureCondition    v1.ConditionStatus

    // Whenever NodeInfo changes, generation is bumped.
    // This is used to avoid cloning it if the object didn't change.
    generation int64
}
// Resource is a collection of compute resource.
type Resource struct {
    MilliCPU         int64
    Memory           int64
    EphemeralStorage int64
    // We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
    // explicitly as int, to avoid conversions and improve performance.
    AllowedPodNumber int
    // ScalarResources
    ScalarResources map[v1.ResourceName]int64
}
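The common resources get dedicated fields, while everything else (extended resources such as GPUs) lands in the ScalarResources map. As a rough illustration of how such a struct gets populated from per-container requests, here is a simplified stand-in (not the scheduler's actual Resource methods; the type, the add helper, and the resource name example.com/gpu are made up):

package main

import "fmt"

// resource mirrors the shape of schedulercache.Resource: first-class fields
// for the common resources, a map for scalar (extended) resources.
type resource struct {
    MilliCPU        int64
    Memory          int64
    ScalarResources map[string]int64
}

// add accumulates one container's requests into r.
func (r *resource) add(milliCPU, memory int64, scalars map[string]int64) {
    r.MilliCPU += milliCPU
    r.Memory += memory
    for name, quant := range scalars {
        if r.ScalarResources == nil {
            r.ScalarResources = map[string]int64{}
        }
        r.ScalarResources[name] += quant
    }
}

func main() {
    var podRequest resource
    // Two hypothetical containers: 250m/128Mi, and 500m/256Mi plus one GPU.
    podRequest.add(250, 128<<20, nil)
    podRequest.add(500, 256<<20, map[string]int64{"example.com/gpu": 1})
    fmt.Printf("%+v\n", podRequest)
}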
for rName, rQuant := range podRequest.ScalarResources {
    if v1helper.IsExtendedResourceName(rName) {
        // If this resource is one of the extended resources that should be
        // ignored, we will skip checking it.
        if ignoredExtendedResources.Has(string(rName)) {
            continue
        }
    }
    if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
        predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
    }
}
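A condensed sketch of the skip-then-compare logic above, using plain Go maps instead of sets.String and v1.ResourceName (the resource names example.com/gpu and example.com/fpga are made up; whether a resource is ignored is configured, not decided here):

package main

import "fmt"

func main() {
    // Node capacity and current usage for scalar (extended) resources.
    allocatable := map[string]int64{"example.com/gpu": 2, "example.com/fpga": 1}
    requested := map[string]int64{"example.com/gpu": 1}

    // The pod asks for one GPU and one FPGA, but the FPGA resource is
    // configured to be ignored by this predicate.
    podRequest := map[string]int64{"example.com/gpu": 1, "example.com/fpga": 1}
    ignored := map[string]bool{"example.com/fpga": true}

    for name, quant := range podRequest {
        if ignored[name] {
            continue // skip ignored extended resources, as in PodFitsResources
        }
        if allocatable[name] < requested[name]+quant {
            fmt.Println("insufficient", name)
        } else {
            fmt.Println("fits", name)
        }
    }
}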