// Provisioner is an interface that creates templates for PersistentVolumes// and can create the volume as a new resource in the infrastructure provider.// It can also remove the volume it created from the underlying storage// provider.typeProvisionerinterface {// Provision creates a volume i.e. the storage asset and returns a PV object// for the volumeProvision(VolumeOptions) (*v1.PersistentVolume, error)// Delete removes the storage asset that was created by Provision backing the// given PV. Does not delete the PV object itself.//// May return IgnoredError to indicate that the call has been ignored and no// action taken.Delete(*v1.PersistentVolume) error}
// VolumeOptions contains option information about a volume// https://github.com/kubernetes/kubernetes/blob/release-1.4/pkg/volume/plugins.gotypeVolumeOptionsstruct {// Reclamation policy for a persistent volume PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy// PV.Name of the appropriate PersistentVolume. Used to generate cloud// volume name. PVName string// PV mount options. Not validated - mount of the PVs will simply fail if one is invalid. MountOptions []string// PVC is reference to the claim that lead to provisioning of a new PV.// Provisioners *must* create a PV that would be matched by this PVC,// i.e. with required capacity, accessMode, labels matching PVC.Selector and// so on. PVC *v1.PersistentVolumeClaim// Volume provisioning parameters from StorageClass Parameters map[string]string// Node selected by the scheduler for the volume. SelectedNode *v1.Node// Topology constraint parameter from StorageClass AllowedTopologies []v1.TopologySelectorTerm}
apiVersion:storage.k8s.io/v1kind:StorageClassmetadata:name:managed-nfs-storageprovisioner:fuseim.pri/ifs# or choose another name, must match deployment's env PROVISIONER_NAME'parameters: archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.
2.1.2. 获取clientset对象
源码如下:
// Create an InClusterConfig and use it to create a client for the controller// to use to communicate with Kubernetesconfig, err := rest.InClusterConfig()if err !=nil { glog.Fatalf("Failed to create config: %v", err)}clientset, err := kubernetes.NewForConfig(config)if err !=nil { glog.Fatalf("Failed to create client: %v", err)}
// The controller needs to know what the server version is because out-of-tree// provisioners aren't officially supported until 1.5serverVersion, err := clientset.Discovery().ServerVersion()if err !=nil { glog.Fatalf("Error getting server version: %v", err)}clientNFSProvisioner :=&nfsProvisioner{ client: clientset, server: server, path: path,}
// Get the storage class for this volume.storageClass, err := p.getClassForVolume(volume)if err !=nil {return err}// Determine if the "archiveOnDelete" parameter exists.// If it exists and has a falsey value, delete the directory.// Otherwise, archive it.archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"]if exists { archiveBool, err := strconv.ParseBool(archiveOnDelete)if err !=nil {return err }if!archiveBool {return os.RemoveAll(oldPath) }}
// ProvisionController is a controller that provisions PersistentVolumes for// PersistentVolumeClaims.typeProvisionControllerstruct { client kubernetes.Interface// The name of the provisioner for which this controller dynamically// provisions volumes. The value of annDynamicallyProvisioned and// annStorageProvisioner to set & watch for, respectively provisionerName string// The provisioner the controller will use to provision and delete volumes.// Presumably this implementer of Provisioner carries its own// volume-specific options and such that it needs in order to provision// volumes. provisioner Provisioner// Kubernetes cluster server version:// * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic// provisioning is not officially supported, though it works// * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is// officially supported// * 1.6: storage classes enter GA kubeVersion *utilversion.Version...}
// Identity of this controller, generated at creation time and not persisted// across restarts. Useful only for debugging, for seeing the source of// events. controller.provisioner may have its own, different notion of// identity which may/may not persist across restartsid stringcomponent stringeventRecorder record.EventRecorderresyncPeriod time.DurationexponentialBackOffOnError boolthreadiness intcreateProvisionedPVRetryCount intcreateProvisionedPVInterval time.DurationfailedProvisionThreshold, failedDeleteThreshold int// The port for metrics server to serve on.metricsPort int32// The IP address for metrics server to serve on.metricsAddress string// The path of metrics endpoint path.metricsPath string// Parameters of leaderelection.LeaderElectionConfig.leaseDuration, renewDeadline, retryPeriod time.DurationhasRun boolhasRunLock *sync.Mutex
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.typeListerWatcherinterface {// List should return a list type object; the Items field will be extracted, and the// ResourceVersion field will be used to start the watch in the right place.List(options metav1.ListOptions) (runtime.Object, error)// Watch should begin a watch at the specified version.Watch(options metav1.ListOptions) (watch.Interface, error)}// ListFunc knows how to list resourcestypeListFuncfunc(options metav1.ListOptions) (runtime.Object, error)// WatchFunc knows how to watch resourcestypeWatchFuncfunc(options metav1.ListOptions) (watch.Interface, error)// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.// It is a convenience function for users of NewReflector, etc.// ListFunc and WatchFunc must not be niltypeListWatchstruct { ListFunc ListFunc WatchFunc WatchFunc// DisableChunking requests no chunking for this list watcher. DisableChunking bool}
3.2.4. ResourceEventHandlerFuncs
// PVCclaimHandler :=cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) }, DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },}// PVvolumeHandler :=cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) }, DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },}// StorageClassclassHandler :=cache.ResourceEventHandlerFuncs{// We don't need an actual event handler for StorageClasses,// but we must pass a non-nil one to cache.NewInformer() AddFunc: nil, UpdateFunc: nil, DeleteFunc: nil,}
// ResourceEventHandler can handle notifications for events that happen to a// resource. The events are informational only, so you can't return an// error.// * OnAdd is called when an object is added.// * OnUpdate is called when an object is modified. Note that oldObj is the// last known state of the object-- it is possible that several changes// were combined together, so you can't use this to see every single// change. OnUpdate is also called when a re-list happens, and it will// get called even if nothing changed. This is useful for periodically// evaluating or syncing something.// * OnDelete will get the final state of the item if it is known, otherwise// it will get an object of type DeletedFinalStateUnknown. This can// happen if the watch is closed and misses the delete event and we don't// notice the deletion until the subsequent re-list.typeResourceEventHandlerinterface {OnAdd(obj interface{})OnUpdate(oldObj, newObj interface{})OnDelete(obj interface{})}// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or// as few of the notification functions as you want while still implementing// ResourceEventHandler.typeResourceEventHandlerFuncsstruct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) DeleteFunc func(obj interface{})}
// Run starts all of this controller's control loopsfunc (ctrl *ProvisionController) Run(stopCh <-chanstruct{}) { run :=func(stopCh <-chanstruct{}) {...if ctrl.metricsPort >0 { prometheus.MustRegister([]prometheus.Collector{ metrics.PersistentVolumeClaimProvisionTotal, metrics.PersistentVolumeClaimProvisionFailedTotal, metrics.PersistentVolumeClaimProvisionDurationSeconds, metrics.PersistentVolumeDeleteTotal, metrics.PersistentVolumeDeleteFailedTotal, metrics.PersistentVolumeDeleteDurationSeconds, }...) http.Handle(ctrl.metricsPath, promhttp.Handler()) address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10)) glog.Infof("Starting metrics server at %s\n", address)go wait.Forever(func() { err := http.ListenAndServe(address, nil)if err !=nil { glog.Errorf("Failed to listen on %s: %v", address, err) } }, 5*time.Second) }...}
3.3.2. Controller.Run
// If a SharedInformer has been passed in, this controller should not// call Run againif ctrl.claimInformer ==nil {go ctrl.claimController.Run(stopCh)}if ctrl.volumeInformer ==nil {go ctrl.volumeController.Run(stopCh)}if ctrl.classInformer ==nil {go ctrl.classController.Run(stopCh)}
运行消息通知器Informer。
3.3.3. Worker
for i :=0; i < ctrl.threadiness; i++ {go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh)}
// provisionClaimOperation attempts to provision a volume for the given claim.// Returns error, which indicates whether provisioning should be retried// (requeue the claim) or notfunc (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {// Most code here is identical to that found in controller.go of kube's PV controller... claimClass := helper.GetPersistentVolumeClaimClass(claim) operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass) glog.Infof(logOperation(operation, "started"))// A previous doProvisionClaim may just have finished while we were waiting for// the locks. Check that PV (with deterministic name) hasn't been provisioned// yet. pvName := ctrl.getProvisionedVolumeNameForClaim(claim) volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})if err ==nil&& volume !=nil {// Volume has been already provisioned, nothing to do. glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))returnnil }...}
options :=VolumeOptions{ PersistentVolumeReclaimPolicy: reclaimPolicy, PVName: pvName, PVC: claim, MountOptions: mountOptions, Parameters: parameters, SelectedNode: selectedNode, AllowedTopologies: allowedTopologies,}ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))
volume, err = ctrl.provisioner.Provision(options)if err !=nil {if ierr, ok := err.(*IgnoredError); ok {// Provision ignored, do nothing and hope another provisioner will provision it. glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr))returnnil } err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())return err}
4、创建k8s的PV对象。
// Try to create the PV object several timesfor i :=0; i < ctrl.createProvisionedPVRetryCount; i++ { glog.Infof(logOperation(operation, "trying to save persistentvvolume %q", volume.Name))if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err ==nil|| apierrs.IsAlreadyExists(err) {// Save succeeded.if err !=nil { glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name)) err =nil } else { glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name)) }break }// Save failed, try again after a while. glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err)) time.Sleep(ctrl.createProvisionedPVInterval)}
5、创建PV失败,清理存储资源。
if err !=nil {// Save failed. Now we have a storage asset outside of Kubernetes,// but we don't have appropriate PV object for it.// Emit some event here and try to delete the storage asset several// times....for i :=0; i < ctrl.createProvisionedPVRetryCount; i++ {if err = ctrl.provisioner.Delete(volume); err ==nil {// Delete succeeded glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name))break }// Delete failed, try again after a while. glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err)) time.Sleep(ctrl.createProvisionedPVInterval) }if err !=nil {// Delete failed several times. There is an orphaned volume and there// is nothing we can do about it. strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
glog.Error(logOperation(operation, strerr)) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr) }}
// deleteVolumeOperation attempts to delete the volume backing the given// volume. Returns error, which indicates whether deletion should be retried// (requeue the volume) or notfunc (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {...// This method may have been waiting for a volume lock for some time.// Our check does not have to be as sophisticated as PV controller's, we can// trust that the PV controller has set the PV to Released/Failed and it's// ours to delete newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})if err !=nil {returnnil }if!ctrl.shouldDelete(newVolume) { glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))returnnil }...}
err = ctrl.provisioner.Delete(volume)if err !=nil {if ierr, ok := err.(*IgnoredError); ok {// Delete ignored, do nothing and hope another provisioner will delete it. glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr))returnnil }// Delete failed, emit an event. glog.Errorf(logOperation(operation, "volume deletion failed: %v", err)) ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())return err}
3、删除k8s中的PV对象。
// Delete the volumeif err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err !=nil {// Oops, could not delete the volume and therefore the controller will// try to delete the volume again on next update. glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err))return err}