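The Informer Mechanism

kube-controller-manager Source Code Analysis (Part 3): The Informer Mechanism

The code analysis below is based on Kubernetes v1.12.0.

This article analyzes the Informer mechanism (that is, List-Watch), which the core Kubernetes components use heavily. The code lives mainly in client-go, which the main repository vendors as a third-party package.

The logic is mostly in the /vendor/k8s.io/client-go/tools/cache package, whose directory layout is as follows: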

cache
├── controller.go  # contains: Config, Run, processLoop, NewInformer, NewIndexerInformer
├── delta_fifo.go  # contains: NewDeltaFIFO, DeltaFIFO, AddIfNotPresent
├── expiration_cache.go
├── expiration_cache_fakes.go
├── fake_custom_store.go
├── fifo.go   # contains: Queue, FIFO, NewFIFO
├── heap.go
├── index.go    # contains: Indexer, MetaNamespaceIndexFunc
├── listers.go
├── listwatch.go   # contains: ListerWatcher, ListWatch, List, Watch
├── mutation_cache.go
├── mutation_detector.go
├── reflector.go   # contains: Reflector, NewReflector, Run, ListAndWatch
├── reflector_metrics.go
├── shared_informer.go  # contains: NewSharedInformer, WaitForCacheSync, Run, HasSynced
├── store.go  # contains: Store, MetaNamespaceKeyFunc, SplitMetaNamespaceKey
├── testing
│   └── fake_controller_source.go
├── thread_safe_store.go  # contains: ThreadSafeStore, threadSafeMap
└── undelta_store.go

0. Architecture diagrams

Diagram 1

Diagram 2

0.1. client-go components

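  • Reflector: a reflector watches a particular Kubernetes API resource, which can be a built-in resource or a custom resource. This is implemented in the ListAndWatch method. The list API provides the initial full set of objects; after that, whenever the reflector is notified about a resource instance through the watch API, it puts the object carried by the event into the Delta FIFO queue inside the watchHandler function.

  • Informer: the informer pops objects from the Delta FIFO queue. The function that does this is processLoop. The job of the base controller is to save the object for later retrieval and to invoke our controller, passing it the object.

  • Indexer: the indexer provides indexing over objects. A typical use case is an index based on object labels. An Indexer can maintain indices based on several index functions, and it uses a thread-safe data store to hold objects and their keys. Store defines a default function named MetaNamespaceKeyFunc, which generates an object's key as the <namespace>/<name> combination for that object.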
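The <namespace>/<name> key convention described above can be illustrated with a small sketch. The `object` type and the `keyOf` and `splitKey` helpers are hypothetical names used only for illustration; they are not the client-go functions, although they follow the same convention (cluster-scoped objects, which have no namespace, are keyed by name alone).

```go
package main

import (
	"fmt"
	"strings"
)

// object is a hypothetical stand-in for the metadata of a Kubernetes object.
type object struct {
	Namespace string
	Name      string
}

// keyOf builds a <namespace>/<name> key; objects without a namespace
// are keyed by their name alone.
func keyOf(o object) string {
	if o.Namespace == "" {
		return o.Name
	}
	return o.Namespace + "/" + o.Name
}

// splitKey reverses keyOf and rejects keys with more than one separator.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	k := keyOf(object{Namespace: "default", Name: "web"})
	ns, name, _ := splitKey(k)
	fmt.Println(k, ns, name)
}
```

Because the key is a plain string, it can be passed through a work queue cheaply and resolved back to the full object through the Indexer when needed.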

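0.2. Custom controller components

  • Informer reference: a reference to the Informer instance that knows how to work with the custom resource objects. The custom controller code needs to create the appropriate Informer.

  • Indexer reference: the custom controller's reference to the Indexer instance. The custom controller code needs to create the appropriate Indexer.

client-go provides the NewIndexerInformer function, which creates both the Informer and the Indexer.

  • Resource Event Handlers: callback functions that the Informer invokes when it wants to deliver an object to the controller. The typical pattern for writing them is to obtain the dispatched object's key and enqueue that key in a work queue for further processing.

  • Work queue: the task queue. Resource event handler functions are written to extract the key of the delivered object and add it to the work queue.

  • Process Item: the function that processes items from the work queue. These functions typically use the Indexer reference or a Listing wrapper to retrieve the object that corresponds to the key.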
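The handler, work queue, and process-item pattern described above might look roughly like the sketch below. It uses only the standard library: a buffered channel stands in for the work queue and a mutex-protected map stands in for the Indexer. All names (`indexer`, `onAdd`, `processItems`) are hypothetical and are not part of client-go.

```go
package main

import (
	"fmt"
	"sync"
)

// indexer is a hypothetical, minimal stand-in for the Indexer:
// a mutex-protected map from key to object.
type indexer struct {
	mu    sync.RWMutex
	items map[string]string
}

func (i *indexer) set(key, obj string) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.items[key] = obj
}

func (i *indexer) get(key string) (string, bool) {
	i.mu.RLock()
	defer i.mu.RUnlock()
	obj, ok := i.items[key]
	return obj, ok
}

// onAdd plays the role of a resource event handler: it only enqueues the key.
func onAdd(queue chan<- string, key string) {
	queue <- key
}

// processItems plays the role of Process Item: it drains the queue and
// resolves each key through the indexer, skipping keys that no longer exist.
func processItems(queue <-chan string, idx *indexer) []string {
	var handled []string
	for key := range queue {
		if obj, ok := idx.get(key); ok {
			handled = append(handled, key+"="+obj)
		}
	}
	return handled
}

func main() {
	idx := &indexer{items: map[string]string{}}
	queue := make(chan string, 8) // buffered channel as a stand-in work queue

	idx.set("default/web", "pod-spec-v1")
	onAdd(queue, "default/web")
	close(queue)

	fmt.Println(processItems(queue, idx))
}
```

Enqueuing keys rather than objects means the processing step always reads the latest state from the cache, even if several events for the same object arrived in the meantime.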

1. sharedInformerFactory.Start

The Run function of controller-manager calls the InformerFactory.Start method.

This code is in /cmd/kube-controller-manager/app/controllermanager.go

// Run runs the KubeControllerManagerOptions.  This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    ...
		controllerContext.InformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)
    ...
}

InformerFactory is a SharedInformerFactory interface, defined as follows:

This code is in vendor/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go

// SharedInformerFactory a small interface to allow for adding an informer without an import cycle
type SharedInformerFactory interface {
	Start(stopCh <-chan struct{})
	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

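The Start method starts all requested informers: for every informer type that has not been started yet, it launches informer.Run in its own goroutine.

This code is in vendor/k8s.io/client-go/informers/factory.go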

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
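The start-once behaviour of Start can be reproduced in isolation. The following is a minimal sketch using only the standard library; the `factory` type, its `launches` counter, and the WaitGroup are assumptions added for the demonstration and do not exist in sharedInformerFactory.

```go
package main

import (
	"fmt"
	"sync"
)

// factory is a hypothetical, simplified analogue of sharedInformerFactory.
type factory struct {
	lock     sync.Mutex
	runners  map[string]func(stopCh <-chan struct{})
	started  map[string]bool
	launches int            // demo-only counter of goroutines launched
	wg       sync.WaitGroup // demo-only, lets the caller wait for shutdown
}

func newFactory() *factory {
	return &factory{
		runners: map[string]func(stopCh <-chan struct{}){},
		started: map[string]bool{},
	}
}

// Start launches every registered runner that has not been started yet.
// Calling it again only starts runners registered since the previous call.
func (f *factory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for name, run := range f.runners {
		if !f.started[name] {
			f.wg.Add(1)
			go func(run func(stopCh <-chan struct{})) {
				defer f.wg.Done()
				run(stopCh)
			}(run)
			f.started[name] = true
			f.launches++
		}
	}
}

func main() {
	f := newFactory()
	f.runners["pods"] = func(stopCh <-chan struct{}) { <-stopCh }
	stopCh := make(chan struct{})
	f.Start(stopCh)
	f.Start(stopCh) // no-op for runners that are already started
	close(stopCh)
	f.wg.Wait()
	fmt.Println("launches:", f.launches)
}
```

The started map is what makes repeated Start calls safe, which matters because several controllers share one factory and may each trigger a start.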

2. sharedIndexInformer.Run

This code is in /vendor/k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

2.1. NewDeltaFIFO

DeltaFIFO is a queue that stores object changes (deltas) in first-in, first-out order. The process function consumes the objects returned by the queue's Pop method.

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

2.2. Config

Build the Config struct for the controller and set its Process field to HandleDeltas, which is the process function used later on.

cfg := &Config{
	Queue:            fifo,
	ListerWatcher:    s.listerWatcher,
	ObjectType:       s.objectType,
	FullResyncPeriod: s.resyncCheckPeriod,
	RetryOnError:     false,
	ShouldResync:     s.processor.shouldResync,

	Process: s.HandleDeltas,
}

2.3. controller

Call New(cfg) to build the controller of the sharedIndexInformer.

func() {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	s.controller = New(cfg)
	s.controller.(*controller).clock = s.clock
	s.started = true
}()

2.4. cacheMutationDetector.Run

Call s.cacheMutationDetector.Run to check whether cached objects have been mutated.

wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)

defaultCacheMutationDetector.Run

func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
	// we DON'T want protection from panics.  If we're running this code, we want to die
	for {
		d.CompareObjects()

		select {
		case <-stopCh:
			return
		case <-time.After(d.period):
		}
	}
}

CompareObjects

func (d *defaultCacheMutationDetector) CompareObjects() {
	d.lock.Lock()
	defer d.lock.Unlock()

	altered := false
	for i, obj := range d.cachedObjs {
		if !reflect.DeepEqual(obj.cached, obj.copied) {
			fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectDiff(obj.cached, obj.copied))
			altered = true
		}
	}

	if altered {
		msg := fmt.Sprintf("cache %s modified", d.name)
		if d.failureFunc != nil {
			d.failureFunc(msg)
			return
		}
		panic(msg)
	}
}
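The idea behind CompareObjects is to keep a pristine deep copy next to each shared cached object and compare the two periodically. A minimal sketch of that idea follows; the `pod` and `record` types and the `altered` helper are hypothetical and much simpler than the real detector.

```go
package main

import (
	"fmt"
	"reflect"
)

// pod is a hypothetical cached object.
type pod struct {
	Name   string
	Labels map[string]string
}

// deepCopy returns an independent copy, including the map contents.
func (p *pod) deepCopy() *pod {
	c := &pod{Name: p.Name, Labels: make(map[string]string, len(p.Labels))}
	for k, v := range p.Labels {
		c.Labels[k] = v
	}
	return c
}

// record pairs the shared cached pointer with a pristine copy taken
// when the object entered the cache.
type record struct {
	cached *pod
	copied *pod
}

// altered reports whether any cached object differs from its pristine copy.
func altered(records []record) bool {
	for _, r := range records {
		if !reflect.DeepEqual(r.cached, r.copied) {
			return true
		}
	}
	return false
}

func main() {
	p := &pod{Name: "web", Labels: map[string]string{"app": "web"}}
	records := []record{{cached: p, copied: p.deepCopy()}}
	fmt.Println(altered(records)) // nothing has changed yet

	p.Labels["app"] = "oops" // a consumer mutates the shared cached object
	fmt.Println(altered(records))
}
```

The check exists because objects handed out by the informer cache are shared pointers, so a consumer that modifies one in place silently corrupts the cache for every other consumer.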

2.5. processor.run

Calling s.processor.run invokes sharedProcessor.run, which starts listener.run and listener.pop for every listener to handle the queued notifications.

wg.StartWithChannel(processorStopCh, s.processor.run)

sharedProcessor.run

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

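This logic is analyzed later.

2.6. controller.Run

Call s.controller.Run, which builds a Reflector and maintains a local cache of the objects served by the apiserver (and ultimately stored in etcd).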

defer func() {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()
	s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)

controller.Run

This code is in /vendor/k8s.io/client-go/tools/cache/controller.go

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
}

Core code:

// Build the Reflector
r := NewReflector(
	c.config.ListerWatcher,
	c.config.ObjectType,
	c.config.Queue,
	c.config.FullResyncPeriod,
)
// Run the Reflector
wg.StartWithChannel(stopCh, r.Run)
// Run processLoop
wait.Until(c.processLoop, time.Second, stopCh)

3. Reflector

3.1. Reflector

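The Reflector's main job is to watch a specified Kubernetes resource and sync its changes into the local store. A Reflector only puts objects of the specified expectedType into the store, unless expectedType is nil. If resyncPeriod is non-zero, the Reflector triggers a resync every resyncPeriod, so a Reflector can be used to reprocess all objects periodically as well as to process changed objects incrementally.

Commonly used fields:

  • expectedType: the type of object expected to be placed in the store.

  • store: the local cache for the watched resource.

  • listerWatcher: the interface used to list and watch.

  • period: the wait between one watch ending and the next one beginning; NewNamedReflector sets it to 1 second.

  • resyncPeriod: the resync period; when non-zero, a resync (store.Resync, subject to ShouldResync) is triggered at this interval.

  • lastSyncResourceVersion: the most recently observed resource version, used mainly when starting a watch.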

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string
	// metrics tracks basic metric information about the reflector
	metrics *reflectorMetrics

	// The type of object we expect to place in the store.
	expectedType reflect.Type
	// The destination to sync up with the watch source
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// period controls timing between one watch ending and
	// the beginning of the next one.
	period       time.Duration
	resyncPeriod time.Duration
	ShouldResync func() bool
	// clock allows tests to manipulate time
	clock clock.Clock
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store
	// it is thread safe, but not synchronized with the underlying store
	lastSyncResourceVersion string
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	lastSyncResourceVersionMutex sync.RWMutex
}

3.2. NewReflector

NewReflector constructs the Reflector struct.

This code is in /vendor/k8s.io/client-go/tools/cache/reflector.go

// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// reflectorDisambiguator is used to disambiguate started reflectors.
// initialized to an unstable value to ensure meaning isn't attributed to the suffix.
var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
	r := &Reflector{
		name: name,
		// we need this to be unique per process (some names are still the same)but obvious who it belongs to
		metrics:       newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
		listerWatcher: lw,
		store:         store,
		expectedType:  reflect.TypeOf(expectedType),
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
		clock:         &clock.RealClock{},
	}
	return r
}

3.3. Reflector.Run

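Reflector.Run mainly runs the ListAndWatch method. wait.Until calls it again after r.period whenever it returns, until stopCh is closed.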

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}
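The restart behaviour that Run gets from wait.Until can be sketched with the standard library alone. The `until` function below is a simplified, hypothetical analogue of wait.Until (it omits jitter and the sliding-period options of the real helper), and `runAttempts` is a demo helper.

```go
package main

import (
	"fmt"
	"time"
)

// until is a simplified, hypothetical analogue of wait.Until: it calls f,
// waits for period, and repeats until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for a stop request before each invocation.
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		// Wait for the period, but return early if asked to stop.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// runAttempts lets the loop run until f has been invoked n times,
// then closes the stop channel from inside f.
func runAttempts(n int) int {
	stopCh := make(chan struct{})
	attempts := 0
	until(func() {
		attempts++
		if attempts == n {
			close(stopCh)
		}
	}, time.Millisecond, stopCh)
	return attempts
}

func main() {
	fmt.Println(runAttempts(3))
}
```

In the Reflector the function passed in is ListAndWatch, so any return from it, whether caused by an error or by a watch that ended, leads to a fresh list and watch after the period elapses.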

3.4. ListAndWatch

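ListAndWatch first lists all objects and records the resource version of that list, then starts a watch from that resource version to receive subsequent changes. The list is issued with the resource version set to "0", so it may be served from a cache and lag behind the contents of etcd. The Reflector catches up on that lag through the watch, so that the local cache eventually matches the data in etcd.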
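The list-then-watch handshake can be modelled with a toy in-memory server. This is a minimal sketch under stated assumptions: the `server`, `event`, and `catchUp` names are hypothetical, versions are plain integers, and the watch is returned as a slice rather than a stream.

```go
package main

import "fmt"

// event is a hypothetical watch event carrying the version at which it occurred.
type event struct {
	Type    string // "ADDED", "MODIFIED" or "DELETED"
	Key     string
	Value   string
	Version int
}

// server is a toy API server: current state, current version, and a change log.
type server struct {
	state   map[string]string
	version int
	log     []event
}

// apply records a change and bumps the version.
func (s *server) apply(typ, key, value string) {
	s.version++
	if typ == "DELETED" {
		delete(s.state, key)
	} else {
		s.state[key] = value
	}
	s.log = append(s.log, event{Type: typ, Key: key, Value: value, Version: s.version})
}

// list returns a snapshot together with the version it corresponds to.
func (s *server) list() (map[string]string, int) {
	snap := make(map[string]string, len(s.state))
	for k, v := range s.state {
		snap[k] = v
	}
	return snap, s.version
}

// watch returns every event newer than the given version.
func (s *server) watch(since int) []event {
	var out []event
	for _, e := range s.log {
		if e.Version > since {
			out = append(out, e)
		}
	}
	return out
}

// catchUp applies watch events to a local cache and returns the new version.
func catchUp(cache map[string]string, version int, events []event) int {
	for _, e := range events {
		if e.Type == "DELETED" {
			delete(cache, e.Key)
		} else {
			cache[e.Key] = e.Value
		}
		version = e.Version
	}
	return version
}

func main() {
	s := &server{state: map[string]string{}}
	s.apply("ADDED", "default/a", "v1")

	cache, rv := s.list() // initial full sync

	s.apply("MODIFIED", "default/a", "v2") // changes that happen after the list
	s.apply("ADDED", "default/b", "v1")

	rv = catchUp(cache, rv, s.watch(rv)) // the watch resumes from the listed version
	fmt.Println(rv, cache["default/a"], cache["default/b"])
}
```

Carrying the version from the list into the watch is what guarantees that no change between the two calls is lost.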

3.4.1. List

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
	var resourceVersion string

	// Explicitly set "0" as resource version - it's fine for the List()
	// to be served from cache and potentially be delayed relative to
	// etcd contents. Reflector framework will catch up via Watch() eventually.
	options := metav1.ListOptions{ResourceVersion: "0"}
	r.metrics.numberOfLists.Inc()
	start := r.clock.Now()
	list, err := r.listerWatcher.List(options)
	if err != nil {
		return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
	}
	r.metrics.listDuration.Observe(time.Since(start).Seconds())
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
	}
	resourceVersion = listMetaInterface.GetResourceVersion()
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
	}
	r.metrics.numberOfItemsInList.Observe(float64(len(items)))
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
	}
	r.setLastSyncResourceVersion(resourceVersion)
    ...
}    

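First the resource version is set to "0", then listerWatcher.List(options) is called to list all objects.

// Set the resource version to "0"
options := metav1.ListOptions{ResourceVersion: "0"}
// Call the list interface
list, err := r.listerWatcher.List(options)

Get the resource version and extract the list contents into a slice of objects.

// Get the resource version
resourceVersion = listMetaInterface.GetResourceVersion()
// Extract the list contents into a slice of objects
items, err := meta.ExtractList(list)

Store the listed objects and the resource version in the local store, replacing its entire previous contents.

err := r.syncWith(items, resourceVersion)

syncWith calls the store's Replace method to replace the data previously held in the store.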

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}

The Store.Replace method is defined as follows:

type Store interface {
	...
	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
    ...
}

Finally, record the latest resource version.

r.setLastSyncResourceVersion(resourceVersion)

setLastSyncResourceVersion:

func (r *Reflector) setLastSyncResourceVersion(v string) {
	r.lastSyncResourceVersionMutex.Lock()
	defer r.lastSyncResourceVersionMutex.Unlock()
	r.lastSyncResourceVersion = v

	rv, err := strconv.Atoi(v)
	if err == nil {
		r.metrics.lastResourceVersion.Set(float64(rv))
	}
}

3.4.2. store.Resync

resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
	resyncCh, cleanup := r.resyncChan()
	defer func() {
		cleanup() // Call the last one written into cleanup
	}()
	for {
		select {
		case <-resyncCh:
		case <-stopCh:
			return
		case <-cancelCh:
			return
		}
		if r.ShouldResync == nil || r.ShouldResync() {
			glog.V(4).Infof("%s: forcing resync", r.name)
			if err := r.store.Resync(); err != nil {
				resyncerrc <- err
				return
			}
		}
		cleanup()
		resyncCh, cleanup = r.resyncChan()
	}
}()

Core code:

err := r.store.Resync()

The concrete store here is a DeltaFIFO, so this calls DeltaFIFO.Resync.

// Resync will send a sync event for each item
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}

	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

3.4.3. Watch

for {
	// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
	select {
	case <-stopCh:
		return nil
	default:
	}

	timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
	options = metav1.ListOptions{
		ResourceVersion: resourceVersion,
		// We want to avoid situations of hanging watchers. Stop any wachers that do not
		// receive any events within the timeout window.
		TimeoutSeconds: &timemoutseconds,
	}

	r.metrics.numberOfWatches.Inc()
	w, err := r.listerWatcher.Watch(options)
	if err != nil {
		switch err {
		case io.EOF:
			// watch closed normally
		case io.ErrUnexpectedEOF:
			glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
		default:
			utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
		}
		// If this is "connection refused" error, it means that most likely apiserver is not responsive.
		// It doesn't make sense to re-list all objects because most likely we will be able to restart
		// watch where we ended.
		// If that's the case wait and resend watch request.
		if urlError, ok := err.(*url.Error); ok {
			if opError, ok := urlError.Err.(*net.OpError); ok {
				if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
					time.Sleep(time.Second)
					continue
				}
			}
		}
		return nil
	}

	if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
		if err != errorStopRequested {
			glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
		}
		return nil
	}
}

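Set the watch timeout. It is randomized between minWatchTimeout and twice that value; with minWatchTimeout at 5 minutes, each watch times out after roughly 5 to 10 minutes.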

timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
	ResourceVersion: resourceVersion,
	// We want to avoid situations of hanging watchers. Stop any wachers that do not
	// receive any events within the timeout window.
	TimeoutSeconds: &timemoutseconds,
}
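The range of the randomized timeout can be checked with a small standalone sketch. The expression is the one from the snippet above; the constant value of 5 minutes and the helper name `jitteredTimeoutSeconds` are assumptions made for this illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// minWatchTimeout is assumed to be 5 minutes for this illustration.
const minWatchTimeout = 5 * time.Minute

// jitteredTimeoutSeconds mirrors the expression used in ListAndWatch:
// the base timeout multiplied by a random factor between 1 and 2.
func jitteredTimeoutSeconds(base time.Duration) int64 {
	return int64(base.Seconds() * (rand.Float64() + 1.0))
}

func main() {
	for i := 0; i < 3; i++ {
		// Each value falls between 300 and 600 seconds.
		fmt.Println(jitteredTimeoutSeconds(minWatchTimeout))
	}
}
```

Randomizing the timeout spreads watch reconnections over time, so that many reflectors started together do not all re-establish their watches at the same moment.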

Call listerWatcher.Watch(options).

w, err := r.listerWatcher.Watch(options)

Run watchHandler.

err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)

3.4.4. watchHandler

watchHandler consumes the events of a watch, applies them to the store, and keeps the current resource version up to date.

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	start := r.clock.Now()
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()
	// update metrics
	defer func() {
		r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
		r.metrics.watchDuration.Observe(time.Since(start).Seconds())
	}()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}
			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
				utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
				continue
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := r.clock.Now().Sub(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		r.metrics.numberOfShortWatches.Inc()
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
	return nil
}

Events are read from the channel returned by the watch interface's ResultChan().

for {
	select {
	...
	case event, ok := <-w.ResultChan():
    ...
}        

When an Added, Modified, or Deleted event arrives, the corresponding object is applied to the local store.

switch event.Type {
case watch.Added:
	err := r.store.Add(event.Object)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
	}
case watch.Modified:
	err := r.store.Update(event.Object)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
	}
case watch.Deleted:
	// TODO: Will any consumers need access to the "last known
	// state", which is passed in event.Object? If so, may need
	// to change this.
	err := r.store.Delete(event.Object)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
	}
default:
	utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}

Update the latest resource version.

newResourceVersion := meta.GetResourceVersion()
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)

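The analysis of the Reflector shows that it uses the local store repeatedly, and the store it is given is the DeltaFIFO. The sections below analyze DeltaFIFO and Store.

4. DeltaFIFO

The DeltaFIFO is created by NewDeltaFIFO and assigned to config.Queue.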

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		...
	}
    ...
}    

4.1. NewDeltaFIFO

// NewDeltaFIFO returns a Store which can be used process changes to items.
//
// keyFunc is used to figure out what key an object should have. (It's
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
//
// 'compressor' may compress as many or as few items as it wants
// (including returning an empty slice), but it should do what it
// does quickly since it is called while the queue is locked.
// 'compressor' may be nil if you don't want any delta compression.
//
// 'keyLister' is expected to return a list of keys that the consumer of
// this queue "knows about". It is used to decide which items are missing
// when Replace() is called; 'Deleted' deltas are produced for these items.
// It may be nil if you don't need to detect all deletions.
// TODO: consider merging keyLister with this object, tracking a list of
//       "known" keys when Pop() is called. Have to think about how that
//       affects error retrying.
// TODO(lavalamp): I believe there is a possible race only when using an
//                 external known object source that the above TODO would
//                 fix.
//
// Also see the comment on DeltaFIFO.
func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
	f := &DeltaFIFO{
		items:           map[string]Deltas{},
		queue:           []string{},
		keyFunc:         keyFunc,
		deltaCompressor: compressor,
		knownObjects:    knownObjects,
	}
	f.cond.L = &f.lock
	return f
}

controller.Run calls NewReflector.

func (c *controller) Run(stopCh <-chan struct{}) {
	...
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
    ...
}    

The NewReflector constructor assigns c.config.Queue to the Reflector's store field.

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
	r := &Reflector{
		name: name,
		// we need this to be unique per process (some names are still the same)but obvious who it belongs to
		metrics:       newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
		listerWatcher: lw,
		store:         store,
		expectedType:  reflect.TypeOf(expectedType),
		period:        time.Second,
		resyncPeriod:  resyncPeriod,
		clock:         &clock.RealClock{},
	}
	return r
}

4.2. DeltaFIFO

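DeltaFIFO is a producer-consumer queue: the Reflector is the producer, and the consumer is whatever calls the Pop() method.

DeltaFIFO is intended for the following cases:

  • You want to process every object change (delta) at most once.

  • When you process an object, you want to see everything that has happened to it since you last processed it.

  • You want to process the deletion of objects.

  • You might want to reprocess objects periodically.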

// DeltaFIFO is like FIFO, but allows you to process deletes.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it
// will always return an object of type Deltas.
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a
// series of Updates as a single Update).
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// deltaCompressor tells us how to combine two or more
	// deltas. It may be nil.
	deltaCompressor DeltaCompressor

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}
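The relationship between the items map and the queue slice can be sketched as follows. This is a deliberately reduced, hypothetical model (`deltaQueue`, `push`, `pop`): it has no locking, no compression, no knownObjects, and its pop does not block, but it shows how deltas accumulate per key while the key keeps its place in the FIFO order.

```go
package main

import "fmt"

// delta records one change to an object (hypothetical, simplified types).
type delta struct {
	Type   string // "Added", "Updated", "Deleted" or "Sync"
	Object string
}

// deltaQueue keeps, per key, every delta seen since the key was last popped,
// plus the order in which keys first became pending.
type deltaQueue struct {
	items map[string][]delta
	queue []string
}

func newDeltaQueue() *deltaQueue {
	return &deltaQueue{items: map[string][]delta{}}
}

// push appends a delta; a key enters the FIFO order only once while pending.
func (q *deltaQueue) push(key string, d delta) {
	if _, pending := q.items[key]; !pending {
		q.queue = append(q.queue, key)
	}
	q.items[key] = append(q.items[key], d)
}

// pop returns the oldest pending key with all of its accumulated deltas.
// Unlike the real Pop it does not block; ok is false when the queue is empty.
func (q *deltaQueue) pop() (key string, deltas []delta, ok bool) {
	if len(q.queue) == 0 {
		return "", nil, false
	}
	key = q.queue[0]
	q.queue = q.queue[1:]
	deltas = q.items[key]
	delete(q.items, key)
	return key, deltas, true
}

func main() {
	q := newDeltaQueue()
	q.push("default/a", delta{Type: "Added", Object: "a-v1"})
	q.push("default/b", delta{Type: "Added", Object: "b-v1"})
	q.push("default/a", delta{Type: "Updated", Object: "a-v2"})

	for {
		key, deltas, ok := q.pop()
		if !ok {
			break
		}
		fmt.Println(key, deltas)
	}
}
```

Popping a key hands the consumer the complete history for that object since its last pop, which is how a consumer gets to see every change exactly once and in order.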

4.3. Queue & Store

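DeltaFIFO implements the Queue interface, while Reflector.store is declared as the Store interface, which Queue embeds. Queue is a store that can also be drained: the Process function handles the objects returned by Queue.Pop.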

// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
	Store

	// Pop blocks until it has something to process.
	// It returns the object that was process and the result of processing.
	// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
	// should be requeued before releasing the lock on the queue.
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent adds a value previously
	// returned by Pop back into the queue as long
	// as nothing else (presumably more recent)
	// has since been added.
	AddIfNotPresent(interface{}) error

	// Return true if the first batch of items has been popped
	HasSynced() bool

	// Close queue
	Close()
}

5. store

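Store is a generic object storage interface. The Reflector watches the server and updates the Store. Depending on the implementation, the Store lets the Reflector serve as a local caching system or work like a queue of items yet to be processed.

A Store implementation is responsible for providing a mechanism to key objects correctly and to retrieve them by key.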

// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
//
// Store makes no assumptions about stored object identity; it is the responsibility
// of a Store implementation to provide a mechanism to correctly key objects and to
// define the contract for obtaining objects by some arbitrary key type.
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
	Resync() error
}

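The Replace method deletes the store's previous contents and stores the given list instead, that is, it replaces the data completely.

5.1. cache

cache implements the Store interface, and in turn delegates the actual work to the ThreadSafeStore interface.

cache has two responsibilities:

  • Computing the key of an object via keyFunc

  • Invoking the methods of the ThreadSafeStore interface (the source comment below calls it ThreadSafeStorage)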

// cache responsibilities are limited to:
//	1. Computing keys for objects via keyFunc
//  2. Invoking methods of a ThreadSafeStorage interface
type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	keyFunc KeyFunc
}

The Store methods that ListAndWatch relies on (Replace, Add, Update, Delete) are implemented by cache as follows:

cache.Replace

// Replace will delete the contents of 'c', using instead the given list.
// 'c' takes ownership of the list, you should not reference the list again
// after calling this function.
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
	items := map[string]interface{}{}
	for _, item := range list {
		key, err := c.keyFunc(item)
		if err != nil {
			return KeyError{item, err}
		}
		items[key] = item
	}
	c.cacheStorage.Replace(items, resourceVersion)
	return nil
}

cache.Add

// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Add(key, obj)
	return nil
}

cache.Update

// Update sets an item in the cache to its updated state.
func (c *cache) Update(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Update(key, obj)
	return nil
}

cache.Delete

// Delete removes an item from the cache.
func (c *cache) Delete(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Delete(key)
	return nil
}

5.2. ThreadSafeStore

cache delegates its actual storage to a ThreadSafeStore.

// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
// the indexing feature in addition to not being thread safe.
//
// The guarantees of thread safety provided by List/Get are only valid if the caller
// treats returned items as read-only. For example, a pointer inserted in the store
// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
// on the same key and modify the pointer in a non-thread-safe way. Also note that
// modifying objects stored by the indexers (if any) will *not* automatically lead
// to a re-index. So it's not a good idea to directly modify the objects returned by
// Get/List, in general.
type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	Resync() error
}

threadSafeMap

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}
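A map guarded by a sync.RWMutex and kept in step with an index can be sketched as below. This is a hypothetical reduction (`safeStore` with a single hard-coded namespace index), not the real threadSafeMap, which supports arbitrary named index functions.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// safeStore is a hypothetical, reduced version of the idea behind
// threadSafeMap: items guarded by a RWMutex plus one index (by namespace).
type safeStore struct {
	lock        sync.RWMutex
	items       map[string]string
	byNamespace map[string]map[string]struct{} // namespace -> set of keys
}

func newSafeStore() *safeStore {
	return &safeStore{
		items:       map[string]string{},
		byNamespace: map[string]map[string]struct{}{},
	}
}

// namespaceOf derives the index value from a <namespace>/<name> key.
func namespaceOf(key string) string {
	if i := strings.Index(key, "/"); i >= 0 {
		return key[:i]
	}
	return ""
}

// Add stores the object and updates the index under the write lock.
func (s *safeStore) Add(key, obj string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.items[key] = obj
	ns := namespaceOf(key)
	if s.byNamespace[ns] == nil {
		s.byNamespace[ns] = map[string]struct{}{}
	}
	s.byNamespace[ns][key] = struct{}{}
}

// Delete removes the object and its index entry under the write lock.
func (s *safeStore) Delete(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.items, key)
	delete(s.byNamespace[namespaceOf(key)], key)
}

// KeysInNamespace answers an index query under the read lock.
func (s *safeStore) KeysInNamespace(ns string) []string {
	s.lock.RLock()
	defer s.lock.RUnlock()
	keys := make([]string, 0, len(s.byNamespace[ns]))
	for k := range s.byNamespace[ns] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	s := newSafeStore()
	s.Add("default/a", "a")
	s.Add("default/b", "b")
	s.Add("kube-system/dns", "dns")
	s.Delete("default/a")
	fmt.Println(s.KeysInNamespace("default"))
}
```

Updating the items and the index under the same write lock keeps the two consistent, while the read lock lets many lookups proceed concurrently.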

6. processLoop

func (c *controller) Run(stopCh <-chan struct{}) {
	...
	wait.Until(c.processLoop, time.Second, stopCh)
}

The controller.Run method calls processLoop. The processing logic of processLoop is analyzed below.

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == FIFOClosedError {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

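processLoop handles the items in the queue. The actual handling is done by the configured ProcessFunc; the core code is:

obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))

6.1. DeltaFIFO.Pop

Pop blocks until an object is added to the queue. When several objects are ready, they are returned in first-in, first-out order. The popped object is removed from the queue, so if processing fails it has to be added back, either with AddIfNotPresent or by having the process function return ErrRequeue.

Pop calls the supplied process function to handle the object.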
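The blocking behaviour of Pop rests on a condition variable. The following minimal sketch shows that mechanism with the standard library; `blockingQueue` and `errTransient` are hypothetical names, and as a simplification the sketch requeues the item on any processing error, whereas the real queue requeues only through ErrRequeue or AddIfNotPresent.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errTransient is a hypothetical processing error used for the demo.
var errTransient = errors.New("transient failure")

// blockingQueue is a hypothetical, minimal queue whose pop blocks on a
// condition variable until an item is available.
type blockingQueue struct {
	lock  sync.Mutex
	cond  *sync.Cond
	items []string
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.lock)
	return q
}

func (q *blockingQueue) add(item string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.items = append(q.items, item)
	q.cond.Broadcast() // wake any goroutine waiting in pop
}

// pop waits for an item, removes it, and runs process while holding the lock.
// Simplification: if process fails, the item is put back at the tail.
func (q *blockingQueue) pop(process func(string) error) (string, error) {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // releases the lock while waiting
	}
	item := q.items[0]
	q.items = q.items[1:]
	err := process(item)
	if err != nil {
		q.items = append(q.items, item)
	}
	return item, err
}

func main() {
	q := newBlockingQueue()
	done := make(chan string)
	go func() {
		item, _ := q.pop(func(string) error { return nil })
		done <- item
	}()
	q.add("default/a") // unblocks the waiting pop
	fmt.Println(<-done)

	q.add("default/b")
	_, err := q.pop(func(string) error { return errTransient })
	fmt.Println(err, len(q.items))
}
```

Running the process function while the lock is held is what allows it to update data structures that must stay in sync with the queue, at the cost of blocking producers for the duration of the call.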

// Pop blocks until an item is added to the queue, and then returns it.  If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()