Kubewatch, an example of a Kubernetes custom controller
In part I of this blog post, you were given an overview of the elements involved in a Kubernetes controller. In this part, we show you an example of how to write a custom controller in Kubernetes. We have selected Kubewatch to show you how to use controllers in practice. Kubewatch monitors changes to the pods in a Kubernetes cluster and sends notifications to Slack. Kubewatch is written in Golang. It uses a Kubernetes client library to interact with the Kubernetes API server and a Slack client library to interact with Slack.
In the following paragraphs, I walk you through building the components of the Kubewatch controller (SharedInformer and Workqueue), processing notifications, and sending them to Slack. Finally, I test Kubewatch in a local environment with Minikube to see how it works.
At the end of this post, you should understand which main elements are needed to write a Kubernetes controller such as Kubewatch, and you will have the knowledge to write your own controller.
In general, we build a controller struct which contains the necessary elements described below:
// Controller object
type Controller struct {
logger *logrus.Entry
clientset kubernetes.Interface
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
eventHandler handlers.Handler
}
- Logger manages the controller logs.
- Clientset holds the Kubernetes client interface, which lets the controller interact with the Kubernetes API server.
- Queue is the controller Workqueue.
- Informer is the controller SharedInformer.
- eventHandler holds the connection to Slack, and can be extended to other channels.
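To make the last point concrete, here is a minimal sketch of how a handler abstraction allows other channels to be plugged in. The two method names are taken from the Slack handler shown later in this post. The body of the `Handler` interface, the `recorder` type, and the `notify` helper are assumptions made for illustration, not Kubewatch's actual code.

```go
package main

import "fmt"

// Handler is an assumed shape for handlers.Handler: anything that can react
// to a created or a deleted object can be plugged into the controller.
type Handler interface {
	ObjectCreated(obj interface{})
	ObjectDeleted(obj interface{})
}

// recorder is a hypothetical handler that keeps messages in memory instead
// of sending them to Slack, which is convenient in tests.
type recorder struct {
	messages []string
}

func (r *recorder) ObjectCreated(obj interface{}) {
	r.messages = append(r.messages, fmt.Sprintf("created: %v", obj))
}

func (r *recorder) ObjectDeleted(obj interface{}) {
	r.messages = append(r.messages, fmt.Sprintf("deleted: %v", obj))
}

// notify dispatches an event through the interface, so the caller does not
// need to know which channel sits behind it.
func notify(h Handler, obj interface{}, deleted bool) {
	if deleted {
		h.ObjectDeleted(obj)
		return
	}
	h.ObjectCreated(obj)
}
```

Because the controller only depends on the interface, swapping Slack for another channel means writing a new type with these two methods.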
The following are the typical steps you must follow when writing a controller (Kubewatch example):
- Construct a WorkQueue and a SharedInformer
- Start the controller
- Connect the controller to Slack
Let's see how Kubewatch is built and how it works in the sample scenario.
Constructing a WorkQueue and a SharedInformer
First, we construct a WorkQueue and a SharedInformer for the controller. You'll find a brief explanation of each element after the piece of code:
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Pods(meta_v1.NamespaceAll).List(options)
},
WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Pods(meta_v1.NamespaceAll).Watch(options)
},
},
&api_v1.Pod{},
0, //Skip resync
cache.Indexers{},
)
- The ListWatch tells the informer that the controller wants to list and watch all pods in all namespaces.
- We pass 0 as the resync period, so the controller cache is not periodically resynchronized.
- We use SharedIndexInformer instead of SharedInformer because it allows the controller to maintain indexes across all objects in the cache.
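The snippet above passes an empty `cache.Indexers{}`, so no secondary index is actually registered. To show what an index in the cache buys you, here is a toy, stdlib-only sketch. The `pod`, `indexFunc`, and `indexedStore` names are hypothetical stand-ins, and the real cache is considerably more involved (for example, it handles updates and deletions).

```go
package main

import "sort"

// pod is a hypothetical stand-in for the cached object.
type pod struct {
	Namespace string
	Name      string
}

// indexFunc mirrors the idea behind an index function: map an object to the
// index values it should be filed under.
type indexFunc func(p pod) []string

// indexedStore is a toy cache that keeps one secondary index next to the
// primary key -> object map.
type indexedStore struct {
	items   map[string]pod
	index   map[string]map[string]struct{} // index value -> set of keys
	indexBy indexFunc
}

func newIndexedStore(fn indexFunc) *indexedStore {
	return &indexedStore{
		items:   map[string]pod{},
		index:   map[string]map[string]struct{}{},
		indexBy: fn,
	}
}

// add stores the pod under its namespace/name key and files that key under
// every value returned by the index function.
func (s *indexedStore) add(p pod) {
	key := p.Namespace + "/" + p.Name
	s.items[key] = p
	for _, v := range s.indexBy(p) {
		if s.index[v] == nil {
			s.index[v] = map[string]struct{}{}
		}
		s.index[v][key] = struct{}{}
	}
}

// byIndex returns the sorted keys filed under the given index value, without
// scanning every item in the store.
func (s *indexedStore) byIndex(value string) []string {
	keys := make([]string, 0, len(s.index[value]))
	for k := range s.index[value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
```

Indexing by namespace, as in this sketch, lets a lookup such as "all pods in kube-system" avoid a full pass over the cache.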
Now the controller must manage the events that happen to the pods it's watching. To do that, SharedInformer provides us with the AddEventHandler function. From the first blog post, you know that the controller passes these events to the Workqueue.
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
Events in the Workqueue are represented by their keys, which are constructed in the format pod_namespace/pod_name. In the case of pod deletion, the object handed to DeleteFunc may be a DeletedFinalStateUnknown placeholder rather than the pod itself, so we build the key with DeletionHandlingMetaNamespaceKeyFunc, which checks for that case before we enqueue the key. The DeletedFinalStateUnknown state means that the pod has been deleted but that the watch deletion event was missed, so the controller didn't react to it at the time.
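The key handling described above can be sketched with the standard library alone. This is an illustration of the idea, not the client-go implementation: `objectKey`, `splitKey`, `podMeta`, `tombstone`, and `deletionKey` are hypothetical names, with `tombstone` standing in for DeletedFinalStateUnknown.

```go
package main

import (
	"fmt"
	"strings"
)

// objectKey builds a key in the namespace/name format. An object without a
// namespace is keyed by its name alone.
func objectKey(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

// splitKey is the inverse of objectKey.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// podMeta is a hypothetical stand-in for the metadata of a pod.
type podMeta struct {
	Namespace string
	Name      string
}

// tombstone is a hypothetical stand-in for DeletedFinalStateUnknown: a
// placeholder that records the key of an object whose delete event was missed.
type tombstone struct {
	Key string
	Obj interface{}
}

// deletionKey returns the recorded key for a tombstone and computes the key
// from metadata for a regular object.
func deletionKey(obj interface{}) (string, error) {
	switch o := obj.(type) {
	case tombstone:
		return o.Key, nil
	case podMeta:
		return objectKey(o.Namespace, o.Name), nil
	}
	return "", fmt.Errorf("cannot build a key for %T", obj)
}
```

Enqueuing plain string keys rather than objects keeps the queue small and means the worker always reads the latest state from the cache when it processes the key.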
Starting the controller
We construct the controller using the Workqueue and SharedInformer above. Now, we will start the controller by calling the Run() function. (The comments give you an overall idea of how the function works.)
// Run will start the controller.
// StopCh channel is used to send interrupt signal to stop it.
func (c *Controller) Run(stopCh <-chan struct{}) {
// don't let panics crash the process
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
c.logger.Info("Starting kubewatch controller")
go c.informer.Run(stopCh)
// wait for the caches to synchronize before starting the worker
if !cache.WaitForCacheSync(stopCh, c.HasSynced) {
utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
c.logger.Info("Kubewatch controller synced and ready")
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
wait.Until(c.runWorker, time.Second, stopCh)
}
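The last line of Run() relies on the worker being restarted after a pause until the stop channel is closed. As a rough, stdlib-only approximation of that behaviour, the loop can be sketched as follows. The `until` function here is a hypothetical helper written for this post, not the library function itself, which handles more cases.

```go
package main

import "time"

// until calls f, waits for period, and repeats until stopCh is closed.
// It is a simplified approximation of the re-kick loop the controller uses.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for a stop signal before each run, so a closed channel
		// always wins over starting new work.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		// Sleep for the period, but wake up early if asked to stop.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}
```

In a long-running program, the stop channel would typically be closed when the process receives a termination signal, which ends both the informer and this loop.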
At this step, the SharedInformer starts watching for pods in the cluster and sends their keys to the Workqueue. Next, we must define how the workers pop keys off the queue and process them. The main idea here is to implement the key's life-cycle management, which is explained in the first blog post.
func (c *Controller) runWorker() {
// processNextItem will automatically wait until there's work available
for c.processNextItem() {
// continue looping
}
}
// processNextItem deals with one key off the queue. It returns false
// when it's time to quit.
func (c *Controller) processNextItem() bool {
// pull the next work item from queue. It should be a key we use to lookup
// something in a cache
key, quit := c.queue.Get()
if quit {
return false
}
// you always have to indicate to the queue that you've completed a piece of
// work
defer c.queue.Done(key)
// do your work on the key.
err := c.processItem(key.(string))
if err == nil {
// No error, tell the queue to stop tracking history
c.queue.Forget(key)
} else if c.queue.NumRequeues(key) < maxRetries {
c.logger.Errorf("Error processing %s (will retry): %v", key, err)
// requeue the item to work on later
c.queue.AddRateLimited(key)
} else {
// err != nil and too many retries
c.logger.Errorf("Error processing %s (giving up): %v", key, err)
c.queue.Forget(key)
utilruntime.HandleError(err)
}
return true
}
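The three-way branch in processNextItem (success, retry, give up) can be exercised without a cluster. The sketch below uses a toy, single-goroutine queue. `retryQueue`, `drain`, and `errTransient` are hypothetical names, and `maxRequeues = 3` is an assumed value, since the excerpt above does not show how maxRetries is defined. Unlike the real Workqueue, this toy does not block, delay retries, or deduplicate keys.

```go
package main

import "errors"

// maxRequeues plays the role of maxRetries; the value is an assumption.
const maxRequeues = 3

// errTransient is a hypothetical error used to simulate a failing handler.
var errTransient = errors.New("transient failure")

// retryQueue is a toy stand-in for the rate-limited work queue.
type retryQueue struct {
	items    []string
	requeues map[string]int
}

func newRetryQueue() *retryQueue {
	return &retryQueue{requeues: map[string]int{}}
}

func (q *retryQueue) add(key string) { q.items = append(q.items, key) }

// addRateLimited counts the retry and re-adds the key. The real queue would
// also delay the retry with a backoff.
func (q *retryQueue) addRateLimited(key string) {
	q.requeues[key]++
	q.add(key)
}

func (q *retryQueue) numRequeues(key string) int { return q.requeues[key] }

// forget clears the retry history of a key.
func (q *retryQueue) forget(key string) { delete(q.requeues, key) }

// get pops the next key and reports false when the queue is empty.
func (q *retryQueue) get() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key := q.items[0]
	q.items = q.items[1:]
	return key, true
}

// drain processes keys until the queue is empty and returns the keys it
// gave up on after too many requeues.
func drain(q *retryQueue, process func(key string) error) (gaveUp []string) {
	for {
		key, ok := q.get()
		if !ok {
			return gaveUp
		}
		err := process(key)
		switch {
		case err == nil:
			q.forget(key) // success: stop tracking history
		case q.numRequeues(key) < maxRequeues:
			q.addRateLimited(key) // failure: try again later
		default:
			q.forget(key) // too many failures: give up
			gaveUp = append(gaveUp, key)
		}
	}
}
```

With this logic, a key that always fails is processed once plus maxRequeues more times before the controller gives up on it.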
Connecting the controller to Slack
As you can see in the prior step, the processItem(key) function is where you define how to react to a pod event. Because our controller wants to send notifications to Slack, we connect Slack at this step and prepare the message to be sent.
func (c *Controller) processItem(key string) error {
c.logger.Infof("Processing change to Pod %s", key)
obj, exists, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
}
if !exists {
c.eventHandler.ObjectDeleted(obj)
return nil
}
c.eventHandler.ObjectCreated(obj)
return nil
}
The controller looks up the object in the cache using its key. If the object doesn't exist in the cache, this is a deletion event and the controller calls ObjectDeleted, which ends up in notifySlack(s, obj, "deleted"). Otherwise, this is a creation event and the controller calls ObjectCreated, which ends up in notifySlack(s, obj, "created"). The eventHandler then prepares a proper message and sends it to Slack.
func (s *Slack) ObjectCreated(obj interface{}) {
notifySlack(s, obj, "created")
}
func (s *Slack) ObjectDeleted(obj interface{}) {
notifySlack(s, obj, "deleted")
}
func notifySlack(s *Slack, obj interface{}, action string) {
e := kbEvent.New(obj, action)
api := slack.New(s.Token)
params := slack.PostMessageParameters{}
attachment := prepareSlackAttachment(e)
params.Attachments = []slack.Attachment{attachment}
params.AsUser = true
channelID, timestamp, err := api.PostMessage(s.Channel, "", params)
if err != nil {
log.Printf("%s\n", err)
return
}
log.Printf("Message successfully sent to channel %s at %s", channelID, timestamp)
}
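Going back to processItem, the lookup-and-dispatch decision can be tried out in isolation. In this sketch, a plain map stands in for the informer cache and `eventSink` stands in for the Slack handler. Both are hypothetical, as is `handleKey`. The sketch passes the key to the sink rather than the looked-up object, because a key that is missing from the cache has no object to pass along.

```go
package main

// eventSink is a hypothetical handler that records what it was told.
type eventSink struct {
	log []string
}

func (s *eventSink) created(key string) { s.log = append(s.log, "created "+key) }
func (s *eventSink) deleted(key string) { s.log = append(s.log, "deleted "+key) }

// handleKey mirrors the branch in processItem: a key that is still present
// in the store is reported as created, a missing key as deleted.
func handleKey(store map[string]struct{}, sink *eventSink, key string) {
	if _, exists := store[key]; !exists {
		sink.deleted(key)
		return
	}
	sink.created(key)
}
```

Note that the decision depends on the state of the cache at processing time, not on which event originally enqueued the key.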
The Slack connection is authenticated via a token, which is pre-configured in the kubewatch.yaml file or by executing the kubewatch config command. Kubewatch also supports loading the token from a Kubernetes Secret object.
Okay, so now we have built all pieces of the controller. Let's try to run it on Minikube. You can download the latest version of Kubewatch on the Kubewatch release page.
$ wget https://github.com/skippbox/kubewatch/releases/download/v0.0.3/kubewatch.yaml
# Note: update the Slack channel and Slack token values in the ConfigMap object.
# Then deploy Kubewatch.
$ kubectl create -f kubewatch.yaml
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
kubewatch 1/1 Running 0 2m
Checking the configured slack channel, you will see an output similar to the following image:

So far I have given you a quick tour of Kubewatch and how we can develop a custom controller to build a simple notification system for Kubernetes. We can add more controllers for other Kubernetes resources such as deployment, service, configmap, and so on, into Kubewatch.
Give Kubewatch a go and look at its source code for more details. I hope this post helps you to start writing your own controller. Happy hacking!