当前位置 博文首页 > x_i_y_u_e的专栏:k8s informer使用示例

    x_i_y_u_e的专栏:k8s informer使用示例

    作者:x_i_y_u_e 时间:2021-08-04 15:00

    package main
    
    import (
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/clientcmd"
    	"k8s.io/kubernetes/staging/src/k8s.io/client-go/informers"
    	"k8s.io/api/core/v1"
    	"fmt"
    	"time"
    //	"k8s.io/apimachinery/pkg/labels"
    	coreinformers "k8s.io/client-go/informers/core/v1"
    )
    
    // controller consumes objects from an in-memory queue and processes them
    // one at a time.
    type controller struct {
    	// queue carries the objects to process; pop receives from it.
    	queue chan interface{}
    }
    
    // pop blocks until an element is available on the queue and returns it.
    func (c *controller) pop() interface{} {
    	return <-c.queue
    }
    
    // run is the worker loop: it repeatedly takes the next element off the
    // queue and hands it to process. It never returns.
    func (c *controller) run() {
    	for {
    		c.process(c.pop())
    	}
    }
    
    // process handles a single element taken from the queue. The element is
    // expected to be a *v1.Pod; its namespace and name are printed. Anything
    // else (including a nil pod pointer) is reported and skipped instead of
    // being dereferenced.
    func (c *controller) process(obj interface{}) {
    	pod, ok := obj.(*v1.Pod)
    	if !ok || pod == nil {
    		// A failed assertion leaves pod nil; reading its fields would panic
    		// and take the whole worker loop down.
    		fmt.Printf("process: skipping unexpected object of type %T\n", obj)
    		return
    	}
    	fmt.Printf("process pod: %s_%s \n", pod.Namespace, pod.Name)
    }
    
    // initInformer builds a Kubernetes client from the admin kubeconfig at
    // /var/run/kubernetes/admin.kubeconfig and returns a pod informer obtained
    // from a shared informer factory. Any error while loading the config or
    // creating the client panics via ExceptNilErr. The informer is only
    // created here; the caller is responsible for running it.
    func initInformer() coreinformers.PodInformer {
    	// Load the kubeconfig file and turn it into a REST client configuration.
    	rawCfg, err := clientcmd.LoadFromFile("/var/run/kubernetes/admin.kubeconfig")
    	ExceptNilErr(err)

    	overrides := &clientcmd.ConfigOverrides{}
    	restCfg, err := clientcmd.NewDefaultClientConfig(*rawCfg, overrides).ClientConfig()
    	ExceptNilErr(err)

    	// Build the clientset used by the informer factory.
    	clientset, err := kubernetes.NewForConfig(restCfg)
    	ExceptNilErr(err)

    	// Shared informer factory, created with a resync argument of 0.
    	factory := informers.NewSharedInformerFactory(clientset, 0)

    	// Hand back the core/v1 pod informer from the factory.
    	return factory.Core().V1().Pods()
    }
    
    // main wires a pod informer to the controller's queue, starts the informer
    // and the controller worker in their own goroutines, and lets them run for
    // ten minutes before the program exits.
    func main() {
    	// Closing stopCh when main returns gives the informer goroutine a
    	// shutdown signal instead of leaving it with no way to be stopped.
    	stopCh := make(chan struct{})
    	defer close(stopCh)

    	podInformer := initInformer()

    	c := &controller{
    		queue: make(chan interface{}, 10),
    	}

    	// Fetch the shared informer once; the handler is registered before Run
    	// is started.
    	informer := podInformer.Informer()
    	informer.AddEventHandler(&QueueEventHandler{c: c})
    	go informer.Run(stopCh)
    	go c.run()

    	time.Sleep(time.Minute * 10)
    }
    
    // QueueEventHandler is the event handler registered on the pod informer in
    // main; it forwards events to a controller's queue.
    type QueueEventHandler struct {
    	// c is the controller whose queue receives the objects.
    	c *controller
    }
    
    // OnAdd pushes obj onto the controller's queue. The send blocks while the
    // queue's buffer is full.
    func (h *QueueEventHandler) OnAdd(obj interface{}) {
    	ctrl := h.c
    	ctrl.queue <- obj
    }
    
    // OnDelete is currently a no-op: the enqueue below is commented out, so obj
    // is ignored. Presumably invoked by the informer for delete events —
    // confirm against the client-go handler interface.
    func (h *QueueEventHandler) OnDelete(obj interface{}) {
    //	h.c.queue <- obj
    }
    
    // OnUpdate is currently a no-op: the enqueue below is commented out, so
    // both old and new are ignored. Presumably invoked by the informer for
    // update events — confirm against the client-go handler interface.
    func (h *QueueEventHandler) OnUpdate(old, new interface{}) {
    //	h.c.queue <- new
    }
    
    // ExceptNilErr panics with err when err is non-nil and does nothing
    // otherwise.
    func ExceptNilErr(err error) {
    	if err == nil {
    		return
    	}
    	panic(err)
    }
    
    

    controller的一般模式

    cs