
What I learnt about Kubernetes Controllers

If you are a Kubernetes Controller, you know that your main duty is to react to changes in the world’s desired state and actual state, doing whatever you can to update the latter so that it matches the former.

When I think about my early steps with Kubernetes, two things come to mind related to Controllers:

  • Everyone used the ReplicationController because we didn’t have Deployment yet
  • I had a hard time getting the controller manager to work in my clusters

That said, I was sure I knew everything about controllers, but I recently realized I had never had the opportunity to learn what they actually do underneath. That changed when my interest peaked after opening issue #67342, titled “Storage: devicePath is empty while WaitForAttach in StatefulSets”.

While trying to reproduce it, I encountered a chain of function calls that passed through some files with very explanatory names:

actual_state_of_world.go:616 ->
  reconciler.go:238 ->
    operation_executor.go:712 ->
       operation_generator.go:437 -> error on line 496

This looked very similar to the definition of a controller I found in the Kubernetes “Standardized Glossary”:

A control loop that watches the shared state of the cluster through the apiserver and makes changes attempting to move the current state towards the desired state. Examples of controllers that ship with Kubernetes today are the replication controller, endpoints controller, namespace controller, and serviceaccounts controller.

Nice, so the VolumeManager is not really a controller, but conceptually it behaves in a very similar way: it has a loop, a reconciler, a desired state and an actual state.
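To make the "loop, reconciler, desired state, actual state" idea concrete, here is a minimal sketch using only the standard library. It is not the VolumeManager's code: the `reconcile` function and the volume names are hypothetical, and the states are plain sets of names.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile compares the desired and actual sets of (hypothetical) volume
// names and returns what must be attached and detached so that the actual
// state matches the desired one.
func reconcile(desired, actual map[string]bool) (toAttach, toDetach []string) {
	for name := range desired {
		if !actual[name] {
			toAttach = append(toAttach, name)
		}
	}
	for name := range actual {
		if !desired[name] {
			toDetach = append(toDetach, name)
		}
	}
	// Sort so the result does not depend on map iteration order.
	sort.Strings(toAttach)
	sort.Strings(toDetach)
	return toAttach, toDetach
}

func main() {
	desired := map[string]bool{"vol-a": true, "vol-b": true}
	actual := map[string]bool{"vol-b": true, "vol-c": true}

	// One pass of the loop: compute the difference and apply it.
	toAttach, toDetach := reconcile(desired, actual)
	for _, name := range toAttach {
		actual[name] = true
	}
	for _, name := range toDetach {
		delete(actual, name)
	}
	fmt.Println("attach:", toAttach, "detach:", toDetach)

	// A second pass finds nothing left to do: the two states have converged.
	toAttach, toDetach = reconcile(desired, actual)
	fmt.Println("remaining work:", len(toAttach)+len(toDetach))
}
```

A real reconciler runs this comparison over and over, and each action can fail, which is where the retry machinery discussed later comes in.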

At this point I started looking at all the projects I had touched, both private and public, and among the public ones I recognized a very interesting pattern: they all had a cache.ListWatch and a cache.SharedInformer.

The interesting part was that most of them also had a workqueue.Interface, like the etcd operator controller, the NGINX ingress controller and the OpenFaaS Operator Controller. It turns out they use it because it’s a key component in ensuring that the state is consistent and that all the controller’s workers agree on a shared set of elements to be processed with certain constraints (this looks very close to the Glossary’s definition above!).

While writing this post I was tempted to write a full-length example, but I found an exhaustive one already available in the kubernetes repo, so I will just write and go through the simplest self-contained example I can.

Scroll to the end of the controller example to read the walkthrough.

package main

import (
	"fmt"
	"time"

	"go.uber.org/zap"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
)

type Controller struct {
	informer cache.SharedInformer
	queue    workqueue.RateLimitingInterface
	logger   *zap.Logger
}

func NewController(queue workqueue.RateLimitingInterface, informer cache.SharedInformer) *Controller {
	return &Controller{
		informer: informer,
		queue:    queue,
		logger:   zap.NewNop(),
	}
}

func (c *Controller) WithLogger(logger *zap.Logger) {
	c.logger = logger
}

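// processNextItem takes one key off the queue, syncs it and reports whether
// the worker should keep going (false once the queue has been shut down).
func (c *Controller) processNextItem() bool {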
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	err := c.syncToStdout(key.(string))
	c.handleErr(err, key)
	return true
}

func (c *Controller) syncToStdout(key string) error {
	obj, exists, err := c.informer.GetStore().GetByKey(key)
	if err != nil {
		c.logger.Error("error fetching object from index for the specified key", zap.String("key", key), zap.Error(err))
		return err
	}

	if !exists {
		c.logger.Info("pod has gone", zap.String("key", key))
		// do your heavy stuff for when the pod is gone here
	} else {
		c.logger.Info("update received for pod", zap.String("key", key), zap.String("pod", obj.(*v1.Pod).GetName()))
		// do your heavy stuff for when the pod is created/updated here
	}
	return nil
}

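// handleErr forgets the key on success, re-queues it with rate limiting on
// failure, and drops it once it has already been re-queued 5 times.
func (c *Controller) handleErr(err error, key interface{}) {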
	if err == nil {
		c.queue.Forget(key)
		return
	}

	if c.queue.NumRequeues(key) < 5 {
		c.logger.Info("error during sync", zap.String("key", key.(string)), zap.Error(err))
		c.queue.AddRateLimited(key)
		return
	}

	c.queue.Forget(key)
	runtime.HandleError(err)
	c.logger.Info("drop pod out of queue after many retries", zap.String("key", key.(string)), zap.Error(err))
}

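// Run starts the informer, waits for its cache to sync, then starts
// threadiness workers and blocks until stopCh is closed.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {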
	defer runtime.HandleCrash()
	defer c.queue.ShutDown()
	c.logger.Info("starting controller")
	go c.informer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	<-stopCh
	c.logger.Info("stopping controller")
}

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	overrides := &clientcmd.ConfigOverrides{}
	config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
	if err != nil {
		logger.Fatal("error getting kubernetes client config", zap.Error(err))
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		logger.Fatal("error creating kubernetes client", zap.Error(err))
	}

	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	si := cache.NewSharedInformer(podListWatcher, &v1.Pod{}, 0)

	si.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	})

	controller := NewController(queue, si)
	controller.WithLogger(logger)

	stop := make(chan struct{})
	defer close(stop)
	go controller.Run(1, stop)

	select {}
}

The main components of this Controller are:

  • The Workqueue (I would say, the reconciler or the thing that coordinates the synchronization from the actual state of the world to the desired state of the world)
  • syncToStdout: The logic used to make changes to the actual state of the world
  • The SharedInformer: From the glossary’s definition, “a control loop that watches the shared state”

In the SharedInformer I define some handlers to deal with Add, Delete and Update, but instead of acting on the events directly I push a key for each object they receive onto a workqueue with queue.Add:

AddFunc: func(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err == nil {
		queue.Add(key)
	}
},
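The key placed on the queue is just a string. As far as I can tell, cache.MetaNamespaceKeyFunc produces `namespace/name` for namespaced objects and plain `name` for cluster-scoped ones, and cache.SplitMetaNamespaceKey takes it apart again. The sketch below mimics that shape with the standard library only; `objectKey` and `splitKey` are hypothetical helpers, not client-go functions.

```go
package main

import (
	"fmt"
	"strings"
)

// objectKey builds a queue key in the "namespace/name" shape.
// Cluster-scoped objects (empty namespace) get just "name".
func objectKey(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

// splitKey reverses objectKey and rejects keys with more than one slash.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	key := objectKey("default", "nginx")
	namespace, name, err := splitKey(key)
	fmt.Println(key, namespace, name, err)
}
```

Queuing a small string rather than the whole object keeps the queue cheap and lets the worker decide later which version of the object to act on.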

The Workqueue is a structure that lets you queue changes for a specific resource and process them later in multiple workers, with the guarantee that no more than one worker will be working on a specific item at any given moment. The elements are processed in runWorker, and more workers can be started by increasing the threadiness parameter of the Controller’s Run method.
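The "one worker per item" guarantee can be sketched with two sets next to the queue: keys that need work and keys a worker currently holds. This is a simplified, non-blocking model of how I understand the client-go workqueue to behave, not its actual code; the `keyQueue` type and its fields are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// keyQueue is a simplified sketch of a de-duplicating work queue.
type keyQueue struct {
	mu         sync.Mutex
	queue      []string        // keys waiting to be handed out, in order
	dirty      map[string]bool // keys that still need processing
	processing map[string]bool // keys currently held by a worker
}

func newKeyQueue() *keyQueue {
	return &keyQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// Add marks a key as needing work. Duplicates collapse into a single entry,
// and a key that is being processed is not queued again until Done is called.
func (q *keyQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.dirty[key] {
		return
	}
	q.dirty[key] = true
	if q.processing[key] {
		return
	}
	q.queue = append(q.queue, key)
}

// Get hands the next key to a worker, or returns false if nothing is waiting.
func (q *keyQueue) Get() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.queue) == 0 {
		return "", false
	}
	key := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[key] = true
	delete(q.dirty, key)
	return key, true
}

// Done releases a key. If it was added again while a worker held it,
// it goes back on the queue so the newer change is not lost.
func (q *keyQueue) Done(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.processing, key)
	if q.dirty[key] {
		q.queue = append(q.queue, key)
	}
}

func main() {
	q := newKeyQueue()
	q.Add("default/nginx")
	q.Add("default/nginx") // collapsed into the first entry

	key, _ := q.Get()      // a worker picks the key up
	q.Add("default/nginx") // a new event arrives while the worker is busy
	_, ok := q.Get()       // no second worker can get the same key
	fmt.Println(key, ok)

	q.Done(key) // the worker finishes, the pending event is re-queued
	key, ok = q.Get()
	fmt.Println(key, ok)
}
```

The real queue also blocks in Get until an item is available and supports shutdown, which this sketch leaves out to stay short.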

This way, when I end up in syncToStdout I can be sure I am the only one processing that item, and I know that if processing returns an error the operation will be retried up to a hardcoded limit of 5 times, as defined in handleErr.

Every item is retried with an exponential backoff, so failures are not retried immediately but after a delay that grows with each consecutive failure of that item (I used DefaultControllerRateLimiter here, but it’s very easy to create your own with chosen parameters).
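To get a feel for how the delay grows, here is a small standard-library sketch of per-item exponential backoff: the delay doubles with each consecutive failure up to a cap. The `backoff` function is hypothetical, and the 5ms base and 1000s cap are the values I believe DefaultControllerRateLimiter uses for its per-item limiter, so treat them as an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns how long to wait before retrying an item that has already
// failed the given number of times: base * 2^failures, capped at max.
func backoff(base, max time.Duration, failures int) time.Duration {
	delay := base
	for i := 0; i < failures && delay < max; i++ {
		delay *= 2
	}
	if delay > max {
		return max
	}
	return delay
}

func main() {
	// Assumed defaults, see the note above.
	base, max := 5*time.Millisecond, 1000*time.Second

	// The controller above gives up after 5 re-queues, so only the first
	// few steps of the curve are ever used.
	for failures := 0; failures < 5; failures++ {
		fmt.Println(failures, backoff(base, max, failures))
	}
}
```

With a limit of 5 retries the delays stay well under a second, so for a slow external dependency it may be worth raising either the base delay or the retry limit.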

This rate limit mechanism can be very helpful if we add a call to an external API every time we are informed about a pod. In that case the external API might rate limit our calls, producing a failure right now for a call that will work perfectly fine when retried a little later.

The Indexer and the Informer are also key components for processing the workqueue elements here, because we want to be informed about events occurring for the resource Kind we are interested in (in this case: Pod) and we want an index where we can look up the final Pod object.

But hey, since we used the SharedInformer, we don’t need to provide an indexer ourselves: our beloved informer already contains one, available through GetStore().

Another aspect of using the SharedInformer here is that we are guaranteed that the element we get from its internal indexer is at least as fresh as the event we received.
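A consequence of looking objects up by key at processing time is that the worker acts on the latest cached state, not on whatever each individual event carried. The sketch below illustrates this with a plain map standing in for the informer's store; the `pod`, `store` and `syncKey` names are hypothetical.

```go
package main

import "fmt"

// pod is a hypothetical stand-in for *v1.Pod.
type pod struct {
	Name  string
	Phase string
}

// store is a hypothetical stand-in for the informer's local cache,
// indexed by "namespace/name" keys.
type store map[string]pod

// syncKey looks the key up when the work is actually done, so it always
// sees the most recent state the cache knows about.
func syncKey(s store, key string) string {
	p, exists := s[key]
	if !exists {
		return key + " is gone"
	}
	return key + " is " + p.Phase
}

func main() {
	s := store{}
	var queue []string

	// Two events for the same pod arrive before any worker runs. Each one
	// updates the cache and enqueues only the key.
	s["default/nginx"] = pod{Name: "nginx", Phase: "Pending"}
	queue = append(queue, "default/nginx")
	s["default/nginx"] = pod{Name: "nginx", Phase: "Running"}
	queue = append(queue, "default/nginx")

	// Both queue entries resolve to the latest state, never to "Pending".
	for _, key := range queue {
		fmt.Println(syncKey(s, key))
	}
}
```

In the real controller the workqueue would collapse those two entries into one while the key is still waiting, so the sync logic would typically run once against the newest object.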

Wow, I don’t think I know everything about controllers now, but I’m still at the peak of interest, so I will probably follow up with more on the topic.

⚡ Follow me on Twitter @fntlnz