preempt
以下代码分析基于kubernetes v1.12.0
版本。
本文主要分析调度中的抢占逻辑,当pod不适合任何节点的时候,可能pod会调度失败,这时候可能会发生抢占。抢占逻辑的具体实现函数为
Scheduler.preempt
。当pod不适合任何节点的时候,可能pod会调度失败。这时候可能会发生抢占。
scheduleOne
函数中关于抢占调用的逻辑如下:此部分的代码位于/pkg/scheduler/scheduler.go
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
...
suggestedHost, err := sched.schedule(pod)
if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {
preemptionStartTime := time.Now()
// 执行抢占逻辑
sched.preempt(pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
}
return
}
...
}
其中核心代码为:
// 基于sched.schedule(pod)返回的err和当前待调度的pod执行抢占策略
sched.preempt(pod, fitError)
当pod调度失败的时候,会抢占低优先级pod的空间来给高优先级的pod。其中入参为调度失败的pod对象和调度失败的err。
抢占的基本流程如下:
- 1.判断是否有关闭抢占机制,如果关闭抢占机制则直接返回。
- 2.获取调度失败pod的最新对象数据。
- 3.执行抢占算法
Algorithm.Preempt
,返回预调度节点和需要被剔除的pod列表。 - 4.将抢占算法返回的node添加到pod的
Status.NominatedNodeName
中,并删除需要被剔除的pod。 - 5.当抢占算法返回的node是nil的时候,清除pod的
Status.NominatedNodeName
信息。
整个抢占流程的最终结果实际上是更新
Pod.Status.NominatedNodeName
属性的信息。如果抢占算法返回的节点不为空,则将该node更新到Pod.Status.NominatedNodeName
中,否则就将Pod.Status.NominatedNodeName
设置为空。preempt的具体实现函数:
此部分的代码位于/pkg/scheduler/scheduler.go
// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
// If it succeeds, it adds the name of the node where preemption has happened to the pod annotations.
// It returns the node name and an error if any.
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
glog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
return "", nil
}
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {
glog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
metrics.PreemptionVictims.Set(float64(len(victims)))
if err != nil {
glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
return "", err
}
var nodeName = ""
if node != nil {
nodeName = node.Name
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
glog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
}
// Clearing nominated pods should happen outside of "if node != nil". Node could
// be nil when a pod with nominated node name is eligible to preempt again,
// but preemption logic does not find any node for it. In that case Preempt()
// function of generic_scheduler.go returns the pod itself for removal of the annotation.
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
return nodeName, err
}
以下对
preempt
的实现分段分析。如果设置关闭抢占机制,则直接返回。
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
glog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
return "", nil
}
获取当前pod的最新状态。
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {
glog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
GetUpdatedPod
的实现就是去拿pod的对象。func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
}
接着执行抢占的算法。抢占的算法返回预调度节点的信息和因抢占被剔除的pod的信息。具体的抢占算法逻辑下文分析。
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
将预调度节点的信息更新到pod的
Status.NominatedNodeName
属性中。err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
SetNominatedNodeName
的具体实现为:func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
podCopy := pod.DeepCopy()
podCopy.Status.NominatedNodeName = nominatedNodeName
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
return err
}
接着删除因抢占而需要被剔除的pod。
err := sched.config.PodPreemptor.DeletePod(victim)
PodPreemptor.DeletePod
的具体实现就是删除具体的pod。func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
如果抢占算法得出的node对象为nil,则将pod的
Status.NominatedNodeName
属性设置为空。// Clearing nominated pods should happen outside of "if node != nil". Node could
// be nil when a pod with nominated node name is eligible to preempt again,
// but preemption logic does not find any node for it. In that case Preempt()
// function of generic_scheduler.go returns the pod itself for removal of the annotation.
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
glog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
RemoveNominatedNodeName
的具体实现如下:func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
if len(pod.Status.NominatedNodeName) == 0 {
return nil
}
return p.SetNominatedNodeName(pod, "")
}
Pod.Status.NominatedNodeName
的说明:nominatedNodeName
是调度失败的pod抢占别的pod的 时候,被抢占pod的运行节点。但在剔除被抢占pod之前该调度失败的pod不会被调度。同时也不保证最终该pod一定会调度到nominatedNodeName
的机器上,也可能因为之后资源充足等原因调度到其他节点上。最终该pod会被加到调度的队列中。其中加入到调度队列的具体过程如下:
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
...
// unscheduled pod queue
args.PodInformer.Informer().AddEventHandler(
...
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToSchedulingQueue,
UpdateFunc: c.updatePodInSchedulingQueue,
DeleteFunc: c.deletePodFromSchedulingQueue,
},
},
)
...
}
addPodToSchedulingQueue:
func (c *configFactory) addPodToSchedulingQueue(obj interface{}) {
if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil {
runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
}
PriorityQueue.Add:
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in either queue.
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
err := p.activeQ.Add(pod)
if err != nil {
glog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
if p.unschedulableQ.get(pod) != nil {
glog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
p.deleteNominatedPodIfExists(pod)
p.unschedulableQ.delete(pod)
}
p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast()
}
return err
}
addNominatedPodIfNeeded:
// addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not
// already exist in the map. Adding an existing pod is not going to update the pod.
func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) > 0 {
for _, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
glog.Errorf("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
return
}
}
p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod)
}
}
NominatedNodeName:
// NominatedNodeName returns nominated node name of a Pod.
func NominatedNodeName(pod *v1.Pod) string {
return pod.Status.NominatedNodeName
}
抢占算法依然是在
ScheduleAlgorithm
接口中定义。// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for
// testing.
Prioritizers() []PriorityConfig
}
Preempt
的具体实现为genericScheduler
结构体。Preempt
的主要实现是找到可以调度的节点和上面因抢占而需要被剔除的pod。基本流程如下:
- 1.根据调度失败的原因对所有节点先进行一批筛选,筛选出潜在的被调度节点列表。
- 2.通过
selectNodesForPreemption
筛选出需要牺牲的pod和其节点。 - 3.基于拓展抢占逻辑再次对上述筛选出来的牺牲者做过滤。
- 4.