Informer机制

kube-controller-manager源码分析(三)之 Informer机制

以下代码分析基于 kubernetes v1.12.0 版本。

本文主要分析k8s中各个核心组件经常使用到的Informer机制(即List-Watch)。该部分的代码主要位于client-go这个第三方包中。

此部分的逻辑主要位于/vendor/k8s.io/client-go/tools/cache包中,代码目录结构如下:

cache
├── controller.go  # 包含:Config、Run、processLoop、NewInformer、NewIndexerInformer
├── delta_fifo.go  # 包含:NewDeltaFIFO、DeltaFIFO、AddIfNotPresent
├── expiration_cache.go
├── expiration_cache_fakes.go
├── fake_custom_store.go
├── fifo.go   # 包含:Queue、FIFO、NewFIFO
├── heap.go
├── index.go    # 包含:Indexer、MetaNamespaceIndexFunc
├── listers.go
├── listwatch.go   # 包含:ListerWatcher、ListWatch、List、Watch
├── mutation_cache.go
├── mutation_detector.go
├── reflector.go   # 包含:Reflector、NewReflector、Run、ListAndWatch
├── reflector_metrics.go
├── shared_informer.go  # 包含:NewSharedInformer、WaitForCacheSync、Run、HasSynced
├── store.go  # 包含:Store、MetaNamespaceKeyFunc、SplitMetaNamespaceKey
├── testing
   ├── fake_controller_source.go
├── thread_safe_store.go  # 包含:ThreadSafeStore、threadSafeMap
├── undelta_store.go

0. 原理示意图

示意图1

示意图2

0.1. client-go组件

  • Reflector:reflector用来watch特定的k8s API资源。具体的实现是通过ListAndWatch的方法,watch可以是k8s内建的资源或者是自定义的资源。当reflector通过watch API接收到有关新资源实例存在的通知时,它使用相应的列表API获取新创建的对象,并将其放入watchHandler函数内的Delta Fifo队列中。

  • Informer:informer从Delta Fifo队列中弹出对象。执行此操作的功能是processLoop。base controller的作用是保存对象以供以后检索,并调用我们的控制器将对象传递给它。

  • Indexer:索引器提供对象的索引功能。典型的索引用例是基于对象标签创建索引。 Indexer可以根据多个索引函数维护索引。Indexer使用线程安全的数据存储来存储对象及其键。 在Store中定义了一个名为MetaNamespaceKeyFunc的默认函数,该函数生成对象的键作为该对象的<namespace> / <name>组合。

0.2. 自定义controller组件

  • Informer reference:指的是Informer实例的引用,定义如何使用自定义资源对象。 自定义控制器代码需要创建对应的Informer。

  • Indexer reference: 自定义控制器对Indexer实例的引用。自定义控制器需要创建对应的Indexser。

client-go中提供NewIndexerInformer函数可以创建Informer 和 Indexer。

  • Resource Event Handlers:资源事件回调函数,当它想要将对象传递给控制器时,它将被调用。 编写这些函数的典型模式是获取调度对象的key,并将该key排入工作队列以进行进一步处理。

  • Work queue:任务队列。 编写资源事件处理程序函数以提取传递的对象的key并将其添加到任务队列。

  • Process Item:处理任务队列中对象的函数, 这些函数通常使用Indexer引用或Listing包装器来重试与该key对应的对象。

1. sharedInformerFactory.Start

在controller-manager的Run函数部分调用了InformerFactory.Start的方法。

此部分代码位于/cmd/kube-controller-manager/app/controllermanager.go

InformerFactory是一个SharedInformerFactory的接口,接口定义如下:

此部分代码位于vendor/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go

Start方法初始化各种类型的informer,并且每个类型起了个informer.Run的goroutine。

此部分代码位于vendor/k8s.io/client-go/informers/factory.go

2. sharedIndexInformer.Run

此部分的代码位于/vendor/k8s.io/client-go/tools/cache/shared_informer.go

2.1. NewDeltaFIFO

DeltaFIFO是一个对象变化的存储队列,依据先进先出的原则,process的函数接收该队列的Pop方法的输出对象来处理相关功能。

2.2. Config

构造controller的配置文件,构造process,即HandleDeltas,该函数为后面使用到的process函数。

2.3. controller

调用New(cfg),构建sharedIndexInformer的controller。

2.4. cacheMutationDetector.Run

调用s.cacheMutationDetector.Run,检查缓存对象是否变化。

defaultCacheMutationDetector.Run

CompareObjects

2.5. processor.run

调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数。

sharedProcessor.Run

该部分逻辑待后面分析。

2.6. controller.Run

调用s.controller.Run,构建Reflector,进行对etcd的缓存

controller.Run

此部分代码位于/vendor/k8s.io/client-go/tools/cache/controller.go

核心代码:

3. Reflector

3.1. Reflector

Reflector的主要作用是watch指定的k8s资源,并将变化同步到本地是store中。Reflector只会放置指定的expectedType类型的资源到store中,除非expectedType为nil。如果resyncPeriod不为零,那么Reflector为以resyncPeriod为周期定期执行list的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。

常用属性说明:

  • expectedType:期望放入缓存store的资源类型。

  • store:watch的资源对应的本地缓存。

  • listerWatcher:list和watch的接口。

  • period:watch的周期,默认为1秒。

  • resyncPeriod:resync的周期,当非零的时候,会按该周期执行list。

  • lastSyncResourceVersion:最新一次看到的资源的版本号,主要在watch时候使用。

3.2. NewReflector

NewReflector主要用来构建Reflector的结构体。

此部分的代码位于/vendor/k8s.io/client-go/tools/cache/reflector.go

3.3. Reflector.Run

Reflector.Run主要执行了ListAndWatch的方法。

3.4. ListAndWatch

ListAndWatch第一次会列出所有的对象,并获取资源对象的版本号,然后watch资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为0,list()可能会导致本地的缓存相对于etcd里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地的缓存数据与etcd的数据保持一致。

3.4.1. List

首先将资源的版本号设置为0,然后调用listerWatcher.List(options),列出所有list的内容。

获取资源版本号,并将list的内容提取成对象列表。

将list中对象列表的内容和版本号存储到本地的缓存store中,并全量替换已有的store的内容。

syncWith调用了store的Replace的方法来替换原来store中的数据。

Store.Replace方法定义如下:

最后设置最新的资源版本号。

setLastSyncResourceVersion:

3.4.2. store.Resync

核心代码:

store的具体对象为DeltaFIFO,即调用DeltaFIFO.Resync

3.4.3. Watch

设置watch的超时时间,默认为5分钟。

执行listerWatcher.Watch(options)。

执行watchHandler。

3.4.4. watchHandler

watchHandler主要是通过watch的方式保证当前的资源版本是最新的。

获取watch接口中的事件的channel,来获取事件的内容。

当获得添加、更新、删除的事件时,将对应的对象更新到本地缓存store中。

更新当前的最新版本号。

通过对Reflector模块的分析,可以看到多次使用到本地缓存store模块,而store的数据由DeltaFIFO赋值而来,以下针对DeltaFIFO和store做分析。

4. DeltaFIFO

DeltaFIFO由NewDeltaFIFO初始化,并赋值给config.Queue。

4.1. NewDeltaFIFO

controller.Run的部分调用了NewReflector。

NewReflector构造函数,将c.config.Queue赋值给Reflector.store的属性。

4.2. DeltaFIFO

DeltaFIFO是一个生产者与消费者的队列,其中Reflector是生产者,消费者调用Pop()的方法。

DeltaFIFO主要用在以下场景:

  • 希望对象变更最多处理一次

  • 处理对象时,希望查看自上次处理对象以来发生的所有事情

  • 要处理对象的删除

  • 希望定期重新处理对象

4.3. Queue & Store

DeltaFIFO的类型是Queue接口,Reflector.store是Store接口,Queue接口是一个存储队列,Process的方法执行Queue.Pop出来的数据对象,

5. store

Store是一个通用的存储接口,Reflector通过watch server的方式更新数据到store中,store给Reflector提供本地的缓存,让Reflector可以像消息队列一样的工作。

Store实现的是一种可以准确的写入对象和获取对象的机制。

其中Replace方法会删除原来store中的内容,并将新增的list的内容存入store中,即完全替换数据。

6.1. cache

cache实现了store的接口,而cache的具体实现又是调用ThreadSafeStore接口来实现功能的。

cache的功能主要有以下两点:

  • 通过keyFunc计算对象的key

  • 调用ThreadSafeStorage接口的方法

其中ListAndWatch主要用到以下的方法:

cache.Replace

cache.Add

cache.Update

cache.Delete

6.2. ThreadSafeStore

cache的具体是调用ThreadSafeStore来实现的。

threadSafeMap

6. processLoop

在controller.Run方法中会调用processLoop,以下分析processLoop的处理逻辑。

processLoop主要处理任务队列中的任务,其中处理逻辑是调用具体的ProcessFunc函数来实现,核心代码为:

5.1. DeltaFIFO.Pop

Pop会阻塞住直到队列里面添加了新的对象,如果有多个对象,按照先进先出的原则处理,如果某个对象没有处理成功会重新被加入该队列中。

Pop中会调用具体的process函数来处理对象。

核心代码:

5.2. HandleDeltas

其中process函数就是在sharedIndexInformer.Run方法中,给config.Process赋值的HandleDeltas函数。

核心代码:

根据不同的类型,调用processor.distribute方法,该方法将对象加入processorListener的channel中。

5.3. sharedProcessor.distribute

processorListener.add:

综合以上的分析,可以看出processLoop通过调用HandleDeltas,再调用distribute,processorListener.add最终将不同更新类型的对象加入processorListener的channel中,供processorListener.Run使用。以下分析processorListener.Run的部分。

7. processor

processor的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。在sharedIndexInformer.Run部分会调用processor.run。

流程:

  1. listenser的add函数负责将notify装进pendingNotifications。

  2. pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel。

  3. run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数是在不同的NewXxxcontroller实现中注册的。

7.1. sharedProcessor.Run

7.1.1. listener.pop

pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel。

7.1.2. listener.run

listener.run部分根据不同的更新类型调用不同的处理函数。

其中具体的实现函数handler是在NewDeploymentController(其他不同类型的controller类似)中赋值的,而该handler是一个接口,具体如下:

7.2. ResourceEventHandler

以下以DeploymentController的处理逻辑为例。

NewDeploymentController部分会注册deployment的事件函数,以下注册了三种类型的事件函数,其中包括:dInformer、rsInformer和podInformer。

7.2.1. addDeployment

以下以addDeployment为例,addDeployment主要是将对象加入到enqueueDeployment的队列中。

enqueueDeployment的定义

将dc.enqueue赋值给dc.enqueueDeployment

dc.enqueue调用了dc.queue.Add(key)

dc.queue主要记录了需要被同步的deployment的对象,供syncDeployment使用。

NewNamedRateLimitingQueue

通过以上分析,可以看出processor记录了不同类似的事件函数,其中事件函数在NewXxxController构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的controller的任务队列中,然后由类似syncDeployment的同步函数来维持期望状态的同步逻辑。

8. 总结

本文分析的部分主要是k8s的informer机制,即List-Watch机制。

8.1. Reflector

Reflector的主要作用是watch指定的k8s资源,并将变化同步到本地是store中。Reflector只会放置指定的expectedType类型的资源到store中,除非expectedType为nil。如果resyncPeriod不为零,那么Reflector为以resyncPeriod为周期定期执行list的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。

8.2. ListAndWatch

ListAndWatch第一次会列出所有的对象,并获取资源对象的版本号,然后watch资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为0,list()可能会导致本地的缓存相对于etcd里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地的缓存数据与etcd的数据保持一致。

8.3. DeltaFIFO

DeltaFIFO是一个生产者与消费者的队列,其中Reflector是生产者,消费者调用Pop()的方法。

DeltaFIFO主要用在以下场景:

  • 希望对象变更最多处理一次

  • 处理对象时,希望查看自上次处理对象以来发生的所有事情

  • 要处理对象的删除

  • 希望定期重新处理对象

8.4. store

Store是一个通用的存储接口,Reflector通过watch server的方式更新数据到store中,store给Reflector提供本地的缓存,让Reflector可以像消息队列一样的工作。

Store实现的是一种可以准确的写入对象和获取对象的机制。

8.5. processor

processor的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。在sharedIndexInformer.Run部分会调用processor.run。

流程:

  1. listenser的add函数负责将notify装进pendingNotifications。

  2. pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel。

  3. run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数是在不同的NewXxxcontroller实现中注册的。

processor记录了不同类似的事件函数,其中事件函数在NewXxxController构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的controller的任务队列中,然后由类似syncDeployment的同步函数来维持期望状态的同步逻辑。

8.6. 主要步骤

  1. 在controller-manager的Run函数部分调用了InformerFactory.Start的方法,Start方法初始化各种类型的informer,并且每个类型起了个informer.Run的goroutine。

  2. informer.Run的部分先生成一个DeltaFIFO的队列来存储对象变化的数据。然后调用processor.Run和controller.Run函数。

  3. controller.Run函数会生成一个Reflector,Reflector的主要作用是watch指定的k8s资源,并将变化同步到本地是store中。ReflectorresyncPeriod为周期定期执行list的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。

  4. Reflector接着执行ListAndWatch函数,ListAndWatch第一次会列出所有的对象,并获取资源对象的版本号,然后watch资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为0,list()可能会导致本地的缓存相对于etcd里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地的缓存数据与etcd的数据保持一致。

  5. controller.Run函数还会调用processLoop函数,processLoop通过调用HandleDeltas,再调用distribute,processorListener.add最终将不同更新类型的对象加入processorListener的channel中,供processorListener.Run使用。

  6. processor的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。processor记录了不同类型的事件函数,其中事件函数在NewXxxController构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的controller的任务队列中,然后由类似syncDeployment的同步函数来维持期望状态的同步逻辑。

参考文章:

最后更新于

这有帮助吗?