// Provisioner is an interface that creates templates for PersistentVolumes
// and can create the volume as a new resource in the infrastructure provider.
// It can also remove the volume it created from the underlying storage
// provider.
type Provisioner interface {
// Provision creates a volume i.e. the storage asset and returns a PV object
// for the volume
Provision(VolumeOptions) (*v1.PersistentVolume, error)
// Delete removes the storage asset that was created by Provision backing the
// given PV. Does not delete the PV object itself.
//
// May return IgnoredError to indicate that the call has been ignored and no
// action taken.
Delete(*v1.PersistentVolume) error
}
// VolumeOptions contains option information about a volume
// https://github.com/kubernetes/kubernetes/blob/release-1.4/pkg/volume/plugins.go
type VolumeOptions struct {
// Reclamation policy for a persistent volume
PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy
// PV.Name of the appropriate PersistentVolume. Used to generate cloud
// volume name.
PVName string
// PV mount options. Not validated - mount of the PVs will simply fail if one is invalid.
MountOptions []string
// PVC is a reference to the claim that led to provisioning of a new PV.
// Provisioners *must* create a PV that would be matched by this PVC,
// i.e. with required capacity, accessMode, labels matching PVC.Selector and
// so on.
PVC *v1.PersistentVolumeClaim
// Volume provisioning parameters from StorageClass
Parameters map[string]string
// Node selected by the scheduler for the volume.
SelectedNode *v1.Node
// Topology constraint parameter from StorageClass
AllowedTopologies []v1.TopologySelectorTerm
}
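For reference, below is a minimal sketch of a type satisfying this interface. The type name sketchNFSProvisioner, its fields, and the directory layout are illustrative only; the real nfs-client-provisioner adds export handling, validation and archiving. (Assumed imports: "fmt", "os", "path/filepath", metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" and the core v1 API types.)
// Minimal sketch of a Provisioner implementation (illustrative only).
type sketchNFSProvisioner struct {
	server string // NFS server address (assumed)
	path   string // exported base directory (assumed)
}

func (p *sketchNFSProvisioner) Provision(options VolumeOptions) (*v1.PersistentVolume, error) {
	// Create the storage asset: a per-PV directory under the export.
	pvPath := filepath.Join(p.path, options.PVName)
	if err := os.MkdirAll(pvPath, 0777); err != nil {
		return nil, err
	}
	// Return a PV template matching the claim's request.
	return &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: options.PVName},
		Spec: v1.PersistentVolumeSpec{
			PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
			AccessModes:                   options.PVC.Spec.AccessModes,
			MountOptions:                  options.MountOptions,
			Capacity: v1.ResourceList{
				v1.ResourceStorage: options.PVC.Spec.Resources.Requests[v1.ResourceStorage],
			},
			PersistentVolumeSource: v1.PersistentVolumeSource{
				NFS: &v1.NFSVolumeSource{Server: p.server, Path: pvPath},
			},
		},
	}, nil
}

func (p *sketchNFSProvisioner) Delete(volume *v1.PersistentVolume) error {
	// Remove the backing directory; the controller deletes the PV object itself.
	if volume.Spec.NFS == nil {
		return fmt.Errorf("volume %s is not an NFS volume", volume.Name)
	}
	return os.RemoveAll(volume.Spec.NFS.Path)
}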
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage
provisioner: fuseim.pri/ifs # or choose another name; must match the deployment's env PROVISIONER_NAME
parameters:
  archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.
2.1.2. Getting the clientset object
The source code is as follows:
// Create an InClusterConfig and use it to create a client for the controller
// to use to communicate with Kubernetes
config, err := rest.InClusterConfig()
if err != nil {
glog.Fatalf("Failed to create config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Fatalf("Failed to create client: %v", err)
}
// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
glog.Fatalf("Error getting server version: %v", err)
}
clientNFSProvisioner := &nfsProvisioner{
client: clientset,
server: server,
path: path,
}
// Start the provision controller which will dynamically provision NFS PVs
pc := controller.NewProvisionController(clientset, provisionerName, clientNFSProvisioner, serverVersion.GitVersion)
pc.Run(wait.NeverStop)
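The snippet constructs an nfsProvisioner value whose definition is not shown above; it is essentially a small struct carrying the clientset plus the NFS server address and export path (in the deployment these typically come from the NFS_SERVER and NFS_PATH environment variables). A sketch of the assumed type:
// Sketch of the provisioner type the main() snippet above assumes; the field
// names come from the struct literal, everything else is inferred.
type nfsProvisioner struct {
	client kubernetes.Interface // clientset used to talk to the API server
	server string               // NFS server address
	path   string               // NFS exported path
}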
// Get the storage class for this volume.
storageClass, err := p.getClassForVolume(volume)
if err != nil {
return err
}
// Determine if the "archiveOnDelete" parameter exists.
// If it exists and has a falsey value, delete the directory.
// Otherwise, archive it.
archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"]
if exists {
archiveBool, err := strconv.ParseBool(archiveOnDelete)
if err != nil {
return err
}
if !archiveBool {
return os.RemoveAll(oldPath)
}
}
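Only the removal branch is shown above. When archiveOnDelete is absent or not false, the provisioner keeps the data by renaming the directory instead of deleting it; a sketch of that fallback (the exact archive naming in nfs-client-provisioner may differ):
// Fallback when the data should be kept: rename the backing directory under
// an "archived-" prefix instead of removing it (naming is illustrative).
archivePath := filepath.Join(filepath.Dir(oldPath), "archived-"+filepath.Base(oldPath))
glog.Infof("archiving path %s to %s", oldPath, archivePath)
return os.Rename(oldPath, archivePath)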
// ProvisionController is a controller that provisions PersistentVolumes for
// PersistentVolumeClaims.
type ProvisionController struct {
client kubernetes.Interface
// The name of the provisioner for which this controller dynamically
// provisions volumes. The value of annDynamicallyProvisioned and
// annStorageProvisioner to set & watch for, respectively
provisionerName string
// The provisioner the controller will use to provision and delete volumes.
// Presumably this implementer of Provisioner carries its own
// volume-specific options and such that it needs in order to provision
// volumes.
provisioner Provisioner
// Kubernetes cluster server version:
// * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic
// provisioning is not officially supported, though it works
// * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is
// officially supported
// * 1.6: storage classes enter GA
kubeVersion *utilversion.Version
...
// Identity of this controller, generated at creation time and not persisted
// across restarts. Useful only for debugging, for seeing the source of
// events. controller.provisioner may have its own, different notion of
// identity which may/may not persist across restarts
id string
component string
eventRecorder record.EventRecorder
resyncPeriod time.Duration
exponentialBackOffOnError bool
threadiness int
createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration
failedProvisionThreshold, failedDeleteThreshold int
// The port for metrics server to serve on.
metricsPort int32
// The IP address for metrics server to serve on.
metricsAddress string
// The path of the metrics endpoint.
metricsPath string
// Parameters of leaderelection.LeaderElectionConfig.
leaseDuration, renewDeadline, retryPeriod time.Duration
hasRun bool
hasRunLock *sync.Mutex
...
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
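NewProvisionController builds one such ListWatch per resource from the clientset. For the PVC side it looks roughly like the following sketch (listing and watching PersistentVolumeClaims in all namespaces; error handling omitted):
// Sketch: the ListWatch for PVCs across all namespaces, roughly as
// NewProvisionController builds it from the clientset.
claimSource := &cache.ListWatch{
	ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
		return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
	},
	WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
		return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
	},
}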
3.2.4. ResourceEventHandlerFuncs
// PVC
claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },
}
// PV
volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
}
// StorageClass
classHandler := cache.ResourceEventHandlerFuncs{
// We don't need an actual event handler for StorageClasses,
// but we must pass a non-nil one to cache.NewInformer()
AddFunc: nil,
UpdateFunc: nil,
DeleteFunc: nil,
}
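The Add/Update callbacks funnel objects into the controller's workqueues through enqueueWork, while Delete events drop them through forgetWork. A condensed sketch of these helpers, following the library's workqueue usage (details may differ):
// enqueueWork: derive a namespace/name key from the object and add it to the queue.
func (ctrl *ProvisionController) enqueueWork(queue workqueue.RateLimitingInterface, obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	queue.Add(key)
}

// forgetWork: stop tracking (and retrying) the key once the object is deleted.
func (ctrl *ProvisionController) forgetWork(queue workqueue.RateLimitingInterface, obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	queue.Forget(key)
	queue.Done(key)
}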
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
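Each handler is then paired with its ListWatch and passed to cache.NewInformer, which returns a local Store and a Controller whose Run method drives the list/watch loop. For the claim side this is roughly the following sketch (the store field name is illustrative; the library can also reuse externally supplied SharedInformers):
// Sketch: wiring the PVC ListWatch and handler into an informer. NewInformer
// returns a local cache (Store) and a Controller whose Run drives list/watch.
ctrl.claims, ctrl.claimController = cache.NewInformer(
	claimSource,                 // the ListWatch shown earlier
	&v1.PersistentVolumeClaim{}, // expected object type
	ctrl.resyncPeriod,           // periodic re-list, which also triggers OnUpdate
	claimHandler,                // the ResourceEventHandlerFuncs above
)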
// Run starts all of this controller's control loops
func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
run := func(stopCh <-chan struct{}) {
...
if ctrl.metricsPort > 0 {
prometheus.MustRegister([]prometheus.Collector{
metrics.PersistentVolumeClaimProvisionTotal,
metrics.PersistentVolumeClaimProvisionFailedTotal,
metrics.PersistentVolumeClaimProvisionDurationSeconds,
metrics.PersistentVolumeDeleteTotal,
metrics.PersistentVolumeDeleteFailedTotal,
metrics.PersistentVolumeDeleteDurationSeconds,
}...)
http.Handle(ctrl.metricsPath, promhttp.Handler())
address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
glog.Infof("Starting metrics server at %s\n", address)
go wait.Forever(func() {
err := http.ListenAndServe(address, nil)
if err != nil {
glog.Errorf("Failed to listen on %s: %v", address, err)
}
}, 5*time.Second)
}
...
}
3.3.2. Controller.Run
// If a SharedInformer has been passed in, this controller should not
// call Run again
if ctrl.claimInformer == nil {
go ctrl.claimController.Run(stopCh)
}
if ctrl.volumeInformer == nil {
go ctrl.volumeController.Run(stopCh)
}
if ctrl.classInformer == nil {
go ctrl.classController.Run(stopCh)
}
This runs the informers.
3.3.3. Worker
for i := 0; i < ctrl.threadiness; i++ {
go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)
go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh)
}
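Each worker drains its queue in a loop: take a key, sync it, Forget it on success, or requeue it with rate limiting on failure. A condensed sketch of that pattern is shown below; method names follow the usual workqueue convention and the library's real versions additionally enforce failure thresholds. On the claim side this sync eventually calls provisionClaimOperation, shown next.
// Condensed sketch of the claim worker loop; the volume worker is analogous.
func (ctrl *ProvisionController) runClaimWorker() {
	for ctrl.processNextClaimWorkItem() {
	}
}

func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
	obj, shutdown := ctrl.claimQueue.Get()
	if shutdown {
		return false
	}
	defer ctrl.claimQueue.Done(obj)

	key, ok := obj.(string)
	if !ok {
		// Unexpected item type; drop it so it is not retried forever.
		ctrl.claimQueue.Forget(obj)
		return true
	}
	if err := ctrl.syncClaimHandler(key); err != nil {
		// Sync failed: requeue with backoff so the claim is retried later.
		ctrl.claimQueue.AddRateLimited(obj)
		return true
	}
	ctrl.claimQueue.Forget(obj)
	return true
}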
// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns error, which indicates whether provisioning should be retried
// (requeue the claim) or not
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
// Most code here is identical to that found in controller.go of kube's PV controller...
claimClass := helper.GetPersistentVolumeClaimClass(claim)
operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
glog.Infof(logOperation(operation, "started"))
// A previous doProvisionClaim may just have finished while we were waiting for
// the locks. Check that PV (with deterministic name) hasn't been provisioned
// yet.
pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
if err == nil && volume != nil {
// Volume has been already provisioned, nothing to do.
glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
return nil
}
...
}
options := VolumeOptions{
PersistentVolumeReclaimPolicy: reclaimPolicy,
PVName: pvName,
PVC: claim,
MountOptions: mountOptions,
Parameters: parameters,
SelectedNode: selectedNode,
AllowedTopologies: allowedTopologies,
}
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))
volume, err = ctrl.provisioner.Provision(options)
if err != nil {
if ierr, ok := err.(*IgnoredError); ok {
// Provision ignored, do nothing and hope another provisioner will provision it.
glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr))
return nil
}
err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
return err
}
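The IgnoredError handled here (and mentioned in the Provisioner interface above) is how a provisioner signals that the claim is simply not its responsibility; the controller then drops the item without treating it as a failure. Its definition in the library is roughly:
// IgnoredError: returned by a Provisioner to signal that it deliberately
// took no action for this claim or volume.
type IgnoredError struct {
	Reason string
}

func (e *IgnoredError) Error() string {
	return "ignored because " + e.Reason
}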
4. Create the Kubernetes PV object.
// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
glog.Infof(logOperation(operation, "trying to save persistentvvolume %q", volume.Name))
if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
err = nil
} else {
glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name))
}
break
}
// Save failed, try again after a while.
glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}
5. If saving the PV fails, clean up the storage asset.
if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
...
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
if err = ctrl.provisioner.Delete(volume); err == nil {
// Delete succeeded
glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
break
}
// Delete failed, try again after a while.
glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
}
}
If the PV is saved successfully, a success message is logged and nil is returned.
1. deleteVolumeOperation takes a PV as its argument. It first re-fetches the PV object and checks whether it still needs to be deleted.
// deleteVolumeOperation attempts to delete the volume backing the given
// volume. Returns error, which indicates whether deletion should be retried
// (requeue the volume) or not
func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
...
// This method may have been waiting for a volume lock for some time.
// Our check does not have to be as sophisticated as PV controller's, we can
// trust that the PV controller has set the PV to Released/Failed and it's
// ours to delete
newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
if err != nil {
return nil
}
if !ctrl.shouldDelete(newVolume) {
glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
return nil
}
...
}
2. Call provisioner.Delete to remove the storage asset backing the PV.
err = ctrl.provisioner.Delete(volume)
if err != nil {
if ierr, ok := err.(*IgnoredError); ok {
// Delete ignored, do nothing and hope another provisioner will delete it.
glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr))
return nil
}
// Delete failed, emit an event.
glog.Errorf(logOperation(operation, "volume deletion failed: %v", err))
ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
return err
}
3. Delete the PV object from Kubernetes.
// Delete the volume
if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
// Oops, could not delete the volume and therefore the controller will
// try to delete the volume again on next update.
glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err))
return err
}