概述
文章目录
- 1. 介绍
- 2. 运行原理
- 3. 示例
1. 介绍
前面我们在使用clientset的时候了解到我们可以使用clientset来获取所有的原生资源对象,那么如果我们想要去一直获取集群的资源对象数据呢?岂不是需要用一个轮询去不断查询List()操作?这显然是不合理的,实际上除了常用的CRUD操作之外,我们还可以进行watch操作,可以监听资源对象的增、删、改、查操作,这样我们就可以根据自己的业务逻辑去处理这些数据了。 watch通过一个event接口监听对象的所有变化(增加、删除、更新):
// 任何知道如何监视和报告更改的东西都可以实现该接口。
type Interface interface {
// Stops watching. Will close the channel returned by ResultChan(). Releases
// any resources used by the watch.
// 停止监视。将关闭ResultChan()返回的通道。释放watch 使用的任何资源。
Stop()
// Returns a chan which will receive all the events. If an error occurs
// or Stop() is called, this channel will be closed, in which case the
// watch should be completely cleaned up.
// 返回将接收所有事件的chan。如果发生错误或调用Stop(),此通道将被关闭,在这种情况下,应彻底清理watch。
ResultChan() <-chan Event
}
watch接口的ResultChan方法会返回如下几种事件:
// EventType defines the possible types of events.
type EventType string
const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Bookmark EventType = "BOOKMARK"
Error EventType = "ERROR"
DefaultChanSize int32 = 100
)
// Event represents a single event to a watched resource.
// +k8s:deepcopy-gen=true
type Event struct {
Type EventType
// Object is:
// * If Type is Added or Modified: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
// * If Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get REPEAT event
// nor miss any events.
// * If Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
Object runtime.Object
}
这个接口虽然我们可以直接去使用,但是实际上并不建议这样不使用,因为往往由于集群中的资源较多,我们需要自己在客户端去维护一套缓存,而这个维护成本也是非常大的,为此client-go也提供了自己的实现机制,那就是Informer。Informers是这个事件接口和带索引查询功能的内存缓存的组合,这样也是目前最常用的用法。Informers第一次被调用的时候会首先在客户端调用List来获取全量的对象集合,然后通过watch来获取增量的对象更新缓存。
2. 运行原理
一个控制器每次需要获取对象的时候都要访问APIServer,这会给系统带来很高的负载,Informers的内存缓存就是来解决这个问题的,此外Informers还可以几乎实时监控对象的变化,而不需要轮询请求,这样就可以保证客户端的缓存数据和服务端的数据一致,就可以大大降低APIServer的压力.
如上图展示了Informer的基本处理流程: - 以events事件的方式从APIServer获取数据 - 提供一个类似客户端的List接口,从内存缓存中get和List对象 - 为添加、删除、更新注册事件处理程序 此外Informers也有错误处理方式,当长期运行的Watch连接时,它们会尝试使用另一个watch请求来恢复连接,在不丢失任何事件的情况下恢复事件流。如果中断的时间较长,而且APIServer丢失了事件(etcd在新的watch请求成功前从数据库中清除了这些事件),那么informers就会重新List全量数据。 而且在重新List全量操作的时候还可以配置一个重新同步的周期参数,用于协调内存缓存数据和业务逻辑的数据一致性,每次过了该周期后,注册的事件处理程序就被所有的对象调用,通常这个周期参数以分为单位,比如10分钟或者30分钟。 informers的这些高级特性以及超级的鲁棒性,都足以让我们不去直接使用客户端的Watch()方法来处理自己的业务逻辑,而且在kubernetes中也有很多地方都有使用到informers。但是在使用informers的时候,通常每个GroupVersionResource(GVR)只实例化一个Informers,,但是有时候我们在一个应用中往往有使用多种资源对象的需求,这个时候为了方便共享Informers,我们可以通过使用共享Informer工厂来实例化一个Informer。 共享Informer工厂允许我们在应用中为同一个资源共享Informer,也就是不同的控制器循环可以使用相同的watch连接到后台的APIServer,例如,Kube-controller-mananger中的控制器数据量就非常多,但是队遭遇每个资源对象(比如Pod),在这个进程中只有一个Informer。
3. 示例
首先我们创建一个ClientSet对象,然后使用ClientSet来创建一个共享Informer工厂,Informer是通过informer-gen这个代码生产器工具自动生成的,位于k8s.io/client.go/informers中。 这里我们来创建一个用于获取Deployment的共享Informer,代码如下所示:
package main
import (
"flag"
"fmt"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"path/filepath"
"time"
)
func main() {
var err error
var config *rest.Config
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "absolute path to kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "","absolute path to kubeconfig file")
}
flag.Parse()
// 使用sa创建集群配置(InCluster模式),需要去配置对应的RBAC权限,默认的是default->无权限获取deployments的List权限
if config, err = rest.InClusterConfig(); err != nil {
//使用kubeconfig文件创建集群配置
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
panic(err.Error())
}
}
//创建clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 初始化informer factory(为了测试方便这里设置每30s重新List一次)
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second * 30)
// 对Deployment监听
deployInformer := informerFactory.Apps().V1().Deployments()
// 创建Informer(相当于注册到工厂中去,这样下面启动的时候就会失去List && Watch对应的资源)
informer := deployInformer.Informer()
// 创建Lister
deployLister := deployInformer.Lister()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
stopper := make(chan struct{})
defer close(stopper)
//启动informer, Lister && Watch
informerFactory.Start(stopper)
// 等待所有启动的Informer的缓存被同步
informerFactory.WaitForCacheSync(stopper)
//从本地缓存中获取default中所有的deployment列表
deployments, err := deployLister.Deployments("default").List(labels.Everything())
if err != nil {
panic(err)
}
for idx, deployment := range deployments {
fmt.Printf("%d->%sn", idx+1, deployment.Name)
}
<-stopper
}
func onDelete(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Println("delete a deployment:", deploy.Name)
}
func onUpdate(old, new interface{}) {
oldDeploy := old.(*v1.Deployment)
newDeploy := new.(*v1.Deployment)
fmt.Println("update a deployment:", oldDeploy.Name,newDeploy.Name)
}
func onAdd(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Println("Add a deployment:", deploy.Name)
}
上面的代码运行可以获得default命名空间之下的所有Deployment信息以及整个集群的Deployment的数据
这是因为我们首先通过Informer注册了事件处理程序,这样我们启动Informer的时候首先会将集群的全量Deployment数据同步到本地的缓存中,会触发AddFunc这个回调函数,然后我们又在下面的Lister()来获取default命名下面的所有Deployment数据,这个时候的数据是从本地的缓存中获取的,所以就看到了上面的结果,由于我们还配置了每30s重新全量List一次,所以正常每30s我们也可以看到所有Deployment数据出现在UpdateFunc回调函数下面,我们也可以尝试去删除一个Deployment,同样也会出现对于的DeleteFunc下面的事件
最后
以上就是勤恳刺猬为你收集整理的client-go——informer 原理(五)1. 介绍2. 运行原理3. 示例的全部内容,希望文章能够帮你解决client-go——informer 原理(五)1. 介绍2. 运行原理3. 示例所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复