Kubewatch, an example of a Kubernetes custom controller

In part I of this blog post, you were given an overview of the elements involved in a Kubernetes controller. In this part, we show you an example of how to write a custom controller in Kubernetes. We have selected Kubewatch to show you how to use controllers in practice. Kubewatch monitors changes that occur to Kubernetes pods and sends notifications to Slack. Kubewatch is written in Golang. It uses a Kubernetes client library to interact with the Kubernetes API server and a Slack client library to interact with Slack.

In the following paragraphs, I walk you through the process of building the components of the Kubewatch controller (a SharedInformer and a Workqueue), processing notifications, and sending them to Slack. Finally, I will test Kubewatch in a local environment with Minikube to see how it works.

By the end of this post, you should understand the main elements needed to write a Kubernetes controller such as Kubewatch, and you will have the knowledge to write your own.

In general, we build a controller struct which contains the necessary elements described below:

// Controller object
type Controller struct {
      logger       *logrus.Entry
      clientset    kubernetes.Interface
      queue        workqueue.RateLimitingInterface
      informer     cache.SharedIndexInformer
      eventHandler handlers.Handler
}
  • Logger manages the controller logs.
  • Clientset holds the Kubernetes client interface, which lets the controller interact with the Kubernetes API server.
  • Queue is the controller Workqueue.
  • Informer is the controller SharedInformer.
  • eventHandler handles communication with Slack and can be extended to other notification channels.

The following are the typical steps you must follow when writing a controller (Kubewatch example):

  1. Construct a WorkQueue and a SharedInformer
  2. Start the controller
  3. Connect the controller to Slack

Let's see how Kubewatch is built and how it works in the sample scenario.

Constructing a WorkQueue and a SharedInformer

First, we construct a WorkQueue and a SharedInformer for the controller. You'll find a brief explanation of each element after the piece of code:

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

informer := cache.NewSharedIndexInformer(
      &cache.ListWatch{
             ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
                    return client.CoreV1().Pods(meta_v1.NamespaceAll).List(options)
             },
             WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
                    return client.CoreV1().Pods(meta_v1.NamespaceAll).Watch(options)
             },
      },
      &api_v1.Pod{},
      0, //Skip resync
      cache.Indexers{},
)
  • ListWatcher says the controller wants to list and watch all pods in all namespaces (a variant scoped to a single namespace is sketched after this list).
  • We set the resync period to 0 to skip resynchronization of the controller cache.
  • We use SharedIndexInformer instead of SharedInformer because it allows the controller to maintain indexes across all objects in the cache.
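
The same ListWatch pattern can be narrowed if you don't want to watch every pod in the cluster. The following is a minimal sketch, not part of Kubewatch, assuming the same client-go version as the snippets above; the namespace and label selector are illustrative:

scopedInformer := cache.NewSharedIndexInformer(
      &cache.ListWatch{
             ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
                    options.LabelSelector = "app=nginx" // assumption: only pods with this label
                    return client.CoreV1().Pods("kube-system").List(options)
             },
             WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
                    options.LabelSelector = "app=nginx"
                    return client.CoreV1().Pods("kube-system").Watch(options)
             },
      },
      &api_v1.Pod{},
      0, // skip resync
      cache.Indexers{},
)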

Now the controller must manage the events that happen to the pods it's watching. To do that, SharedInformer provides us with the AddEventHandler function. From the first blog post, you know that the controller passes these events to the Workqueue.

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
             key, err := cache.MetaNamespaceKeyFunc(obj)
             if err == nil {
                    queue.Add(key)
             }
      },
      DeleteFunc: func(obj interface{}) {
             key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
             if err == nil {
                    queue.Add(key)
             }
      },
})

Events in the Workqueue are represented by their keys which are constructed in the format of pod_namespace/pod_name. In the case of pod deletion, we must check for the DeletedFinalStateUnknown state of that pod in the cache before enqueuing its key. The DeletedFinalStateUnknown state means that the pod has been deleted but that the watch deletion event was missed and the controller didn't react accordingly.
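
To make the key handling concrete, here is a hedged sketch of two hypothetical helpers (they are not part of the Kubewatch code shown in this post): one splits a namespace/name key back into its parts using client-go's cache.SplitMetaNamespaceKey, and the other shows how a delete handler could unwrap the DeletedFinalStateUnknown tombstone explicitly instead of relying on DeletionHandlingMetaNamespaceKeyFunc:

// splitKey turns a "pod_namespace/pod_name" key back into its parts.
func splitKey(key string) (namespace, name string, err error) {
      return cache.SplitMetaNamespaceKey(key)
}

// enqueueDeletion unwraps the DeletedFinalStateUnknown tombstone (set when the
// watch missed the delete event) before enqueuing the object's key.
func enqueueDeletion(obj interface{}, queue workqueue.RateLimitingInterface) {
      if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
             // fall back to the last known state of the object
             obj = tombstone.Obj
      }
      key, err := cache.MetaNamespaceKeyFunc(obj)
      if err == nil {
             queue.Add(key)
      }
}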

Starting the controller

We construct the controller using the Workqueue and SharedInformer above. Now, we will start the controller by calling the Run() function. (The comments give you an overall idea of how the function works.)

// Run will start the controller.
// StopCh channel is used to send interrupt signal to stop it.
func (c *Controller) Run(stopCh <-chan struct{}) {
      // don't let panics crash the process
      defer utilruntime.HandleCrash()
      // make sure the work queue is shutdown which will trigger workers to end
      defer c.queue.ShutDown()

      c.logger.Info("Starting kubewatch controller")

      go c.informer.Run(stopCh)

      // wait for the caches to synchronize before starting the worker
      if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
             utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
             return
      }

      c.logger.Info("Kubewatch controller synced and ready")

      // runWorker will loop until "something bad" happens.  The .Until will
      // then rekick the worker after one second
      wait.Until(c.runWorker, time.Second, stopCh)
}
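
The excerpts above don't show how Run is invoked. A minimal, hypothetical main could look like the sketch below, wiring the stop channel to SIGINT/SIGTERM; buildController is an assumed constructor that stands in for the construction steps shown earlier, and the os, os/signal, and syscall imports are implied:

func main() {
      // stopCh tells the informer and the worker loop when to stop
      stopCh := make(chan struct{})
      defer close(stopCh)

      c := buildController() // assumption: wires clientset, queue, informer and handler

      go c.Run(stopCh)

      // block until an interrupt or termination signal arrives
      sigterm := make(chan os.Signal, 1)
      signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
      <-sigterm
}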

At this step, the SharedInformer starts watching for pods in the cluster and sending their keys to the Workqueue. Next, we must define how the worker(s) pop keys off the queue and process them. The main idea here is implementing the key's life-cycle management, which is explained in the first blog post.

func (c *Controller) runWorker() {
      // processNextItem will automatically wait until there's work available
      for c.processNextItem() {
             // continue looping
      }
}

// processNextItem deals with one key off the queue.  It returns false
// when it's time to quit.
func (c *Controller) processNextItem() bool {
      // pull the next work item from the queue.  It should be a key we use to look up
      // something in a cache
      key, quit := c.queue.Get()
      if quit {
             return false
      }

      // you always have to indicate to the queue that you've completed a piece of
      // work
      defer c.queue.Done(key)

      // do your work on the key.
      err := c.processItem(key.(string))

      if err == nil {
             // No error, tell the queue to stop tracking history
             c.queue.Forget(key)
      } else if c.queue.NumRequeues(key) < maxRetries {
             c.logger.Errorf("Error processing %s (will retry): %v", key, err)
             // requeue the item to work on later
             c.queue.AddRateLimited(key)
      } else {
             // err != nil and too many retries
             c.logger.Errorf("Error processing %s (giving up): %v", key, err)
             c.queue.Forget(key)
             utilruntime.HandleError(err)
      }

      return true
}
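
The maxRetries threshold used above isn't defined in the excerpt; a reasonable definition, assuming five attempts before giving up on a key, is simply:

const maxRetries = 5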

Connecting the controller to Slack

As you can see in the prior step, the processItem(key) function is where you define how to react to a pod event. Because our controller wants to send notifications to Slack, we connect Slack at this step and prepare the message to be sent.

func (c *Controller) processItem(key string) error {
      c.logger.Infof("Processing change to Pod %s", key)

      obj, exists, err := c.informer.GetIndexer().GetByKey(key)
      if err != nil {
             return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
      }

      if !exists {
             c.eventHandler.ObjectDeleted(obj)
             return nil
      }

      c.eventHandler.ObjectCreated(obj)
      return nil
}

The controller looks up the object in the cache using its key. If the object no longer exists in the cache, this is a deletion event [(s, obj, "deleted")]. Otherwise, it is treated as a creation event [(s, obj, "created")]. The controller then forwards the event to the eventHandler, where we prepare a proper message and send it to Slack.

func (s *Slack) ObjectCreated(obj interface{}) {
      notifySlack(s, obj, "created")
}

func (s *Slack) ObjectDeleted(obj interface{}) {
      notifySlack(s, obj, "deleted")
}

func notifySlack(s *Slack, obj interface{}, action string) {
      e := kbEvent.New(obj, action)
      api := slack.New(s.Token)
      params := slack.PostMessageParameters{}
      attachment := prepareSlackAttachment(e)

      params.Attachments = []slack.Attachment{attachment}
      params.AsUser = true
      channelID, timestamp, err := api.PostMessage(s.Channel, "", params)
      if err != nil {
             log.Printf("%s\n", err)
             return
      }

      log.Printf("Message successfully sent to channel %s at %s", channelID, timestamp)
}

The Slack connection is authenticated via a token, which is pre-configured in the kubewatch.yaml file or set by executing the kubewatch config command. Kubewatch also supports loading the token from a Kubernetes secret object.
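
For completeness, the eventHandler field of the Controller only needs a small contract. The following sketch is inferred from the calls made above and is an assumption, not the exact Kubewatch definition (the real Handler interface may carry extra methods such as an update or init hook); the Slack type simply holds the token and channel used by notifySlack:

// Handler is the contract the controller's eventHandler must satisfy,
// inferred from the ObjectCreated/ObjectDeleted calls above.
type Handler interface {
      ObjectCreated(obj interface{})
      ObjectDeleted(obj interface{})
}

// Slack is a minimal handler holding the credentials used by notifySlack.
type Slack struct {
      Token   string // Slack API token
      Channel string // channel to post notifications to
}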

Okay, so now we have built all the pieces of the controller. Let's try to run it on Minikube. You can download the latest version of Kubewatch from the Kubewatch release page.

$ wget https://github.com/skippbox/kubewatch/releases/download/v0.0.3/kubewatch.yaml

# Note: update the Slack channel and Slack token values in the ConfigMap object.
# Then deploy Kubewatch:

$ kubectl create -f kubewatch.yaml

$ kubectl get pods

NAME        READY     STATUS    RESTARTS   AGE
kubewatch   1/1       Running   0          2m

Checking the configured Slack channel, you will see output similar to the following image:

Connecting Kubewatch to Slack

So far, I have given you a quick tour of Kubewatch and how we can develop a custom controller to build a simple notification system for Kubernetes. We can add controllers for other Kubernetes resources, such as deployments, services, and configmaps, to Kubewatch, as sketched below.
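
For example, supporting deployments mostly means swapping the ListWatch functions and the object type passed to the SharedIndexInformer. Here is a hedged sketch, assuming a client-go version with the same non-context signatures used above and an apps_v1 alias for the k8s.io/api/apps/v1 package:

deploymentInformer := cache.NewSharedIndexInformer(
      &cache.ListWatch{
             ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
                    return client.AppsV1().Deployments(meta_v1.NamespaceAll).List(options)
             },
             WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
                    return client.AppsV1().Deployments(meta_v1.NamespaceAll).Watch(options)
             },
      },
      &apps_v1.Deployment{}, // assumption: apps_v1 is "k8s.io/api/apps/v1"
      0, // skip resync
      cache.Indexers{},
)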

Give Kubewatch a go and look at its source code for more details. I hope this post helps you start writing your own controller. Happy hacking!
