本文主要分析csi-provisioner
的源码,关于开发一个Dynamic Provisioner
,具体可参考nfs-client-provisioner的源码分析
1. Dynamic Provisioner
1.1. Provisioner Interface
开发Dynamic Provisioner
需要实现Provisioner 接口,该接口有两个方法,分别是:
Provision:创建存储资源,并且返回一个PV对象。
Delete:移除对应的存储资源,但并没有删除PV对象。
1.2. 开发provisioner的步骤
写一个provisioner
实现Provisioner
接口(包含Provision
和Delete
的方法)。
通过该provisioner
构建ProvisionController
。
执行ProvisionController
的Run
方法。
2. CSI Provisioner
CSI Provisioner的源码可参考:https://github.com/kubernetes-csi/external-provisioner。
2.1.1. 读取环境变量
源码如下:
复制 var (
provisioner = flag.String("provisioner", "", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String( "csi-address" , "/run/csi/socket" , "The gRPC endpoint for Target CSI Volume" )
connectionTimeout = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
volumeNamePrefix = flag.String( "volume-name-prefix" , "pvc" , "Prefix to apply to the name of a created volume" )
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
showVersion = flag.Bool( "version" , false , "Show version." )
provisionController * controller . ProvisionController
version = "unknown"
)
func init () {
var config * rest . Config
var err error
flag.Parse()
flag.Set( "logtostderr" , "true" )
if * showVersion {
fmt.Println(os.Args[ 0 ], version)
os.Exit( 0 )
}
glog.Infof( "Version: %s " , version)
...
}
通过init函数
解析相关参数,其实provisioner
指明为PVC提供PV的provisioner的名字,需要和StorageClass
对象中的provisioner
字段一致。
2.1.2. 获取clientset对象
源码如下:
复制 // get the KUBECONFIG from env if specified (useful for local/debug cluster)
kubeconfigEnv := os.Getenv( "KUBECONFIG" )
if kubeconfigEnv != "" {
glog.Infof( "Found KUBECONFIG environment variable set, using that.." )
kubeconfig = & kubeconfigEnv
}
if * master != "" || * kubeconfig != "" {
glog.Infof( "Either master or kubeconfig specified. building kube config from that.." )
config, err = clientcmd.BuildConfigFromFlags( * master, * kubeconfig)
} else {
glog.Infof( "Building kube configs for running in cluster..." )
config, err = rest.InClusterConfig()
}
if err != nil {
glog.Fatalf( "Failed to create config: %v " , err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Fatalf( "Failed to create client: %v " , err)
}
// snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1alpha1Client
snapClient, err := snapclientset.NewForConfig(config)
if err != nil {
glog.Fatalf( "Failed to create snapshot client: %v " , err)
}
csiAPIClient, err := csiclientset.NewForConfig(config)
if err != nil {
glog.Fatalf( "Failed to create CSI API client: %v " , err)
}
通过读取对应的k8s的配置,创建clientset
对象,用来执行k8s对应的API,其中主要包括对PV和PVC等对象的创建删除等操作。
2.1.3. k8s版本校验
复制 // The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
glog.Fatalf( "Error getting server version: %v " , err)
}
获取了k8s的版本信息,因为provisioners的功能在k8s 1.5及以上版本才支持。
2.1.4. 连接 csi socket
复制 // Generate a unique ID for this provisioner
timeStamp := time.Now().UnixNano() / int64 (time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10 ) + "-" + strconv.Itoa(rand.Intn( 10000 )) + "-" + * provisioner
// Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := & grpc . ClientConn {}
for socketDown {
grpcClient, err = ctrl.Connect( * csiEndpoint, * connectionTimeout)
if err == nil {
socketDown = false
continue
}
time.Sleep( 10 * time.Second)
}
在Provisioner
会停留在初始化状态,直到csi socket
连接成功才正常运行。如果连接失败,会暂停10秒
后重试,其中涉及以下2个参数:
csiEndpoint:CSI Volume的gRPC地址,默认通过为/run/csi/socket
。
connectionTimeout:连接CSI driver socket的超时时间,默认为10秒。
2.1.5. 构造csi-Provisioner对象
复制 // Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
provisionController = controller.NewProvisionController(
clientset,
* provisioner,
csiProvisioner,
serverVersion.GitVersion,
)
通过参数clientset
, csiAPIClient
, csiEndpoint
, connectionTimeout
, identity
, volumeNamePrefix
, volumeNameUUIDLength
, grpcClient
, snapClient
构造csi-Provisioner对象。
通过csiProvisioner
构造ProvisionController
对象。
2.1.6. 运行ProvisionController
复制 func main () {
provisionController.Run(wait.NeverStop)
}
ProvisionController
实现了具体的PV和PVC的相关逻辑,Run
方法以常驻进程的方式运行。
2.2.1. Provision方法
csiProvisioner
的Provision
方法具体源码参考:https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L336
Provision
方法用来创建存储资源,并且返回一个PV
对象。其中入参是VolumeOptions
,用来指定PV
对象的相关属性。
1、构造PV相关属性
复制 pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
if err != nil {
return nil , err
}
2、构造CSIPersistentVolumeSource相关属性
复制 driverState, err := checkDriverState(p.grpcClient, p.timeout, needSnapshotSupport)
if err != nil {
return nil , err
}
...
// Resolve controller publish, node stage, node publish secret references
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretNameKey, controllerPublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
return nil , err
}
nodeStageSecretRef, err := getSecretReference(nodeStageSecretNameKey, nodeStageSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
return nil , err
}
nodePublishSecretRef, err := getSecretReference(nodePublishSecretNameKey, nodePublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
return nil , err
}
...
volumeAttributes := map [ string ] string {provisionerIDKey: p.identity}
for k, v := range rep.Volume.Attributes {
volumeAttributes[k] = v
}
...
fsType := ""
for k, v := range options.Parameters {
switch strings.ToLower(k) {
case "fstype" :
fsType = v
}
}
if len (fsType) == 0 {
fsType = defaultFSType
}
3、创建CSI CreateVolumeRequest
复制 // Create a CSI CreateVolumeRequest and Response
req := csi . CreateVolumeRequest {
Name: pvName,
Parameters: options.Parameters,
VolumeCapabilities: volumeCaps,
CapacityRange: & csi . CapacityRange {
RequiredBytes: int64 (volSizeBytes),
},
}
...
glog.V( 5 ).Infof( "CreateVolumeRequest %+v " , req)
rep := & csi . CreateVolumeResponse {}
...
opts := wait . Backoff {Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func () ( bool , error ) {
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
rep, err = p.csiClient.CreateVolume(ctx, & req)
if err == nil {
// CreateVolume has finished successfully
return true , nil
}
if status, ok := status.FromError(err); ok {
if status.Code() == codes.DeadlineExceeded {
// CreateVolume timed out, give it another chance to complete
glog.Warningf( "CreateVolume timeout: %s has expired, operation will be retried" , p.timeout.String())
return false , nil
}
}
// CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
return false , err
})
if err != nil {
return nil , err
}
if rep.Volume != nil {
glog.V( 3 ).Infof( "create volume rep: %+v " , * rep.Volume)
}
respCap := rep.GetVolume().GetCapacityBytes()
if respCap < volSizeBytes {
capErr := fmt.Errorf( "created volume capacity %v less than requested capacity %v " , respCap, volSizeBytes)
delReq := & csi . DeleteVolumeRequest {
VolumeId: rep.GetVolume().GetId(),
}
delReq.ControllerDeleteSecrets = provisionerCredentials
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
_, err := p.csiClient.DeleteVolume(ctx, delReq)
if err != nil {
capErr = fmt.Errorf( " %v . Cleanup of volume %s failed, volume is orphaned: %v " , capErr, pvName, err)
}
return nil , capErr
}
Provison
方法核心功能是调用p.csiClient.CreateVolume(ctx, &req)
。
4、构造PV对象
复制 pv := & v1 . PersistentVolume {
ObjectMeta: metav1 . ObjectMeta {
Name: pvName,
},
Spec: v1 . PersistentVolumeSpec {
PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
AccessModes: options.PVC.Spec.AccessModes,
Capacity: v1 . ResourceList {
v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
},
// TODO wait for CSI VolumeSource API
PersistentVolumeSource: v1 . PersistentVolumeSource {
CSI: & v1 . CSIPersistentVolumeSource {
Driver: driverState.driverName,
VolumeHandle: p.volumeIdToHandle(rep.Volume.Id),
FSType: fsType,
VolumeAttributes: volumeAttributes,
ControllerPublishSecretRef: controllerPublishSecretRef,
NodeStageSecretRef: nodeStageSecretRef,
NodePublishSecretRef: nodePublishSecretRef,
},
},
},
}
if driverState.capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) {
pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
}
glog.Infof( "successfully created PV %+v " , pv.Spec.PersistentVolumeSource)
return pv, nil
Provision
方法只是通过VolumeOptions
参数来构建PV
对象,并没有执行具体PV
的创建或删除的操作。
不同类型的Provisioner
的,一般是PersistentVolumeSource
类型和参数不同,例如csi-provisioner
对应的PersistentVolumeSource
为CSI
,并且需要传入CSI
相关的参数:
ControllerPublishSecretRef
2.2.2. Delete方法
csiProvisioner
的delete
方法具体源码参考:https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L606
复制 func (p * csiProvisioner ) Delete (volume * v1 . PersistentVolume ) error {
if volume == nil || volume.Spec.CSI == nil {
return fmt.Errorf( "invalid CSI PV" )
}
volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)
_, err := checkDriverState(p.grpcClient, p.timeout, false )
if err != nil {
return err
}
req := csi . DeleteVolumeRequest {
VolumeId: volumeId,
}
// get secrets if StorageClass specifies it
storageClassName := volume.Spec.StorageClassName
if len (storageClassName) != 0 {
if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil {
// Resolve provision secret credentials.
// No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
provisionerSecretRef, err := getSecretReference(provisionerSecretNameKey, provisionerSecretNamespaceKey, storageClass.Parameters, volume.Name, nil)
if err != nil {
return err
}
credentials, err := getCredentials(p.client, provisionerSecretRef)
if err != nil {
return err
}
req.ControllerDeleteSecrets = credentials
}
}
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
_, err = p.csiClient.DeleteVolume(ctx, & req)
return err
}
Delete
方法主要是调用了p.csiClient.DeleteVolume(ctx, &req)
方法。
2.3. 总结
csi provisioner
实现了Provisioner
接口,其中包含Provison
和Delete
两个方法:
Provision
:调用csiClient.CreateVolume
方法,同时构造并返回PV对象。
Delete
:调用csiClient.DeleteVolume
方法。
csi provisioner
的核心方法都调用了csi-client
相关方法。
3. csi-client
csi client
的相关代码参考:https://github.com/container-storage-interface/spec/blob/master/lib/go/csi/v0/csi.pb.go
3.1. 构造csi-client
3.1.1. 构造grpcClient
复制 // Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := & grpc . ClientConn {}
for socketDown {
grpcClient, err = ctrl.Connect( * csiEndpoint, * connectionTimeout)
if err == nil {
socketDown = false
continue
}
time.Sleep( 10 * time.Second)
}
通过连接csi socket
,连接成功才构造可用的grpcClient
。
3.1.2. 构造csi-client
通过grpcClient
构造csi-client
。
复制 // Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
NewCSIProvisioner
复制 // NewCSIProvisioner creates new CSI provisioner
func NewCSIProvisioner (client kubernetes . Interface ,
csiAPIClient csiclientset . Interface ,
csiEndpoint string ,
connectionTimeout time . Duration ,
identity string ,
volumeNamePrefix string ,
volumeNameUUIDLength int ,
grpcClient * grpc . ClientConn ,
snapshotClient snapclientset . Interface ) controller . Provisioner {
csiClient := csi.NewControllerClient(grpcClient)
provisioner := & csiProvisioner {
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
}
return provisioner
}
NewControllerClient
复制 csiClient := csi.NewControllerClient(grpcClient)
...
type controllerClient struct {
cc * grpc . ClientConn
}
func NewControllerClient (cc * grpc . ClientConn ) ControllerClient {
return & controllerClient {cc}
}
3.2. csiClient.CreateVolume
csi provisoner
中调用csiClient.CreateVolume
代码如下:
复制 opts := wait . Backoff {Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func () ( bool , error ) {
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
rep, err = p.csiClient.CreateVolume(ctx, & req)
if err == nil {
// CreateVolume has finished successfully
return true , nil
}
if status, ok := status.FromError(err); ok {
if status.Code() == codes.DeadlineExceeded {
// CreateVolume timed out, give it another chance to complete
glog.Warningf( "CreateVolume timeout: %s has expired, operation will be retried" , p.timeout.String())
return false , nil
}
}
// CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
return false , err
})
CreateVolumeRequest的构造:
复制 // Create a CSI CreateVolumeRequest and Response
req := csi . CreateVolumeRequest {
Name: pvName,
Parameters: options.Parameters,
VolumeCapabilities: volumeCaps,
CapacityRange: & csi . CapacityRange {
RequiredBytes: int64 (volSizeBytes),
},
}
...
req.VolumeContentSource = volumeContentSource
...
req.AccessibilityRequirements = requirements
...
req.ControllerCreateSecrets = provisionerCredentials
具体的 Create
实现方法如下:
其中csiClient
是个接口类型
具体代码参考controllerClient.CreateVolume
复制 func (c *controllerClient) CreateVolume(ctx context.Context, in *CreateVolumeRequest, opts ...grpc.CallOption) (*CreateVolumeResponse, error) {
out := new ( CreateVolumeResponse )
err := grpc.Invoke(ctx, "/csi.v0.Controller/CreateVolume" , in, out, c.cc, opts ... )
if err != nil {
return nil , err
}
return out, nil
}
3.3. csiClient.DeleteVolume
csi provisoner
中调用csiClient.DeleteVolume
代码如下:
复制 func (p * csiProvisioner ) Delete (volume * v1 . PersistentVolume ) error {
...
req := csi . DeleteVolumeRequest {
VolumeId: volumeId,
}
// get secrets if StorageClass specifies it
...
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
_, err = p.csiClient.DeleteVolume(ctx, & req)
return err
}
DeleteVolumeRequest的构造:
复制 req := csi . DeleteVolumeRequest {
VolumeId: volumeId,
}
...
req.ControllerDeleteSecrets = credentials
将构造的DeleteVolumeRequest
传给DeleteVolume
方法。
具体的 Delete
实现方法如下:
具体代码参考:controllerClient.DeleteVolume
复制 func (c *controllerClient) DeleteVolume(ctx context.Context, in *DeleteVolumeRequest, opts ...grpc.CallOption) (*DeleteVolumeResponse, error) {
out := new ( DeleteVolumeResponse )
err := grpc.Invoke(ctx, "/csi.v0.Controller/DeleteVolume" , in, out, c.cc, opts ... )
if err != nil {
return nil , err
}
return out, nil
}
自定义的provisioner
实现了Provisoner接口
的Provision
和Delete
方法,这两个方法主要对后端存储做创建和删除操作,并没有对PV对象进行创建和删除操作。
PV对象的相关操作具体由ProvisionController
中的provisionClaimOperation
和deleteVolumeOperation
具体执行,同时调用了具体provisioner
的Provision
和Delete
两个方法来对存储数据做处理。
复制 func main () {
provisionController.Run(wait.NeverStop)
}
这块代码逻辑可参考:nfs-client-provisioner 源码分析
参考文章:
https://github.com/kubernetes-csi/external-provisioner
https://github.com/container-storage-interface/spec
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/storage/container-storage-interface.md
https://github.com/container-storage-interface/spec/blob/master/spec.md