eventBroadcaster := record.NewBroadcaster()eventBroadcaster.StartLogging(glog.Infof)// TODO: remove the wrapper when every clients have moved to use the clientset.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment,// This will enter the sync loop and no-op, because the deployment has been deleted from the store. DeleteFunc: dc.deleteDeployment,})rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addReplicaSet, UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet,})podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: dc.deletePod,})
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages// indicating that the controller identified by controllerName is waiting for syncs, followed by// either a successful or failed sync.funcWaitForCacheSync(controllerName string, stopCh <-chanstruct{}, cacheSyncs ...cache.InformerSynced) bool { glog.Infof("Waiting for caches to sync for %s controller", controllerName)if!cache.WaitForCacheSync(stopCh, cacheSyncs...) { utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))returnfalse } glog.Infof("Caches are synced for %s controller", controllerName)returntrue}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.// It enforces that the syncHandler is never invoked concurrently with the same key.func (dc *DeploymentController) worker() {for dc.processNextWorkItem() { }}func (dc *DeploymentController) processNextWorkItem() bool { key, quit := dc.queue.Get()if quit {returnfalse }defer dc.queue.Done(key) err := dc.syncHandler(key.(string)) dc.handleErr(err, key)returntrue}
// syncDeployment will sync the deployment with the given key.// This function is not meant to be invoked concurrently with the same key.func (dc *DeploymentController) syncDeployment(key string) error { startTime := time.Now() glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)deferfunc() { glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key)if err !=nil {return err } deployment, err := dc.dLister.Deployments(namespace).Get(name)if errors.IsNotFound(err) { glog.V(2).Infof("Deployment %v has been deleted", key)returnnil }if err !=nil {return err }// Deep-copy otherwise we are mutating our cache.// TODO: Deep-copy only when needed. d := deployment.DeepCopy() everything :=metav1.LabelSelector{}if reflect.DeepEqual(d.Spec.Selector, &everything) { dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation dc.client.ExtensionsV1beta1().Deployments(d.Namespace).UpdateStatus(d) }returnnil }// List ReplicaSets owned by this Deployment, while reconciling ControllerRef// through adoption/orphaning. rsList, err := dc.getReplicaSetsForDeployment(d)if err !=nil {return err }// List all Pods owned by this Deployment, grouped by their ReplicaSet.// Current uses of the podMap are://// * check if a Pod is labeled correctly with the pod-template-hash label.// * check that no old Pods are running in the middle of Recreate Deployments. podMap, err := dc.getPodMapForDeployment(d, rsList)if err !=nil {return err }if d.DeletionTimestamp !=nil {return dc.syncStatusOnly(d, rsList, podMap) }// Update deployment conditions with an Unknown condition when pausing/resuming// a deployment. In this way, we can be sure that we won't timeout when a user// resumes a Deployment with a set progressDeadlineSeconds.if err = dc.checkPausedConditions(d); err !=nil {return err }if d.Spec.Paused {return dc.sync(d, rsList, podMap) }// rollback is not re-entrant in case the underlying replica sets are updated with a new// revision so we should ensure that we won't proceed to update replica sets until we// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.if d.Spec.RollbackTo !=nil {return dc.rollback(d, rsList, podMap) } scalingEvent, err := dc.isScalingEvent(d, rsList, podMap)if err !=nil {return err }if scalingEvent {return dc.sync(d, rsList, podMap) }switch d.Spec.Strategy.Type {case extensions.RecreateDeploymentStrategyType:return dc.rolloutRecreate(d, rsList, podMap)case extensions.RollingUpdateDeploymentStrategyType:return dc.rolloutRolling(d, rsList, podMap) }return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)}
4.1. Get deployment
// get namespace and deployment namenamespace, name, err := cache.SplitMetaNamespaceKey(key)// get deployment by namedeployment, err := dc.dLister.Deployments(namespace).Get(name)
4.2. getReplicaSetsForDeployment
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef// through adoption/orphaning.rsList, err := dc.getReplicaSetsForDeployment(d)
getReplicaSetsForDeployment具体代码:
// getReplicaSetsForDeployment uses ControllerRefManager to reconcile// ControllerRef by adopting and orphaning.// It returns the list of ReplicaSets that this Deployment should manage.func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) ([]*apps.ReplicaSet, error) {// List all ReplicaSets to find those we own but that no longer match our// selector. They will be orphaned by ClaimReplicaSets(). rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())if err !=nil {returnnil, err } deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)if err !=nil {returnnil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) }// If any adoptions are attempted, we should first recheck for deletion with// an uncached quorum read sometime after listing ReplicaSets (see #42639). canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{})if err !=nil {returnnil, err }if fresh.UID != d.UID { return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
}return fresh, nil }) cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)return cm.ClaimReplicaSets(rsList)}
4.3. getPodMapForDeployment
// List all Pods owned by this Deployment, grouped by their ReplicaSet.// Current uses of the podMap are://// * check if a Pod is labeled correctly with the pod-template-hash label.// * check that no old Pods are running in the middle of Recreate Deployments.podMap, err := dc.getPodMapForDeployment(d, rsList)
getPodMapForDeployment具体代码:
// getPodMapForDeployment returns the Pods managed by a Deployment.//// It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,// according to the Pod's ControllerRef.func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID]*v1.PodList, error) {
// Get all Pods that potentially belong to this Deployment. selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)if err !=nil {returnnil, err } pods, err := dc.podLister.Pods(d.Namespace).List(selector)if err !=nil {returnnil, err }// Group Pods by their controller (if it's in rsList). podMap :=make(map[types.UID]*v1.PodList, len(rsList))for _, rs :=range rsList { podMap[rs.UID] =&v1.PodList{} }for _, pod :=range pods {// Do not ignore inactive Pods because Recreate Deployments need to verify that no// Pods from older versions are running before spinning up new Pods. controllerRef := metav1.GetControllerOf(pod)if controllerRef ==nil {continue }// Only append if we care about this UID.if podList, ok := podMap[controllerRef.UID]; ok { podList.Items =append(podList.Items, *pod) } }return podMap, nil}
4.4. checkPausedConditions
// Update deployment conditions with an Unknown condition when pausing/resuming// a deployment. In this way, we can be sure that we won't timeout when a user// resumes a Deployment with a set progressDeadlineSeconds.if err = dc.checkPausedConditions(d); err !=nil {return err}if d.Spec.Paused {return dc.sync(d, rsList)}
checkPausedConditions具体代码:
// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments// that were paused for longer than progressDeadlineSeconds.func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {if!deploymentutil.HasProgressDeadline(d) {returnnil } cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)if cond !=nil&& cond.Reason == deploymentutil.TimedOutReason {// If we have reported lack of progress, do not overwrite it with a paused condition.returnnil } pausedCondExists := cond !=nil&& cond.Reason == deploymentutil.PausedDeployReason needsUpdate :=falseif d.Spec.Paused &&!pausedCondExists { condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
deploymentutil.SetDeploymentCondition(&d.Status, *condition) needsUpdate =true } elseif!d.Spec.Paused && pausedCondExists { condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
deploymentutil.SetDeploymentCondition(&d.Status, *condition) needsUpdate =true }if!needsUpdate {returnnil }var err error d, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)return err}
// isScalingEvent checks whether the provided deployment has been updated with a scaling event// by looking at the desired-replicas annotation in the active replica sets of the deployment.//// rsList should come from getReplicaSetsForDeployment(d).// podMap should come from getPodMapForDeployment(d, rsList).func (dc *DeploymentController) isScalingEvent(d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)if err !=nil {returnfalse, err } allRSs :=append(oldRSs, newRS)for _, rs :=range controller.FilterActiveReplicaSets(allRSs) { desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)if!ok {continue }if desired !=*(d.Spec.Replicas) {returntrue, nil } }returnfalse, nil}