Informer机制
kube-controller-manager源码分析(三)之 Informer机制
以下代码分析基于
kubernetes v1.12.0版本。
本文主要分析k8s中各个核心组件经常使用到的Informer机制(即List-Watch)。该部分的代码主要位于client-go这个第三方包中。
此部分的逻辑主要位于/vendor/k8s.io/client-go/tools/cache包中,代码目录结构如下:
cache
├── controller.go # 包含:Config、Run、processLoop、NewInformer、NewIndexerInformer
├── delta_fifo.go # 包含:NewDeltaFIFO、DeltaFIFO、AddIfNotPresent
├── expiration_cache.go
├── expiration_cache_fakes.go
├── fake_custom_store.go
├── fifo.go # 包含:Queue、FIFO、NewFIFO
├── heap.go
├── index.go # 包含:Indexer、MetaNamespaceIndexFunc
├── listers.go
├── listwatch.go # 包含:ListerWatcher、ListWatch、List、Watch
├── mutation_cache.go
├── mutation_detector.go
├── reflector.go # 包含:Reflector、NewReflector、Run、ListAndWatch
├── reflector_metrics.go
├── shared_informer.go # 包含:NewSharedInformer、WaitForCacheSync、Run、HasSynced
├── store.go # 包含:Store、MetaNamespaceKeyFunc、SplitMetaNamespaceKey
├── testing
│ ├── fake_controller_source.go
├── thread_safe_store.go # 包含:ThreadSafeStore、threadSafeMap
├── undelta_store.go0. 原理示意图
示意图1:

示意图2:

0.1. client-go组件
Reflector:reflector用来watch特定的k8s API资源。具体的实现是通过ListAndWatch的方法,watch可以是k8s内建的资源或者是自定义的资源。当reflector通过watch API接收到有关新资源实例存在的通知时,它使用相应的列表API获取新创建的对象,并将其放入watchHandler函数内的Delta Fifo队列中。Informer:informer从Delta Fifo队列中弹出对象。执行此操作的功能是processLoop。base controller的作用是保存对象以供以后检索,并调用我们的控制器将对象传递给它。Indexer:索引器提供对象的索引功能。典型的索引用例是基于对象标签创建索引。 Indexer可以根据多个索引函数维护索引。Indexer使用线程安全的数据存储来存储对象及其键。 在Store中定义了一个名为MetaNamespaceKeyFunc的默认函数,该函数生成对象的键作为该对象的<namespace> / <name>组合。
0.2. 自定义controller组件
Informer reference:指的是Informer实例的引用,定义如何使用自定义资源对象。 自定义控制器代码需要创建对应的Informer。Indexer reference: 自定义控制器对Indexer实例的引用。自定义控制器需要创建对应的Indexser。
client-go中提供
NewIndexerInformer函数可以创建Informer 和 Indexer。
Resource Event Handlers:资源事件回调函数,当它想要将对象传递给控制器时,它将被调用。 编写这些函数的典型模式是获取调度对象的key,并将该key排入工作队列以进行进一步处理。Work queue:任务队列。 编写资源事件处理程序函数以提取传递的对象的key并将其添加到任务队列。Process Item:处理任务队列中对象的函数, 这些函数通常使用Indexer引用或Listing包装器来重试与该key对应的对象。
1. sharedInformerFactory.Start
在controller-manager的Run函数部分调用了InformerFactory.Start的方法。
此部分代码位于/cmd/kube-controller-manager/app/controllermanager.go
InformerFactory是一个SharedInformerFactory的接口,接口定义如下:
此部分代码位于vendor/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go
Start方法初始化各种类型的informer,并且每个类型起了个informer.Run的goroutine。
此部分代码位于vendor/k8s.io/client-go/informers/factory.go
2. sharedIndexInformer.Run
此部分的代码位于/vendor/k8s.io/client-go/tools/cache/shared_informer.go
2.1. NewDeltaFIFO
DeltaFIFO是一个对象变化的存储队列,依据先进先出的原则,process的函数接收该队列的Pop方法的输出对象来处理相关功能。
2.2. Config
构造controller的配置文件,构造process,即HandleDeltas,该函数为后面使用到的process函数。
2.3. controller
调用New(cfg),构建sharedIndexInformer的controller。
2.4. cacheMutationDetector.Run
调用s.cacheMutationDetector.Run,检查缓存对象是否变化。
defaultCacheMutationDetector.Run
CompareObjects
2.5. processor.run
调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数。
sharedProcessor.Run
该部分逻辑待后面分析。
2.6. controller.Run
调用s.controller.Run,构建Reflector,进行对etcd的缓存
controller.Run
此部分代码位于/vendor/k8s.io/client-go/tools/cache/controller.go
核心代码:
3. Reflector
3.1. Reflector
Reflector的主要作用是watch指定的k8s资源,并将变化同步到本地是store中。Reflector只会放置指定的expectedType类型的资源到store中,除非expectedType为nil。如果resyncPeriod不为零,那么Reflector为以resyncPeriod为周期定期执行list的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。
常用属性说明:
expectedType:期望放入缓存store的资源类型。
store:watch的资源对应的本地缓存。
listerWatcher:list和watch的接口。
period:watch的周期,默认为1秒。
resyncPeriod:resync的周期,当非零的时候,会按该周期执行list。
lastSyncResourceVersion:最新一次看到的资源的版本号,主要在watch时候使用。
3.2. NewReflector
NewReflector主要用来构建Reflector的结构体。
此部分的代码位于/vendor/k8s.io/client-go/tools/cache/reflector.go
3.3. Reflector.Run
Reflector.Run主要执行了ListAndWatch的方法。
3.4. ListAndWatch
ListAndWatch第一次会列出所有的对象,并获取资源对象的版本号,然后watch资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为0,list()可能会导致本地的缓存相对于etcd里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地的缓存数据与etcd的数据保持一致。
3.4.1. List
首先将资源的版本号设置为0,然后调用listerWatcher.List(options),列出所有list的内容。
获取资源版本号,并将list的内容提取成对象列表。
将list中对象列表的内容和版本号存储到本地的缓存store中,并全量替换已有的store的内容。
syncWith调用了store的Replace的方法来替换原来store中的数据。
Store.Replace方法定义如下:
最后设置最新的资源版本号。
setLastSyncResourceVersion:
3.4.2. store.Resync
核心代码:
store的具体对象为DeltaFIFO,即调用DeltaFIFO.Resync
3.4.3. Watch
设置watch的超时时间,默认为5分钟。
执行listerWatcher.Watch(options)。
执行watchHandler。
3.4.4. watchHandler
watchHandler主要是通过watch的方式保证当前的资源版本是最新的。
获取watch接口中的事件的channel,来获取事件的内容。
当获得添加、更新、删除的事件时,将对应的对象更新到本地缓存store中。
更新当前的最新版本号。
通过对Reflector模块的分析,可以看到多次使用到本地缓存store模块,而store的数据由DeltaFIFO赋值而来,以下针对DeltaFIFO和store做分析。
4. DeltaFIFO
DeltaFIFO由NewDeltaFIFO初始化,并赋值给config.Queue。
4.1. NewDeltaFIFO
controller.Run的部分调用了NewReflector。
NewReflector构造函数,将c.config.Queue赋值给Reflector.store的属性。
4.2. DeltaFIFO
DeltaFIFO是一个生产者与消费者的队列,其中Reflector是生产者,消费者调用Pop()的方法。
DeltaFIFO主要用在以下场景:
希望对象变更最多处理一次
处理对象时,希望查看自上次处理对象以来发生的所有事情
要处理对象的删除
希望定期重新处理对象
4.3. Queue & Store
DeltaFIFO的类型是Queue接口,Reflector.store是Store接口,Queue接口是一个存储队列,Process的方法执行Queue.Pop出来的数据对象,
5. store
Store是一个通用的存储接口,Reflector通过watch server的方式更新数据到store中,store给Reflector提供本地的缓存,让Reflector可以像消息队列一样的工作。
Store实现的是一种可以准确的写入对象和获取对象的机制。
其中Replace方法会删除原来store中的内容,并将新增的list的内容存入store中,即完全替换数据。
6.1. cache
cache实现了store的接口,而cache的具体实现又是调用ThreadSafeStore接口来实现功能的。
cache的功能主要有以下两点:
通过keyFunc计算对象的key
调用ThreadSafeStorage接口的方法
其中ListAndWatch主要用到以下的方法:
cache.Replace
cache.Add
cache.Update
cache.Delete
6.2. ThreadSafeStore
cache的具体是调用ThreadSafeStore来实现的。
threadSafeMap
6. processLoop
在controller.Run方法中会调用processLoop,以下分析processLoop的处理逻辑。
processLoop主要处理任务队列中的任务,其中处理逻辑是调用具体的ProcessFunc函数来实现,核心代码为:
5.1. DeltaFIFO.Pop
Pop会阻塞住直到队列里面添加了新的对象,如果有多个对象,按照先进先出的原则处理,如果某个对象没有处理成功会重新被加入该队列中。
Pop中会调用具体的process函数来处理对象。
核心代码:
5.2. HandleDeltas
其中process函数就是在sharedIndexInformer.Run方法中,给config.Process赋值的HandleDeltas函数。
核心代码:
根据不同的类型,调用processor.distribute方法,该方法将对象加入processorListener的channel中。
5.3. sharedProcessor.distribute
processorListener.add:
综合以上的分析,可以看出processLoop通过调用HandleDeltas,再调用distribute,processorListener.add最终将不同更新类型的对象加入processorListener的channel中,供processorListener.Run使用。以下分析processorListener.Run的部分。
7. processor
processor的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。在sharedIndexInformer.Run部分会调用processor.run。
流程:
listenser的add函数负责将notify装进pendingNotifications。
pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel。
run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数是在不同的
NewXxxcontroller实现中注册的。
7.1. sharedProcessor.Run
7.1.1. listener.pop
pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel。
7.1.2. listener.run
listener.run部分根据不同的更新类型调用不同的处理函数。
其中具体的实现函数handler是在NewDeploymentController(其他不同类型的controller类似)中赋值的,而该handler是一个接口,具体如下:
7.2. ResourceEventHandler
以下以DeploymentController的处理逻辑为例。
在NewDeploymentController部分会注册deployment的事件函数,以下注册了三种类型的事件函数,其中包括:dInformer、rsInformer和podInformer。
7.2.1. addDeployment
以下以addDeployment为例,addDeployment主要是将对象加入到enqueueDeployment的队列中。
enqueueDeployment的定义
将dc.enqueue赋值给dc.enqueueDeployment
dc.enqueue调用了dc.queue.Add(key)
dc.queue主要记录了需要被同步的deployment的对象,供syncDeployment使用。
NewNamedRateLimitingQueue
通过以上分析,可以看出processor记录了不同类似的事件函数,其中事件函数在NewXxxController构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的controller的任务队列中,然后由类似syncDeployment的同步函数来维持期望状态的同步逻辑。
8. 总结
本文分析的部分主要是k8s的informer机制,即List-Watch机制。
8.1. Reflector
Reflector的主要作用是watch指定的k8s资源,并将变化同步到本地是store中。Reflector只会放置指定的expectedType类型的资源到store中,除非expectedType为nil。如果resyncPeriod不为零,那么Reflector为以resyncPeriod为周期定期执行list的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。
8.2. ListAndWatch
ListAndWatch第一次会列出所有的对象,并获取资源对象的版本号,然后watch资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为0,list()可能会导致本地的缓存相对于etcd里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地的缓存数据与etcd的数据保持一致。
8.3. DeltaFIFO
DeltaFIFO是一个生产者与消费者的队列,其中Reflector是生产者,消费者调用Pop()的方法。
DeltaFIFO主要用在以下场景:
希望对象变更最多处理一次
处理对象时,希望查看自上次处理对象以来发生的所有事情
要处理对象的删除
希望定期重新处理对象
8.4. store
Store是一个通用的存储接口,Reflector通过watch server的方式更新数据到store中,store给Reflector提供本地的缓存,让Reflector可以像消息队列一样的工作。
Store实现的是一种可以准确的写入对象和获取对象的机制。
8.5. processor
processor的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。在sharedIndexInformer.Run部分会调用processor.run。
流程:
listenser的add函数负责将notify装进pendingNotifications。
pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel。
run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数是在不同的
NewXxxcontroller实现中注册的。
processor记录了不同类似的事件函数,其中事件函数在NewXxxController构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的controller的任务队列中,然后由类似syncDeployment的同步函数来维持期望状态的同步逻辑。
8.6. 主要步骤
在controller-manager的Run函数部分调用了InformerFactory.Start的方法,Start方法初始化各种类型的informer,并且每个类型起了个informer.Run的goroutine。
informer.Run的部分先生成一个DeltaFIFO的队列来存储对象变化的数据。然后调用processor.Run和controller.Run函数。
controller.Run函数会生成一个Reflector,
Reflector的主要作用是watch指定的k8s资源,并将变化同步到本地是store中。Reflector以resyncPeriod为周期定期执行list的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。Reflector接着执行ListAndWatch函数,ListAndWatch第一次会列出所有的对象,并获取资源对象的版本号,然后watch资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为0,
list()可能会导致本地的缓存相对于etcd里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地的缓存数据与etcd的数据保持一致。controller.Run函数还会调用processLoop函数,processLoop通过调用HandleDeltas,再调用distribute,processorListener.add最终将不同更新类型的对象加入
processorListener的channel中,供processorListener.Run使用。processor的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。processor记录了不同类型的事件函数,其中事件函数在NewXxxController构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的controller的任务队列中,然后由类似syncDeployment的同步函数来维持期望状态的同步逻辑。
参考文章:
最后更新于
这有帮助吗?