Understanding the Core Workflow of Kubernetes Informer in client-go
This article explains the internal workflow of the Kubernetes informer package in client-go. It covers the architecture and the key components (Reflector, DeltaFIFO, and Indexer), then walks through a step-by-step code example showing how informers are created, registered, started, and used to handle watch events efficiently.
The article analyzes the basic process of starting an informer in a Kubernetes controller, emphasizing that both built‑in components and custom controllers need to watch etcd events via the apiserver to implement their control loops.
The client-go informer package offers three main capabilities: a local cache (store), an indexing mechanism (indexer), and event‑handler registration.
Informer Architecture
The upper part of the architecture is provided by client-go, while the lower part is the user‑defined control‑loop logic. The upper part consists of several components:
Reflector
Reflector interacts with the apiserver using list and watch APIs, pushing resource objects into a queue.
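The list-then-watch pattern can be pictured with a small standard-library sketch. Everything in it (the Event type, the ListerWatcher interface, fakeServer, listAndWatch) is a hypothetical stand-in rather than the client-go API, and it glosses over details such as resource-version bookkeeping and reconnects.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a watch event.
type Event struct {
    Type string // "ADDED", "MODIFIED" or "DELETED"
    Name string
}

// ListerWatcher is a hypothetical, simplified list/watch pair.
// It is not the client-go interface of the same name.
type ListerWatcher interface {
    List() (names []string, resourceVersion int)
    Watch(fromVersion int) <-chan Event
}

// fakeServer serves a fixed snapshot and a fixed stream of later events.
type fakeServer struct {
    snapshot []string
    version  int
    later    []Event
}

func (s *fakeServer) List() ([]string, int) { return s.snapshot, s.version }

func (s *fakeServer) Watch(fromVersion int) <-chan Event {
    ch := make(chan Event, len(s.later))
    for _, e := range s.later {
        ch <- e
    }
    close(ch) // closing the channel ends the watch in this sketch
    return ch
}

// listAndWatch seeds the queue from one list call, then forwards every
// watch event that arrives after the listed version.
func listAndWatch(lw ListerWatcher, queue chan<- Event) {
    names, rv := lw.List()
    for _, n := range names {
        // Simplification: the real reflector replaces the store contents
        // in one step instead of emitting one add per listed object.
        queue <- Event{Type: "ADDED", Name: n}
    }
    for e := range lw.Watch(rv) {
        queue <- e
    }
    close(queue)
}

func runReflectorDemo() []Event {
    srv := &fakeServer{
        snapshot: []string{"pod-a", "pod-b"},
        version:  10,
        later:    []Event{{"MODIFIED", "pod-a"}, {"DELETED", "pod-b"}},
    }
    queue := make(chan Event, 8)
    go listAndWatch(srv, queue)
    var got []Event
    for e := range queue {
        got = append(got, e)
    }
    return got
}

func main() {
    for _, e := range runReflectorDemo() {
        fmt.Println(e.Type, e.Name)
    }
}
```

The consumer sees the listed objects first and the later changes afterwards, which is the ordering the rest of the pipeline relies on.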
DeltaFIFO
DeltaFIFO maintains a FIFO queue (queue) and a map of items (items) that stores objects together with their change type. The queue holds the processing order of keys, while items keeps the actual objects and their deltas.
type DeltaFIFO struct {
    ...
    items map[string]Deltas // deltas for each object, indexed by key
    queue []string          // keys in FIFO order
    ...
}
Two parts are distinguished:
FIFO – a first-in-first-out queue of object keys.
Delta – a record that pairs an object with its operation type (Added, Updated, Deleted, etc.); the items map holds the list of deltas for each key.
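To make the interplay between the queue and the items map concrete, here is a minimal sketch using only the standard library. The miniDeltaFIFO type and its enqueue and pop methods are hypothetical names for illustration; locking, blocking pops, and delta compression are left out.

```go
package main

import "fmt"

// DeltaType names the kind of change.
type DeltaType string

const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
)

// Delta pairs a change type with the object it applies to.
type Delta struct {
    Type   DeltaType
    Object string // a string keeps the sketch short
}

// miniDeltaFIFO is a hypothetical, single-goroutine simplification.
type miniDeltaFIFO struct {
    items map[string][]Delta // pending deltas per key
    queue []string           // keys in first-in-first-out order
}

func newMiniDeltaFIFO() *miniDeltaFIFO {
    return &miniDeltaFIFO{items: map[string][]Delta{}}
}

// enqueue appends a delta; a key enters the queue only if it is not
// already pending, so each key appears in the queue at most once.
func (f *miniDeltaFIFO) enqueue(key string, d Delta) {
    if _, pending := f.items[key]; !pending {
        f.queue = append(f.queue, key)
    }
    f.items[key] = append(f.items[key], d)
}

// pop removes the oldest key and returns all deltas accumulated for it.
func (f *miniDeltaFIFO) pop() (string, []Delta, bool) {
    if len(f.queue) == 0 {
        return "", nil, false
    }
    key := f.queue[0]
    f.queue = f.queue[1:]
    deltas := f.items[key]
    delete(f.items, key)
    return key, deltas, true
}

func main() {
    f := newMiniDeltaFIFO()
    f.enqueue("default/pod-a", Delta{Added, "pod-a v1"})
    f.enqueue("default/pod-b", Delta{Added, "pod-b v1"})
    f.enqueue("default/pod-a", Delta{Updated, "pod-a v2"})
    for {
        key, deltas, ok := f.pop()
        if !ok {
            break
        }
        fmt.Println(key, deltas)
    }
}
```

In this run default/pod-a is popped first and carries both of its deltas, because the second change was appended to the existing entry instead of queuing the key again.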
Indexer
The indexer is a local in-memory store that holds resource objects and supports lookups through index functions. The informer keeps this cache in sync with the data in etcd.
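The idea of a store with secondary indexes can be sketched in a few lines of standard-library Go. The pod struct, miniIndexer, and the index names "namespace" and "node" are all hypothetical and chosen only for illustration; the real indexer is thread-safe and works on arbitrary objects.

```go
package main

import (
    "fmt"
    "sort"
)

// pod is a hypothetical minimal object with only the fields the sketch needs.
type pod struct {
    Namespace, Name, Node string
}

// indexFunc computes the index value for an object.
type indexFunc func(p pod) string

// miniIndexer is a hypothetical simplification: a key-to-object store plus
// named indexes that map an index value to the set of matching keys.
type miniIndexer struct {
    objects map[string]pod
    funcs   map[string]indexFunc
    indexes map[string]map[string]map[string]struct{}
}

func newMiniIndexer(funcs map[string]indexFunc) *miniIndexer {
    idx := &miniIndexer{
        objects: map[string]pod{},
        funcs:   funcs,
        indexes: map[string]map[string]map[string]struct{}{},
    }
    for name := range funcs {
        idx.indexes[name] = map[string]map[string]struct{}{}
    }
    return idx
}

func key(p pod) string { return p.Namespace + "/" + p.Name }

// Add stores or replaces the object and refreshes every index entry for it.
func (i *miniIndexer) Add(p pod) {
    k := key(p)
    if old, ok := i.objects[k]; ok {
        // Drop stale index entries left by the previous version.
        for name, fn := range i.funcs {
            delete(i.indexes[name][fn(old)], k)
        }
    }
    i.objects[k] = p
    for name, fn := range i.funcs {
        v := fn(p)
        if i.indexes[name][v] == nil {
            i.indexes[name][v] = map[string]struct{}{}
        }
        i.indexes[name][v][k] = struct{}{}
    }
}

// ByIndex returns the sorted keys of objects whose index value matches.
func (i *miniIndexer) ByIndex(name, value string) []string {
    var keys []string
    for k := range i.indexes[name][value] {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    return keys
}

func main() {
    idx := newMiniIndexer(map[string]indexFunc{
        "namespace": func(p pod) string { return p.Namespace },
        "node":      func(p pod) string { return p.Node },
    })
    idx.Add(pod{"default", "web-1", "node-1"})
    idx.Add(pod{"default", "web-2", "node-2"})
    idx.Add(pod{"kube-system", "dns-1", "node-1"})
    fmt.Println(idx.ByIndex("namespace", "default"))
    fmt.Println(idx.ByIndex("node", "node-1"))
}
```

A lookup by index avoids scanning every cached object, which is why controllers that frequently ask questions like "which pods run on this node" benefit from registering an index for it.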
Basic Example
package main

import (
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"
)

// defaultResync is not defined in the original snippet; 30s is an assumed value.
const defaultResync = 30 * time.Second

func main() {
    stopCh := make(chan struct{})
    defer close(stopCh)
    // (1) Build clientset
    masterUrl := "172.27.32.110:8080"
    config, err := clientcmd.BuildConfigFromFlags(masterUrl, "")
    if err != nil {
        klog.Fatalf("BuildConfigFromFlags err: %v", err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatalf("Get clientset err: %v", err)
    }
    // (2) Create shared informer factory
    sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)
    // (3) Register a pod informer
    podInformer := sharedInformers.Core().V1().Pods().Informer()
    // (4) Register event handler
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            mObj, ok := obj.(metav1.Object)
            if !ok {
                return
            }
            klog.Infof("Get new obj: %v", mObj)
            klog.Infof("Get new obj name: %s", mObj.GetName())
        },
    })
    // (5) Start all informers
    sharedInformers.Start(stopCh)
    // (6) Wait for cache sync
    if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
        klog.Fatalf("Cache sync fail!")
    }
    // (7) Use lister to list pods
    podLister := sharedInformers.Core().V1().Pods().Lister()
    pods, err := podLister.List(labels.Everything())
    if err != nil {
        klog.Errorf("err: %v", err)
    }
    klog.Infof("len(pods): %d", len(pods))
    for _, v := range pods {
        klog.Infof("pod: %s", v.Name)
    }
    // Block so the informer keeps running and delivering events.
    <-stopCh
}
The example highlights steps (2)–(5): creating a shared informer factory, obtaining a pod informer, adding an event handler, and starting the informer.
Process Analysis
3.1 Create a shared informer factory
Creating a sharedInformerFactory avoids creating duplicate informers that would overload the apiserver. The factory holds a map of informers and starts them on demand.
sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)
// Inside client-go, the constructor initializes the factory like this:
factory := &sharedInformerFactory{
    client:           client,
    namespace:        v1.NamespaceAll,
    defaultResync:    defaultResync,
    informers:        make(map[reflect.Type]cache.SharedIndexInformer),
    startedInformers: make(map[reflect.Type]bool),
    customResync:     make(map[reflect.Type]time.Duration),
}
3.2 Register an informer
The informer is created and stored in the factory through InformerFor and defaultInformer, which ultimately calls NewFilteredPodInformer.
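podInformer := sharedInformers.Core().V1().Pods().Informer()
func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    informerType := reflect.TypeOf(obj)
    // Reuse the informer if one is already registered for this type.
    if informer, exists := f.informers[informerType]; exists {
        return informer
    }
    // Simplified: locking and the per-type resync override are omitted.
    resyncPeriod := f.defaultResync
    informer := newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer
    return informer
}
3.3 Register event handler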
The handler is wrapped in a listener and added to the shared processor. Each listener forwards events from addCh to nextCh and then to the matching callback.
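The hand-off from addCh to nextCh can be sketched with two goroutines and an in-memory buffer, so that a slow callback never blocks the code that publishes events. The listener type, its pop and run methods, and the deliver helper below are hypothetical simplifications, not the client-go implementation.

```go
package main

import (
    "fmt"
    "sync"
)

// notification is a hypothetical event payload.
type notification struct {
    kind string // "add" or "delete"
    name string
}

// listener is a hypothetical, simplified event listener.
type listener struct {
    addCh    chan notification // written by the publisher
    nextCh   chan notification // read by the callback goroutine
    onAdd    func(name string)
    onDelete func(name string)
}

// pop moves notifications from addCh to nextCh and buffers them while the
// callback goroutine is busy. A nil channel disables its select case.
func (l *listener) pop() {
    defer close(l.nextCh)
    in := l.addCh
    var pending []notification
    var out chan notification // nil until there is something to send
    var next notification
    for in != nil || out != nil {
        select {
        case out <- next:
            if len(pending) > 0 {
                next, pending = pending[0], pending[1:]
            } else {
                out = nil
            }
        case n, ok := <-in:
            if !ok {
                in = nil // input closed; keep draining the buffer
                continue
            }
            if out == nil {
                next, out = n, l.nextCh
            } else {
                pending = append(pending, n)
            }
        }
    }
}

// run dispatches each notification to the matching callback.
func (l *listener) run() {
    for n := range l.nextCh {
        switch n.kind {
        case "add":
            l.onAdd(n.name)
        case "delete":
            l.onDelete(n.name)
        }
    }
}

// deliver pushes events through a fresh listener and returns the callbacks
// that fired, in order.
func deliver(events []notification) []string {
    var got []string
    l := &listener{
        addCh:    make(chan notification),
        nextCh:   make(chan notification),
        onAdd:    func(name string) { got = append(got, "add:"+name) },
        onDelete: func(name string) { got = append(got, "delete:"+name) },
    }
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); l.pop() }()
    go func() { defer wg.Done(); l.run() }()
    for _, e := range events {
        l.addCh <- e
    }
    close(l.addCh)
    wg.Wait()
    return got
}

func main() {
    fmt.Println(deliver([]notification{{"add", "pod-a"}, {"delete", "pod-a"}, {"add", "pod-b"}}))
}
```

The buffer in pop is unbounded in this sketch, which trades memory for the guarantee that publishing an event never waits on user code.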
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) { /* ... */ }})
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
    s.processor.addListener(newProcessorListener(handler, ...))
}
3.4 Start all informers
Calling sharedInformers.Start(stopCh) launches each informer in a separate goroutine.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
Informer Run Logic
The Run method creates a DeltaFIFO, builds a Config for the controller, and starts the reflector and the processing loop.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects: s.indexer, EmitDeltaTypeReplaced: true})
    cfg := &Config{
        Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod, Process: s.HandleDeltas, ...
    }
    s.controller = New(cfg)
    s.controller.Run(stopCh)
}
The controller runs the reflector (ListAndWatch) to fill the queue, and a processing loop that pops items, updates the indexer, and calls the registered handlers.
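The consumer side of this loop can be pictured with a small standard-library sketch: a goroutine plays the reflector and produces deltas, while the processing loop applies each one to a local store and then fires the matching handler. The delta and handlers types, processLoop as written here, and runDemo are hypothetical simplifications; a plain map stands in for the indexer and a channel stands in for the DeltaFIFO.

```go
package main

import "fmt"

// delta is a hypothetical change record: type, object key, and payload.
type delta struct {
    Type string // "Added", "Updated" or "Deleted"
    Key  string
    Obj  string
}

// handlers mirrors the three kinds of callbacks a controller registers.
type handlers struct {
    OnAdd    func(obj string)
    OnUpdate func(oldObj, newObj string)
    OnDelete func(obj string)
}

// processLoop consumes deltas until the queue is closed. For each delta it
// updates the local store first and then calls the matching handler.
func processLoop(queue <-chan delta, store map[string]string, h handlers) {
    for d := range queue {
        switch d.Type {
        case "Added", "Updated":
            // Whether this is an add or an update is decided by what the
            // local store already holds, not by the delta type alone.
            if old, exists := store[d.Key]; exists {
                store[d.Key] = d.Obj
                h.OnUpdate(old, d.Obj)
            } else {
                store[d.Key] = d.Obj
                h.OnAdd(d.Obj)
            }
        case "Deleted":
            if old, exists := store[d.Key]; exists {
                delete(store, d.Key)
                h.OnDelete(old)
            }
        }
    }
}

// runDemo wires a producer goroutine to processLoop and returns the handler
// calls that were made together with the final store contents.
func runDemo() (calls []string, store map[string]string) {
    store = map[string]string{}
    h := handlers{
        OnAdd:    func(o string) { calls = append(calls, "add "+o) },
        OnUpdate: func(o, n string) { calls = append(calls, "update "+o+" -> "+n) },
        OnDelete: func(o string) { calls = append(calls, "delete "+o) },
    }
    queue := make(chan delta)
    go func() {
        // Producer: plays the reflector's role in this sketch.
        defer close(queue)
        queue <- delta{"Added", "default/a", "a-v1"}
        queue <- delta{"Updated", "default/a", "a-v2"}
        queue <- delta{"Added", "default/b", "b-v1"}
        queue <- delta{"Deleted", "default/a", "a-v2"}
    }()
    processLoop(queue, store, h)
    return calls, store
}

func main() {
    calls, store := runDemo()
    for _, c := range calls {
        fmt.Println(c)
    }
    fmt.Println(len(store), "object(s) left in the store")
}
```

Updating the store before notifying handlers means a callback that reads the cache always sees a state at least as new as the event it is handling.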
Reflector ListAndWatch
The reflector first performs a list to obtain the current state and stores the objects in the DeltaFIFO. It then enters a watch loop that converts watch events into deltas and pushes them into the queue.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    // Simplified excerpt: variable declarations and most error handling are omitted.
    // List phase
    list, err := r.listerWatcher.List(opts)
    resourceVersion = listMetaInterface.GetResourceVersion()
    items, err := meta.ExtractList(list)
    r.syncWith(items, resourceVersion)
    // Watch phase
    for {
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            return err
        }
        r.watchHandler(start, w, &resourceVersion, errc, stopCh)
    }
}
DeltaFIFO Production & Consumption
Production occurs in Reflector.watchHandler, where events are added to the FIFO via store.Add (which calls queueActionLocked). Consumption happens in controller.processLoop, which pops items, runs HandleDeltas, and distributes notifications to listeners.
func (f *DeltaFIFO) Add(obj interface{}) error { return f.queueActionLocked(Added, obj) }
func (f *DeltaFIFO) queueActionLocked(action DeltaType, obj interface{}) error { /* update items and queue */ }
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        // handle err, requeue if needed
    }
}
Conclusion
The informer mechanism is essentially a producer‑consumer system built on DeltaFIFO with a local cache and indexer, providing a convenient API for registering callbacks and efficiently watching Kubernetes resources. Understanding the upper‑layer workflow—Reflector, DeltaFIFO, Indexer, and the shared informer factory—helps developers build reliable custom controllers.
政采云技术