// Run runs the KubeControllerManagerOptions. This should never exit.funcRun(c *config.CompletedConfig, stopCh <-chanstruct{}) error {... controllerContext.InformerFactory.Start(controllerContext.Stop)close(controllerContext.InformersStarted)...}
// SharedInformerFactory a small interface to allow for adding an informer without an import cycletypeSharedInformerFactoryinterface {Start(stopCh <-chanstruct{})InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer}
func (d *defaultCacheMutationDetector) Run(stopCh <-chanstruct{}) {// we DON'T want protection from panics. If we're running this code, we want to diefor { d.CompareObjects()select {case<-stopCh:returncase<-time.After(d.period): } }}
// Run begins processing items, and will continue until a value is sent down stopCh.// It's an error to call Run more than once.// Run blocks; call via go.func (c *controller) Run(stopCh <-chanstruct{}) {defer utilruntime.HandleCrash()gofunc() {<-stopCh c.config.Queue.Close() }() r :=NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock()var wg wait.Groupdefer wg.Wait() wg.StartWithChannel(stopCh, r.Run) wait.Until(c.processLoop, time.Second, stopCh)}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.typeReflectorstruct {// name identifies this reflector. By default it will be a file:line if possible. name string// metrics tracks basic metric information about the reflector metrics *reflectorMetrics// The type of object we expect to place in the store. expectedType reflect.Type// The destination to sync up with the watch source store Store// listerWatcher is used to perform lists and watches. listerWatcher ListerWatcher// period controls timing between one watch ending and// the beginning of the next one. period time.Duration resyncPeriod time.Duration ShouldResync func() bool// clock allows tests to manipulate time clock clock.Clock// lastSyncResourceVersion is the resource version token last// observed when doing a sync with the underlying store// it is thread safe, but not synchronized with the underlying store lastSyncResourceVersion string// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex}
// NewReflector creates a new Reflector object which will keep the given store up to// date with the server's contents for the given resource. Reflector promises to// only put things in the store that have the type of expectedType, unless expectedType// is nil. If resyncPeriod is non-zero, then lists will be executed after every// resyncPeriod, so that you can use reflectors to periodically process everything as// well as incrementally processing the things that change.funcNewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {returnNewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)}// reflectorDisambiguator is used to disambiguate started reflectors.// initialized to an unstable value to ensure meaning isn't attributed to the suffix.var reflectorDisambiguator =int64(time.Now().UnixNano() %12345)// NewNamedReflector same as NewReflector, but with a specified name for loggingfunc NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1) r :=&Reflector{ name: name,// we need this to be unique per process (some names are still the same)but obvious who it belongs to metrics: newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
listerWatcher: lw, store: store, expectedType: reflect.TypeOf(expectedType), period: time.Second, resyncPeriod: resyncPeriod, clock: &clock.RealClock{}, }return r}
3.3. Reflector.Run
Reflector.Run主要执行了ListAndWatch的方法。
// Run starts a watch and handles watch events. Will restart the watch if it is closed.// Run will exit when stopCh is closed.func (r *Reflector) Run(stopCh <-chanstruct{}) { glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) wait.Until(func() {if err := r.ListAndWatch(stopCh); err !=nil { utilruntime.HandleError(err) } }, r.period, stopCh)}
// ListAndWatch first lists all items and get the resource version at the moment of call,// and then use the resource version to watch.// It returns error if ListAndWatch didn't even try to initialize watch.func (r *Reflector) ListAndWatch(stopCh <-chanstruct{}) error { glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)var resourceVersion string// Explicitly set "0" as resource version - it's fine for the List()// to be served from cache and potentially be delayed relative to// etcd contents. Reflector framework will catch up via Watch() eventually. options :=metav1.ListOptions{ResourceVersion: "0"} r.metrics.numberOfLists.Inc() start := r.clock.Now() list, err := r.listerWatcher.List(options)if err !=nil {return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) } r.metrics.listDuration.Observe(time.Since(start).Seconds()) listMetaInterface, err := meta.ListAccessor(list)if err !=nil {return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) } resourceVersion = listMetaInterface.GetResourceVersion() items, err := meta.ExtractList(list)if err !=nil {return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) } r.metrics.numberOfItemsInList.Observe(float64(len(items)))if err := r.syncWith(items, resourceVersion); err !=nil {return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } r.setLastSyncResourceVersion(resourceVersion)...}
// syncWith replaces the store's items with the given list.func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { found :=make([]interface{}, 0, len(items))for _, item :=range items { found =append(found, item) }return r.store.Replace(found, resourceVersion)}
Store.Replace方法定义如下:
typeStoreinterface {...// Replace will delete the contents of the store, using instead the// given list. Store takes ownership of the list, you should not reference// it after calling this function.Replace([]interface{}, string) error...}
resyncerrc :=make(chanerror, 1)cancelCh :=make(chanstruct{})deferclose(cancelCh)gofunc() { resyncCh, cleanup := r.resyncChan()deferfunc() {cleanup() // Call the last one written into cleanup }()for {select {case<-resyncCh:case<-stopCh:returncase<-cancelCh:return }if r.ShouldResync ==nil|| r.ShouldResync() { glog.V(4).Infof("%s: forcing resync", r.name)if err := r.store.Resync(); err !=nil { resyncerrc <- errreturn } }cleanup() resyncCh, cleanup = r.resyncChan() }}()
核心代码:
err := r.store.Resync()
store的具体对象为DeltaFIFO,即调用DeltaFIFO.Resync
// Resync will send a sync event for each itemfunc (f *DeltaFIFO) Resync() error { f.lock.Lock()defer f.lock.Unlock()if f.knownObjects ==nil {returnnil } keys := f.knownObjects.ListKeys()for _, k :=range keys {if err := f.syncKeyLocked(k); err !=nil {return err } }returnnil}
3.4.3. Watch
for {// give the stopCh a chance to stop the loop, even in case of continue statements further down on errorsselect {case<-stopCh:returnnildefault: } timemoutseconds :=int64(minWatchTimeout.Seconds() * (rand.Float64() +1.0)) options =metav1.ListOptions{ ResourceVersion: resourceVersion,// We want to avoid situations of hanging watchers. Stop any wachers that do not// receive any events within the timeout window. TimeoutSeconds: &timemoutseconds, } r.metrics.numberOfWatches.Inc() w, err := r.listerWatcher.Watch(options)if err !=nil {switch err {case io.EOF:// watch closed normallycase io.ErrUnexpectedEOF: glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) }// If this is "connection refused" error, it means that most likely apiserver is not responsive.// It doesn't make sense to re-list all objects because most likely we will be able to restart// watch where we ended.// If that's the case wait and resend watch request.if urlError, ok := err.(*url.Error); ok {if opError, ok := urlError.Err.(*net.OpError); ok {if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { time.Sleep(time.Second)continue } } }returnnil }if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err !=nil {if err != errorStopRequested { glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) }returnnil }}
设置watch的超时时间:在minWatchTimeout(默认为5分钟)的基础上乘以[1.0, 2.0)之间的随机系数,即实际超时时间在5到10分钟之间随机取值。
timemoutseconds :=int64(minWatchTimeout.Seconds() * (rand.Float64() +1.0))options =metav1.ListOptions{ ResourceVersion: resourceVersion,// We want to avoid situations of hanging watchers. Stop any wachers that do not// receive any events within the timeout window. TimeoutSeconds: &timemoutseconds,}
// watchHandler watches w and keeps *resourceVersion up to date.func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now() eventCount :=0// Stopping the watcher should be idempotent and if we return from this function there's no way// we're coming back in with the same watch interface.defer w.Stop()// update metricsdeferfunc() { r.metrics.numberOfItemsInWatch.Observe(float64(eventCount)) r.metrics.watchDuration.Observe(time.Since(start).Seconds()) }()loop:for {select {case<-stopCh:return errorStopRequestedcase err :=<-errc:return errcase event, ok :=<-w.ResultChan():if!ok {break loop }if event.Type == watch.Error {return apierrs.FromObject(event.Object) }if e, a := r.expectedType, reflect.TypeOf(event.Object); e !=nil&& e != a { utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))continue } meta, err := meta.Accessor(event.Object)if err !=nil { utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))continue } newResourceVersion := meta.GetResourceVersion()switch event.Type {case watch.Added: err := r.store.Add(event.Object)if err !=nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}case watch.Modified: err := r.store.Update(event.Object)if err !=nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}case watch.Deleted:// TODO: Will any consumers need access to the "last known// state", which is passed in event.Object? If so, may need// to change this. err := r.store.Delete(event.Object)if err !=nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) }*resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } watchDuration := r.clock.Now().Sub(start)if watchDuration <1*time.Second && eventCount ==0 { r.metrics.numberOfShortWatches.Inc() return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
} glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)returnnil}
获取watch接口中的事件的channel,来获取事件的内容。
for {select {...case event, ok :=<-w.ResultChan():...}
当获得添加、更新、删除的事件时,将对应的对象更新到本地缓存store中。
switch event.Type {case watch.Added: err := r.store.Add(event.Object)if err !=nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}case watch.Modified: err := r.store.Update(event.Object)if err !=nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}case watch.Deleted:// TODO: Will any consumers need access to the "last known// state", which is passed in event.Object? If so, may need// to change this. err := r.store.Delete(event.Object)if err !=nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))}
// NewDeltaFIFO returns a Store which can be used process changes to items.//// keyFunc is used to figure out what key an object should have. (It's// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)//// 'compressor' may compress as many or as few items as it wants// (including returning an empty slice), but it should do what it// does quickly since it is called while the queue is locked.// 'compressor' may be nil if you don't want any delta compression.//// 'keyLister' is expected to return a list of keys that the consumer of// this queue "knows about". It is used to decide which items are missing// when Replace() is called; 'Deleted' deltas are produced for these items.// It may be nil if you don't need to detect all deletions.// TODO: consider merging keyLister with this object, tracking a list of// "known" keys when Pop() is called. Have to think about how that// affects error retrying.// TODO(lavalamp): I believe there is a possible race only when using an// external known object source that the above TODO would// fix.//// Also see the comment on DeltaFIFO.funcNewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO { f :=&DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: keyFunc, deltaCompressor: compressor, knownObjects: knownObjects, } f.cond.L =&f.lockreturn f}
funcNewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {returnNewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)}// NewNamedReflector same as NewReflector, but with a specified name for loggingfunc NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1) r :=&Reflector{ name: name,// we need this to be unique per process (some names are still the same)but obvious who it belongs to metrics: newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
listerWatcher: lw, store: store, expectedType: reflect.TypeOf(expectedType), period: time.Second, resyncPeriod: resyncPeriod, clock: &clock.RealClock{}, }return r}
// DeltaFIFO is like FIFO, but allows you to process deletes.//// DeltaFIFO is a producer-consumer queue, where a Reflector is// intended to be the producer, and the consumer is whatever calls// the Pop() method.//// DeltaFIFO solves this use case:// * You want to process every object change (delta) at most once.// * When you process an object, you want to see everything// that's happened to it since you last processed it.// * You want to process the deletion of objects.// * You might want to periodically reprocess objects.//// DeltaFIFO's Pop(), Get(), and GetByKey() methods return// interface{} to satisfy the Store/Queue interfaces, but it// will always return an object of type Deltas.//// A note on threading: If you call Pop() in parallel from multiple// threads, you could end up with multiple threads processing slightly// different versions of the same object.//// A note on the KeyLister used by the DeltaFIFO: It's main purpose is// to list keys that are "known", for the purpose of figuring out which// items have been deleted when Replace() or Delete() are called. The deleted// object will be included in the DeleteFinalStateUnknown markers. These objects// could be stale.//// You may provide a function to compress deltas (e.g., represent a// series of Updates as a single Update).typeDeltaFIFOstruct {// lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond// We depend on the property that items in the set are in// the queue and vice versa, and that all Deltas in this// map have at least one Delta. items map[string]Deltas queue []string// populated is true if the first batch of items inserted by Replace() has been populated// or Delete/Add/Update was called first. populated bool// initialPopulationCount is the number of items inserted by the first call of Replace() initialPopulationCount int// keyFunc is used to make the key used for queued item// insertion and retrieval, and should be deterministic. 
keyFunc KeyFunc// deltaCompressor tells us how to combine two or more// deltas. It may be nil. deltaCompressor DeltaCompressor// knownObjects list keys that are "known", for the// purpose of figuring out which items have been deleted// when Replace() or Delete() is called. knownObjects KeyListerGetter// Indication the queue is closed.// Used to indicate a queue is closed so a control loop can exit when a queue is empty.// Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex}
// Queue is exactly like a Store, but has a Pop() method too.typeQueueinterface {Store// Pop blocks until it has something to process.// It returns the object that was process and the result of processing.// The PopProcessFunc may return an ErrRequeue{...} to indicate the item// should be requeued before releasing the lock on the queue.Pop(PopProcessFunc) (interface{}, error)// AddIfNotPresent adds a value previously// returned by Pop back into the queue as long// as nothing else (presumably more recent)// has since been added.AddIfNotPresent(interface{}) error// Return true if the first batch of items has been poppedHasSynced() bool// Close queueClose()}
// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
//
// Store makes no assumptions about stored object identity; it is the responsibility
// of a Store implementation to provide a mechanism to correctly key objects and to
// define the contract for obtaining objects by some arbitrary key type.
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
	Resync() error
}
// cache responsibilities are limited to:// 1. Computing keys for objects via keyFunc// 2. Invoking methods of a ThreadSafeStorage interfacetypecachestruct {// cacheStorage bears the burden of thread safety for the cache cacheStorage ThreadSafeStore// keyFunc is used to make the key for objects stored in and retrieved from items, and// should be deterministic. keyFunc KeyFunc}
其中ListAndWatch主要用到以下的方法:
cache.Replace
// Replace will delete the contents of 'c', using instead the given list.// 'c' takes ownership of the list, you should not reference the list again// after calling this function.func (c *cache) Replace(list []interface{}, resourceVersion string) error { items :=map[string]interface{}{}for _, item :=range list { key, err := c.keyFunc(item)if err !=nil {returnKeyError{item, err} } items[key] = item } c.cacheStorage.Replace(items, resourceVersion)returnnil}
cache.Add
// Add inserts an item into the cache.func (c *cache) Add(obj interface{}) error { key, err := c.keyFunc(obj)if err !=nil {returnKeyError{obj, err} } c.cacheStorage.Add(key, obj)returnnil}
cache.Update
// Update sets an item in the cache to its updated state.func (c *cache) Update(obj interface{}) error { key, err := c.keyFunc(obj)if err !=nil {returnKeyError{obj, err} } c.cacheStorage.Update(key, obj)returnnil}
cache.Delete
// Delete drops obj from the cache, locating it by the key that keyFunc
// derives from it. A KeyError wrapping the failure is returned when the key
// cannot be computed.
func (c *cache) Delete(obj interface{}) error {
	key, keyErr := c.keyFunc(obj)
	if keyErr == nil {
		c.cacheStorage.Delete(key)
		return nil
	}
	return KeyError{obj, keyErr}
}
6.2. ThreadSafeStore
cache的具体实现是通过调用ThreadSafeStore来完成的。
// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
//
// TL;DR caveats: anything returned by Get or List must be treated as
// read-only. Modifying it breaks the indexing feature and is not thread safe.
//
// The thread-safety guarantees of List/Get hold only while callers honour
// that rule. For example, a pointer inserted through `Add` is returned as is
// by `Get`; several clients may `Get` the same key and then mutate the
// pointer in a non-thread-safe way. Note also that modifying objects stored
// by the indexers (if any) will *not* automatically trigger a re-index. In
// general, do not directly modify the objects returned by Get/List.
type ThreadSafeStore interface {
	// Key-addressed storage operations.
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)

	// Index-related operations.
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store. Calling it after the
	// store already holds data gives undefined results.
	AddIndexers(newIndexers Indexers) error

	Resync() error
}
threadSafeMap
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
// lock is a reader/writer mutex.
// NOTE(review): presumably it guards items, indexers and indices — confirm against the methods.
lock sync.RWMutex
// items holds the stored objects keyed by string.
// NOTE(review): presumably the key passed to Add/Update — confirm.
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
// processLoop pops items off the work queue forever, handing each one to
// c.config.Process. It returns only when Pop reports FIFOClosedError. For any
// other error the popped object is re-added when RetryOnError is set, and
// dropped otherwise.
//
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		switch {
		case err == nil:
			// Processed cleanly; fetch the next item.
			continue
		case err == FIFOClosedError:
			return
		case c.config.RetryOnError:
			// This is the safe way to re-enqueue.
			c.config.Queue.AddIfNotPresent(obj)
		}
	}
}
// Pop blocks until the queue holds an item, removes the oldest one and hands
// it to process. Items come out in the order they were added/updated. The
// item is taken out of both the queue and the store before process runs, so
// if processing fails the caller must put it back with AddIfNotPresent().
//
// process is invoked while the queue lock is held, so it may safely update
// data structures that must stay in sync with the queue (e.g. knownKeys). If
// process returns an ErrRequeue, the item is re-added under the same lock
// (equivalent to AddIfNotPresent) and the nested error is returned instead.
//
// The returned value is a 'Deltas': the complete list of things that happened
// to the object while it sat in the queue. When the queue is empty and has
// been closed, Pop returns (nil, FIFOClosedError).
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for {
		// Sleep on the condition variable until something is queued. Close()
		// sets f.closed and broadcasts, which wakes this loop so that Pop can
		// return.
		for len(f.queue) == 0 {
			if f.IsClosed() {
				return nil, FIFOClosedError
			}
			f.cond.Wait()
		}

		// Dequeue the head key.
		key := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}

		deltas, found := f.items[key]
		if !found {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, key)

		err := process(deltas)
		if requeue, isRequeue := err.(ErrRequeue); isRequeue {
			f.addIfNotPresent(key, deltas)
			err = requeue.Err
		}
		// No copy of the deltas is needed: ownership moves to the caller.
		return deltas, err
	}
}
核心代码:
for {
...
item, ok := f.items[id]
...
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
// pop moves notifications from p.addCh to p.nextCh, parking any overflow in
// p.pendingNotifications. It returns when p.addCh is closed, and closes
// p.nextCh on the way out.
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var (
		out     chan<- interface{} // nil while there is nothing to send
		pending interface{}        // the notification waiting to go out
	)
	for {
		select {
		case out <- pending:
			// Dispatched; pull the next buffered notification, if any.
			next, more := p.pendingNotifications.ReadOne()
			pending = next
			if !more {
				// Nothing left to send: a nil channel disables this case.
				out = nil
			}
		case incoming, open := <-p.addCh:
			if !open {
				return
			}
			if pending != nil {
				// Something is already waiting to be dispatched; buffer this one.
				p.pendingNotifications.WriteOne(incoming)
				continue
			}
			// Nothing is waiting (and the buffer is empty): skip the buffer
			// and offer this notification directly.
			pending = incoming
			out = p.nextCh
		}
	}
}
7.1.2. listener.run
listener.run部分根据不同的更新类型调用不同的处理函数。
// run receives notifications from p.nextCh until that channel is closed and
// calls the handler callback matching each notification's type. A
// notification of any other type is reported through utilruntime.HandleError.
func (p *processorListener) run() {
	defer utilruntime.HandleCrash()

	for {
		next, open := <-p.nextCh
		if !open {
			return
		}
		switch n := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(n.oldObj, n.newObj)
		case addNotification:
			p.handler.OnAdd(n.newObj)
		case deleteNotification:
			p.handler.OnDelete(n.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so none of the callbacks can
// return an error.
type ResourceEventHandler interface {
	// OnAdd is called when an object is added.
	OnAdd(obj interface{})

	// OnUpdate is called when an object is modified. oldObj is the last
	// known state of the object; several changes may have been combined, so
	// this cannot be used to observe every single change. OnUpdate is also
	// called when a re-list happens, even if nothing changed, which is useful
	// for periodically evaluating or syncing something.
	OnUpdate(oldObj, newObj interface{})

	// OnDelete gets the final state of the item if it is known; otherwise it
	// gets an object of type DeletedFinalStateUnknown. That can happen when
	// the watch is closed and misses the delete event, so the deletion is not
	// noticed until the subsequent re-list.
	OnDelete(obj interface{})
}
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
...
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment,
UpdateFunc: dc.updateDeployment,
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: dc.deletePod,
})
...
}