Overview
Goal:
Learn how to develop a Kubernetes operator and understand how operators work.
1 Installation
1.1 Install the Go development environment
cd /etc
cp -rf yum.repos.d yum.repos.d_bak
cd /etc/yum.repos.d/
rm -rf ./*
wget http://mirrors.aliyun.com/repo/Centos-7.repo
mv Centos-7.repo CentOS-Base.repo
yum install epel-release -y
yum install golang -y
1.2 Set the Go environment variables
mkdir -p $HOME/go
mkdir -p $HOME/go/bin
mkdir -p $HOME/go/src
mkdir -p $HOME/go/pkg
cat >>$HOME/.bash_profile<<EOF
export GOROOT=/usr/lib/golang
export GOPATH=$HOME/go
export GO111MODULE=on
export GOPROXY=https://goproxy.io
EOF
source $HOME/.bash_profile
Then edit the profile:
vim $HOME/.bash_profile
and add:
export PATH=$PATH:/usr/lib/golang/bin:$HOME/go/bin
source $HOME/.bash_profile
Note:
If epel-release cannot be installed, run:
yum clean all
yum makecache
yum update
1.3 Install Delve
mkdir -p $GOPATH/src/github.com/go-delve/delve
git clone https://github.com/go-delve/delve.git $GOPATH/src/github.com/go-delve/delve
cd $GOPATH/src/github.com/go-delve/delve
make install
1.4 Install operator-sdk
mkdir -p $GOPATH/src/github.com/operator-framework
cd $GOPATH/src/github.com/operator-framework
git clone https://github.com/operator-framework/operator-sdk
cd operator-sdk
git checkout v0.12.0
make dep
make install
1.5 Verify that Go is installed
[root@localhost ch1_dir]# go version
go version go1.13.3 linux/amd64
[root@localhost ch1_dir]# whereis go
go: /usr/bin/go
1.6 View the Go environment variables
go env
1.7 Verify that the Go environment works
cd $GOPATH/src
vim main.go
Write the following content:
package main
import "fmt"
func main() {
fmt.Println("Hello, World!")
}
Run:
go run ./main.go
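1.8 About the environment variables
GOROOT: the Go installation directory.
GOPATH: the workspace, i.e. the directory that holds packages.
It contains three subdirectories:
src: source code (.go files and so on); the working path for commands such as go run and go install, which are run from under this directory
pkg: intermediate files produced by compilation, such as the .a archives generated when Go compiles a package
bin: executables produced by compilation
The src directory under GOPATH is the main directory for the development that follows. All source code lives there, and the usual practice is one directory per project.
For example, $GOPATH/src/mymath is the mymath package or executable application.
Reference: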
https://www.cnblogs.com/enduo/p/9078313.html
2 Usage
2.1 Create the operator
operator-sdk new dozer-operator
2.2 Generate the APIs
operator-sdk add api --api-version=timers.estimer.com/v1alpha1 --kind=Timer
operator-sdk add api --api-version=timers.estimer.com/v1alpha1 --kind=AuditRecord
operator-sdk add api --api-version=timers.estimer.com/v1alpha1 --kind=Action
operator-sdk add api --api-version=timers.estimer.com/v1alpha1 --kind=Execution
2.2 Generate the controller
operator-sdk add controller --api-version=timers.estimer.com/v1alpha1 --kind=Timer
2.3 Edit each types.go file
For example, edit:
pkg/apis/timers/v1alpha1/timer_types.go
and change the following:
......
type TimerSpec struct {
Name map[string]interface{} `json:"name"`
Labels []map[string]interface{} `json:"labels"`
}
// TimerStatus defines the observed state of Timer
// +k8s:openapi-gen=true
type TimerStatus struct {
Nodes []string `json:"nodes"`
}
......
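Adjust the fields to your actual needs.
Note:
Trigger map[string]interface{} `json:"trigger"`
The `json:"trigger"` tag maps the trigger key in the JSON document to the Trigger field.
// Three tag options control encoding and decoding:
// -: never encode or decode this field
// omitempty: leave the field out of the output when it is empty (its zero value), e.g. false, 0, nil, or a zero-length array, map, slice, or string
// FieldName: use this name as the JSON key
type StudentWithOption struct {
StudentId string // no tag: the field name itself is used as the key
StudentName string `json:"sname"` // encoded and decoded as `sname` instead of the field name
StudentClass string `json:"class,omitempty"` // encoded as `class`; omitted when the value in the struct is empty
StudentTeacher string `json:"-"` // always skipped; without the tag it would be included because it is exported (starts with an uppercase letter)
}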
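The tag options above can be checked with a short program. This is a minimal sketch that uses only the standard encoding/json package; the Student type and the encode helper are names made up for this illustration and simply mirror StudentWithOption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Student mirrors StudentWithOption; the name is hypothetical.
type Student struct {
	StudentId      string                          // no tag: key is "StudentId"
	StudentName    string `json:"sname"`           // key is "sname"
	StudentClass   string `json:"class,omitempty"` // dropped when empty
	StudentTeacher string `json:"-"`               // never encoded
}

// encode marshals a Student and returns the JSON text.
func encode(s Student) string {
	b, err := json.Marshal(s)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// StudentClass is empty here, so "class" should not appear.
	fmt.Println(encode(Student{StudentId: "1", StudentName: "Ann", StudentTeacher: "Bob"}))
	// StudentClass is set here, so "class" should appear.
	fmt.Println(encode(Student{StudentId: "2", StudentName: "Eve", StudentClass: "3A"}))
}
```

In both outputs StudentTeacher is absent, and class appears only in the second one.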
Reference:
https://www.cnblogs.com/fengxm/p/9917686.html
2.4 Regenerate the code
Run:
operator-sdk generate k8s
2.5 Write the controller logic
1) Modify the reconcile logic
This lives in
pkg/controller/timer/timer_controller.go, in the
func (r *ReconcileTimer) Reconcile(request reconcile.Request) (reconcile.Result, error)
method. Implement your own reconcile logic there.
2) Modify the event filtering
This lives in
pkg/controller/timer/timer_controller.go, in
func add(mgr manager.Manager, r reconcile.Reconciler)
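2.6 Debug the operator
In a pod that has the Go environment above installed, open one terminal and run:
operator-sdk up local --namespace timed-task --enable-delve --verbose
In a pod that has the Go environment above installed, open another terminal and run:
dlv connect 0.0.0.0:2345
Set breakpoints:
b main.main
b timer_controller.go:41
b timer_controller.go:86
Note:
Replace 41 and 86 with the line numbers where you actually want to break.
Note:
Before running operator-sdk up local --namespace timed-task --enable-delve --verbose,
make sure ports 2345 and 8333 are not in use.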
2.7 Build the image
operator-sdk build qingyuanluo/dozer-operator:v0.0.1
sed -i 's|REPLACE_IMAGE|qingyuanluo/dozer-operator:v0.0.1|g' deploy/operator.yaml
docker push qingyuanluo/dozer-operator:v0.0.1
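3 How it works
3.1 What an operator does and what it consists of
Purpose: simplify the management of complex stateful applications. An operator extends the Kubernetes API with CRDs to create, manage, and configure application instances automatically.
A CRD only declares the resource; the cluster still needs a controller that watches its state and reconciles resources to match the declaration.
The operator supplies this by extending Kubernetes with a Custom Controller that observes the application and performs custom tasks based on its actual state.
Deployment: an operator is deployed to Kubernetes as a Deployment.
Building an operator:
To create a custom operator, we need the following:
Custom Resource (CR) spec: defines the application object we want to observe, plus the API defined for the CR
Custom Controller: watches the CR
Custom code: decides how the Custom Controller reconciles the CR
Operator: manages the Custom Controller
Deployment: defines the operator and the custom resources
Reference: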
http://dockone.io/article/8769
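3.2 The Operator Framework
3.2.1) What the Operator Framework is for
It is a toolkit for developing operators quickly. The framework has two main parts:
Operator SDK: builds an operator application according to your own requirements
Operator Lifecycle Manager (OLM): installs and manages operators
3.2.2) Operator Framework development workflow
1) Create a new operator project with the SDK
2) Define new resource APIs by adding custom resource definitions (CRDs)
3) Specify the resources to watch using the SDK API
4) Define the operator's reconcile logic
5) Build the operator with the Operator SDK and generate its deployment manifests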
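3.3 Resources an operator watches
A controller watches two kinds of resources:
primary resources and secondary resources.
For a ReplicaSet, the primary resource is the ReplicaSet itself, which specifies the
Docker image to run and the number of Pod replicas.
The secondary resource is the Pod. When the ReplicaSet definition changes (for example the image version or the Pod count),
or a Pod changes (for example a Pod is deleted), the controller reconciles the cluster state by rolling out the new version
or by scaling the number of Pods up or down.
For a DaemonSet, the primary resource is the DaemonSet itself, and the secondary resources are Pods and Nodes. The difference is that the controller also watches Node changes in the cluster, so that it can add or remove Pods as the cluster grows or shrinks.
Identifying an operator's primary and secondary resources clearly is essential, because they determine how its controller behaves.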
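3.4 How an operator works
An operator registers a controller for its custom resource types with a controller manager, watches the corresponding resources through list and watch, and then reconciles at each stage of the resource lifecycle. That reconcile handling is the core of how an operator implements a stateful application, and it naturally differs from one application to another.
References: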
https://github.com/operator-framework/operator-sdk/blob/master/doc/user/install-operator-sdk.md
https://feisky.gitbooks.io/kubernetes/apps/operator.html
http://dockone.io/article/8769
https://github.com/operator-framework/operator-sdk
http://dockone.io/article/8733
https://www.jianshu.com/p/628aac3e6758
http://www.imooc.com/article/294186
https://www.cnblogs.com/liabio/p/11723809.html
4 sample-controller source code analysis
1) Main entry point
sample-controller/main.go
The code is as follows:
func main() {
klog.InitFlags(nil)
flag.Parse()
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
exampleClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building example clientset: %s", err.Error())
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
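Analysis:
The overall flow is:
s1) Read the kubeconfig and construct the Kubernetes clients used for event watching
s2) Two clients are created: one for built-in resource events and one for Foo events
s3) Construct the corresponding informers from the clients
s4) Initialize the custom Controller from the clients and informers, watching for changes to Deployment and Foo resources
s5) Start the Controller
Next, look at how the Controller handles events.
2) The Controller definition
The code is in:
sample-controller/controller.go
The code is as follows: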
// Controller is the controller implementation for Foo resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
// sampleclientset is a clientset for our own API group
sampleclientset clientset.Interface
deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
foosLister listers.FooLister
foosSynced cache.InformerSynced
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
}
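Analysis:
The key members of the Controller are the two Listers (appslisters.DeploymentLister and listers.FooLister).
Both are initialized from the arguments that main passes in.
In addition, to buffer event handling, a queue holds events temporarily; that member is workqueue.RateLimitingInterface.
record.EventRecorder is used to record events.
3) How the Controller is constructed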
// NewController returns a new sample controller
func NewController(
kubeclientset kubernetes.Interface,
sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
fooInformer informers.FooInformer) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
klog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
recorder: recorder,
}
klog.Info("Setting up event handlers")
// Set up an event handler for when Foo resources change
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
// Set up an event handler for when Deployment resources change. This
// handler will lookup the owner of the given Deployment, and if it is
// owned by a Foo resource will enqueue that Foo resource for
// processing. This way, we don't need to implement custom logic for
// handling Deployment resources. More info on this pattern:
// https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Deployment will always have different RVs.
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
return controller
}
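Analysis:
Initializing the Controller goes roughly as follows:
s1) Add the sample-controller type information (Foo) to the default Kubernetes Scheme so that events for it can be recorded
s2) Create an event recorder based on the new Scheme to record events from "sample-controller"
s3) Initialize the Controller from the function arguments and the recorder just constructed
s4) Register event handlers for Foo resource changes (Add and Update both go through enqueueFoo)
s5) Register event handlers for Deployment resource changes (Add, Update, and Delete all go through handleObject)
s6) Return the initialized Controller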
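The handler registration for Deployments can be sketched without client-go. This is only an illustration under stated assumptions: resource, handlerFuncs, newDeploymentHandlers, and replay are hypothetical names that imitate the shape of cache.ResourceEventHandlerFuncs, and the enqueue callback stands in for handleObject.

```go
package main

import "fmt"

// resource is a hypothetical, reduced stand-in for a Kubernetes object.
type resource struct {
	Key             string
	ResourceVersion string
}

// handlerFuncs imitates the shape of cache.ResourceEventHandlerFuncs.
type handlerFuncs struct {
	AddFunc    func(obj resource)
	UpdateFunc func(oldObj, newObj resource)
	DeleteFunc func(obj resource)
}

// newDeploymentHandlers wires all three callbacks to enqueue and skips
// updates whose ResourceVersion did not change (periodic resyncs).
func newDeploymentHandlers(enqueue func(key string)) handlerFuncs {
	return handlerFuncs{
		AddFunc: func(obj resource) { enqueue(obj.Key) },
		UpdateFunc: func(oldObj, newObj resource) {
			if oldObj.ResourceVersion == newObj.ResourceVersion {
				return // resync: nothing actually changed
			}
			enqueue(newObj.Key)
		},
		DeleteFunc: func(obj resource) { enqueue(obj.Key) },
	}
}

// replay feeds a fixed event sequence through the handlers and returns
// the keys that were enqueued, in order.
func replay() []string {
	var queued []string
	h := newDeploymentHandlers(func(key string) { queued = append(queued, key) })
	v1 := resource{Key: "default/web", ResourceVersion: "1"}
	v2 := resource{Key: "default/web", ResourceVersion: "2"}
	h.AddFunc(v1)
	h.UpdateFunc(v1, v1) // resync, skipped
	h.UpdateFunc(v1, v2) // real change, enqueued
	h.DeleteFunc(v2)
	return queued
}

func main() {
	fmt.Println(replay())
}
```

Of the four events replayed, only three enqueue a key, because the resync update carries the same ResourceVersion on both sides.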
Next, look at the implementations of enqueueFoo and handleObject.
4) The enqueueFoo implementation
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(key)
}
Analysis:
enqueueFoo converts a Foo resource into a namespace/name string and adds it to the queue.
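The namespace/name key format can be illustrated with two small helpers. They are simplified stand-ins written for this note, not the client-go functions: namespaceKey approximates what cache.MetaNamespaceKeyFunc produces and splitKey approximates cache.SplitMetaNamespaceKey, taking plain strings instead of objects.

```go
package main

import (
	"fmt"
	"strings"
)

// namespaceKey builds a queue key. Cluster-scoped objects (empty
// namespace) use the bare name.
func namespaceKey(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

// splitKey reverses namespaceKey and rejects keys with extra slashes.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	key := namespaceKey("default", "example-foo")
	ns, name, err := splitKey(key)
	fmt.Println(key, ns, name, err)
}
```

Queuing a string key rather than the object itself means the worker reads the latest version of the object from the lister when it finally processes the key.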
5) The handleObject implementation
// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}
klog.V(4).Infof("Processing object: %s", object.GetName())
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
// If this object is not owned by a Foo, we should not do anything more
// with it.
if ownerRef.Kind != "Foo" {
return
}
foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
return
}
c.enqueueFoo(foo)
return
}
}
Analysis:
handleObject accepts any resource that implements metav1.Object, but only acts on those whose owner is a Foo,
enqueuing that owning Foo as a namespace/name key.
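The owner lookup in handleObject can be sketched with plain structs. This is a simplified illustration, not the client-go code: ownerRef, object, controllerOf, and ownerKey are hypothetical names, and the real function additionally confirms through foosLister that the Foo still exists before enqueuing it.

```go
package main

import "fmt"

// ownerRef is a hypothetical, reduced form of metav1.OwnerReference.
type ownerRef struct {
	Kind       string
	Name       string
	Controller bool
}

// object is a hypothetical stand-in for any resource with metadata.
type object struct {
	Namespace string
	Name      string
	Owners    []ownerRef
}

// controllerOf returns the owner reference flagged as the controller,
// or nil when there is none (the role metav1.GetControllerOf plays).
func controllerOf(o object) *ownerRef {
	for i := range o.Owners {
		if o.Owners[i].Controller {
			return &o.Owners[i]
		}
	}
	return nil
}

// ownerKey returns the namespace/name key of the owning Foo. ok is
// false when the object is not controlled by a Foo.
func ownerKey(o object) (key string, ok bool) {
	ref := controllerOf(o)
	if ref == nil || ref.Kind != "Foo" {
		return "", false
	}
	return o.Namespace + "/" + ref.Name, true
}

func main() {
	dep := object{
		Namespace: "default",
		Name:      "web",
		Owners:    []ownerRef{{Kind: "Foo", Name: "example-foo", Controller: true}},
	}
	fmt.Println(ownerKey(dep))
}
```

The key that ends up on the queue names the Foo, not the Deployment, so every Deployment event funnels into the same Foo reconcile path.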
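Summary: the event-collection side is already fully set up when the Controller is constructed.
So how are the events in the queue processed?
That brings us to the definition of the Run function.
6) The Run method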
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
// Start the informer factories to begin populating the informer caches
klog.Info("Starting Foo controller")
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
klog.Info("Starting workers")
// Launch two workers to process Foo resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
return nil
}
Analysis:
Run executes roughly as follows:
Wait for the informer caches to finish syncing
Run runWorker concurrently to process the events in the queue
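The worker start-up in Run can be sketched with the standard library alone. In this illustration a buffered channel stands in for the workqueue and closing it stands in for ShutDown; runWorkers and processAll are made-up names, and the informer cache sync is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n goroutines that drain keys from queue and call
// handle on each. It returns once the queue is closed and every worker
// has finished its current item.
func runWorkers(n int, queue <-chan string, handle func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range queue {
				handle(key)
			}
		}()
	}
	wg.Wait()
}

// processAll queues the given keys, runs the workers, and returns how
// many keys were handled.
func processAll(workers int, keys []string) int {
	queue := make(chan string, len(keys))
	for _, k := range keys {
		queue <- k
	}
	close(queue) // stands in for workqueue.ShutDown

	var mu sync.Mutex
	handled := 0
	runWorkers(workers, queue, func(string) {
		mu.Lock()
		handled++
		mu.Unlock()
	})
	return handled
}

func main() {
	fmt.Println(processAll(2, []string{"default/a", "default/b", "default/c"}))
}
```

Unlike this sketch, the real Run blocks on stopCh rather than on queue exhaustion, so its workers keep running until the process is told to stop.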
7) runWorker
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
8) processNextWorkItem
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
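Analysis:
processNextWorkItem works roughly as follows:
Take the next item to process off the queue
Call syncHandler to process it; on error the key is put back with AddRateLimited, and on success Forget is called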
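The take, sync, and requeue cycle can be sketched as follows. keyQueue is a hypothetical single-goroutine stand-in for the rate-limited workqueue: it only models ordering and the fact that a key already pending is not added twice, and it has no back-off. drain and errTransient are likewise made up for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a made-up error used to simulate a failed sync.
var errTransient = errors.New("transient error")

// keyQueue keeps pending keys in order and ignores duplicates.
type keyQueue struct {
	items   []string
	pending map[string]bool
}

func newKeyQueue() *keyQueue {
	return &keyQueue{pending: map[string]bool{}}
}

// Add appends key unless it is already waiting in the queue.
func (q *keyQueue) Add(key string) {
	if q.pending[key] {
		return
	}
	q.pending[key] = true
	q.items = append(q.items, key)
}

// Get removes and returns the oldest key; ok is false when empty.
func (q *keyQueue) Get() (key string, ok bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key = q.items[0]
	q.items = q.items[1:]
	delete(q.pending, key)
	return key, true
}

// drain processes keys until the queue is empty, re-adding a key when
// syncFn fails. It returns the number of syncFn calls made.
func drain(q *keyQueue, syncFn func(string) error) int {
	calls := 0
	for {
		key, ok := q.Get()
		if !ok {
			return calls
		}
		calls++
		if err := syncFn(key); err != nil {
			q.Add(key) // the real code calls AddRateLimited here
		}
	}
}

func main() {
	q := newKeyQueue()
	q.Add("default/foo")
	q.Add("default/foo") // already pending, ignored
	failuresLeft := 2
	calls := drain(q, func(string) error {
		if failuresLeft > 0 {
			failuresLeft--
			return errTransient
		}
		return nil
	})
	fmt.Println("sync calls:", calls)
}
```

With two simulated failures the single key is synced three times in total, which mirrors how a transient error leads to a retry rather than a lost item.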
Now look at the syncHandler method.
9) The syncHandler method
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get the Foo resource with this namespace/name
foo, err := c.foosLister.Foos(namespace).Get(name)
if err != nil {
// The Foo resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
return nil
}
return err
}
deploymentName := foo.Spec.DeploymentName
if deploymentName == "" {
// We choose to absorb the error here as the worker would requeue the
// resource otherwise. Instead, the next time the resource is updated
// the resource will be queued again.
utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
return nil
}
// Get the deployment with the name specified in Foo.spec
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
}
// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and return error msg.
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
// If this number of the replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
}
// If an error occurs during Update, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
// Finally, we update the status block of the Foo resource to reflect the
// current state of the world
err = c.updateFooStatus(foo, deployment)
if err != nil {
return err
}
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
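Analysis:
The syncHandler logic is roughly:
s1) Fetch the foo resource by namespace/name
s2) Read the Deployment name from foo, then fetch the deployment resource (creating it if it does not exist)
s3) Update the deployment's Replicas from foo's Replicas (if they differ)
s4) Update the foo resource's status to the latest deployment status (in practice just AvailableReplicas)
From this we can see that the entity that actually implements foo is a deployment.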
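The decision logic in these steps can be reduced to a pure function. This is a simplified sketch, not the sample-controller code: fooSpec, deployment, and decide are hypothetical, replicas are plain integers rather than pointers, and the returned strings merely name the branch syncHandler would take.

```go
package main

import "fmt"

// fooSpec is a hypothetical, reduced form of the Foo spec.
type fooSpec struct {
	DeploymentName string
	Replicas       int32
}

// deployment is a hypothetical, reduced form of a Deployment.
type deployment struct {
	Name     string
	Replicas int32
	Owner    string // name of the controlling Foo, empty if none
}

// decide names the action needed to bring the cluster in line with the
// Foo spec: "create", "conflict", "update", or "noop".
func decide(fooName string, spec fooSpec, existing *deployment) string {
	switch {
	case existing == nil:
		return "create" // no Deployment yet
	case existing.Owner != fooName:
		return "conflict" // Deployment exists but is not controlled by this Foo
	case existing.Replicas != spec.Replicas:
		return "update" // replica count drifted from the spec
	default:
		return "noop"
	}
}

func main() {
	spec := fooSpec{DeploymentName: "web", Replicas: 3}
	fmt.Println(decide("example-foo", spec, nil))
	fmt.Println(decide("example-foo", spec, &deployment{Name: "web", Replicas: 1, Owner: "example-foo"}))
	fmt.Println(decide("example-foo", spec, &deployment{Name: "web", Replicas: 3, Owner: "example-foo"}))
}
```

Keeping the comparison separate from the API calls makes the reconcile rules easy to test without a cluster.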
10) How the deployment is built
// newDeployment creates a new Deployment for a Foo resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the Foo resource that 'owns' it.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
labels := map[string]string{
"app": "nginx",
"controller": foo.Name,
}
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: foo.Spec.DeploymentName,
Namespace: foo.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
},
},
Spec: appsv1.DeploymentSpec{
Replicas: foo.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
},
},
}
}
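Analysis:
The logic simply takes the namespace, name, deploymentName, and replicas of the foo resource
and creates an nginx deployment from them.
Note that OwnerReferences must be bound to the Foo type (Group, Version, Kind).
This has to match the collection side, because handleObject filters for Foo resources by the Kind value: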
if ownerRef.Kind != "Foo" {
return
}
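11) How is the custom Controller tied to the crd.yaml definition?
As we know, crd.yaml is what first tells Kubernetes the schema of our custom resource.
So how does it get tied to the Controller? The answer is in the pkg/apis directory.
pkg/apis defines the attributes of the custom resource. A quick look:
pkg/apis/samplecontroller/v1alpha1/register.go (handles the type Scheme)
The code is as follows: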
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
// SchemeBuilder initializes a scheme builder
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme is a global function that registers this API group & version to a scheme
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Foo{},
&FooList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
Analysis:
Compare this with the earlier CRD definition:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: foos.samplecontroller.k8s.io
spec:
  group: samplecontroller.k8s.io
  version: v1alpha1
  names:
    kind: Foo
    plural: foos
  scope: Namespaced
The controller and the CRD use the same group and version,
and the metadata name follows the <plural>.<group> convention.
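The naming convention is simple enough to check in code. The following is a small sketch with hypothetical names (crdInfo, expectedName, validName); it only compares strings and does not talk to a cluster.

```go
package main

import "fmt"

// crdInfo is a hypothetical holder for the three fields being compared.
type crdInfo struct {
	Name   string // metadata.name
	Group  string // spec.group
	Plural string // spec.names.plural
}

// expectedName builds the name the convention requires.
func expectedName(plural, group string) string {
	return plural + "." + group
}

// validName reports whether metadata.name matches <plural>.<group>.
func validName(c crdInfo) bool {
	return c.Name == expectedName(c.Plural, c.Group)
}

func main() {
	c := crdInfo{
		Name:   "foos.samplecontroller.k8s.io",
		Group:  "samplecontroller.k8s.io",
		Plural: "foos",
	}
	fmt.Println(validName(c))
}
```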
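In Kubernetes, once a CRD is created, create, read, update, and delete operations on that resource type are already supported.
Our Controller only watches the resource events it cares about and carries out the actual deploy, update, and remove actions.
Reference: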
https://blog.csdn.net/shida_csdn/article/details/87102801
13) What each directory does
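hack directory:
Calls the generate-groups.sh script from the https://github.com/kubernetes/code-generator project. The code under the code-generator project's cmd directory must be built into binaries with go install beforehand.
pkg directory:
Important; you write this yourself.
pkg/apis/samplecontroller is the apiGroup the CRD belongs to, and v1alpha1 is the apiVersion.
pkg/apis/samplecontroller/register.go: defines the global variables needed later.
pkg/apis/samplecontroller/v1alpha1/types.go: contains the complete definition of the CRD type Foo.
Foo is the standard Kubernetes object definition. Important: FooSpec is the structure we need to define for the Foo type;
FooList holds a set of Foo objects, since the apiserver's List endpoint returns a List object type;
FooStatus: the struct that describes the state of a Foo instance.
pkg/apis/samplecontroller/v1alpha1/register.go: adds our CRD type Foo to the Scheme through the addKnownTypes() function.
pkg/apis/samplecontroller/v1alpha1/doc.go: contains comments of the form +<tag-name>[=value]. These are the annotation-style comments Kubernetes uses for source generation, and the ones in doc.go apply package-wide.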
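14) Code generation
Once the files above under pkg are complete, run ./hack/update-codegen.sh to generate the Kubernetes code needed to manage the newly defined CRD type.
This generates the files under the clientset, informers, and listers directories, plus zz_generated.deepcopy.go under apis.
zz_generated.deepcopy.go contains the DeepCopy() methods for the structs defined in pkg/apis/samplecontroller/v1alpha1/types.go.
The other three directories, clientset, informers, and listers, hold the generated Kubernetes client libraries that the controller uses.
15) Writing the controller code
Next comes the controller code itself: it uses the client libraries generated above to access the apiserver, watches for changes to CRD resources, and triggers the corresponding actions, such as creating or deleting a Deployment.
When writing a custom controller (operator), you can use the client-go library that Kubernetes provides. The flow below, from the Kubernetes documentation, shows how client-go and a controller interact when a controller is developed with client-go:
The interaction flow: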
client-go side:
1) Reflector: List & Watch against the Kubernetes API
2) Reflector: Add Object to the Delta FIFO queue
3) Informer: Pop Object from the Delta FIFO queue
4) Informer: Add Object to the Indexer
5) Indexer: Store Object & Key in the thread-safe store
6) Informer: Dispatch Event Handler functions (send Object to the Custom Controller)
Custom controller side (holds an Informer reference and an Indexer reference):
7) Resource Event Handlers: Enqueue Object Key into the Workqueue
8) Process Item: Get Key from the Workqueue
9) Handle Object: Get Object for Key through the Indexer reference
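client-go components
Reflector: defined in the Reflector type of the cache package. It watches the Kubernetes API for a specific resource type (Kind), which happens in the ListAndWatch method.
It can also watch custom resource types. When the reflector learns through the watch API that a new resource instance was created, it fetches the
new object through the corresponding list API and adds it to the Delta FIFO queue in the watchHandler method.
Informer: defined in the base controller of the cache package. It pops objects off the Delta FIFO queue, which happens in the processLoop method.
The base controller saves each object for later retrieval and then invokes our controller, passing the object to it.
Indexer: provides indexing over objects and is defined in the Indexer type of the cache package. A typical indexing case is building an index on object labels.
The Indexer maintains its indexes through several indexing functions and uses a thread-safe data store to hold objects and their keys.
The Store type of the cache package defines a default function named MetaNamespaceKeyFunc that generates a key of the form
<namespace>/<name> for an object.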
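16) Custom controller components
Informer reference: a reference to the Informer instance that knows how to work with your custom resource objects.
The custom controller must create the appropriate Informer.
Indexer reference: a reference to the Indexer instance. The custom controller must create it, and uses it to retrieve objects for later processing.
The base controller provides
NewIndexerInformer to create the Informer and Indexer. You can use this function or a factory method to create the informer.
Resource Event Handlers: callback functions that the Informer calls when it wants to deliver an object to the controller.
The usual pattern for these callbacks is to obtain the resource object's key and put it on a work queue for further processing (Process Item).
Process Item: the function created in the controller code that processes the items in the work queue. One or more other functions may do the actual processing.
These functions typically use the Indexer reference, or a list method, to fetch the object that corresponds to the key.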
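The Indexer idea (a thread-safe store keyed by namespace/name, plus a label index) can be sketched with the standard library. This is an illustration under stated assumptions, not the client-go implementation: item, labelStore, and its methods are hypothetical names, and only a single index on the app label is maintained.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// item is a hypothetical, reduced stand-in for a stored object.
type item struct {
	Namespace string
	Name      string
	Labels    map[string]string
}

// labelStore holds objects by key and indexes them by the "app" label.
type labelStore struct {
	mu      sync.RWMutex
	objects map[string]item            // key -> object
	byApp   map[string]map[string]bool // app label value -> set of keys
}

func newLabelStore() *labelStore {
	return &labelStore{objects: map[string]item{}, byApp: map[string]map[string]bool{}}
}

// Add stores the item under its namespace/name key, updates the label
// index, and returns the key.
func (s *labelStore) Add(it item) string {
	key := it.Namespace + "/" + it.Name
	s.mu.Lock()
	defer s.mu.Unlock()
	if old, ok := s.objects[key]; ok {
		// Drop the stale index entry before overwriting.
		delete(s.byApp[old.Labels["app"]], key)
	}
	s.objects[key] = it
	app := it.Labels["app"]
	if s.byApp[app] == nil {
		s.byApp[app] = map[string]bool{}
	}
	s.byApp[app][key] = true
	return key
}

// Get returns the object stored under key.
func (s *labelStore) Get(key string) (item, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	it, ok := s.objects[key]
	return it, ok
}

// KeysByApp returns the sorted keys of all objects with the given app label.
func (s *labelStore) KeysByApp(app string) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.byApp[app]))
	for k := range s.byApp[app] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	s := newLabelStore()
	key := s.Add(item{Namespace: "default", Name: "web-1", Labels: map[string]string{"app": "nginx"}})
	s.Add(item{Namespace: "default", Name: "web-2", Labels: map[string]string{"app": "nginx"}})
	it, ok := s.Get(key)
	fmt.Println(it.Name, ok, s.KeysByApp("nginx"))
}
```

A Process Item function would call something like Get with the key it took off the work queue, which is the role the Indexer reference plays in the flow above.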
Reference:
https://www.jianshu.com/p/2aaa0fe53db9
17) Write the CRD, the YAML file for a CRD instance, and the operator's Dockerfile
sample-controller
├── artifacts
│   └── examples
│       ├── crd.yaml
│       └── example-foo.yaml
├── controller.go
├── Dockerfile
├── hack
│   ├── boilerplate.go.txt
│   ├── custom-boilerplate.go.txt
│   ├── update-codegen.sh
│   └── verify-codegen.sh
├── main.go
└── pkg
    ├── apis
    │   └── samplecontroller
    │       ├── register.go
    │       └── v1alpha1
    │           ├── doc.go
    │           ├── register.go
    │           ├── types.go
    │           └── zz_generated.deepcopy.go
    ├── generated
    │   ├── clientset
    │   │   └── ...
    │   ├── informers
    │   │   └── ...
    │   └── listers
    │       └── ...
    └── signals
        └── signal.go
sample-controller analysis
See the blog post:
http://maoqide.live/post/cloud/sample-controller/
5 Reconcile method analysis
To be added
6 Event filtering
Kubernetes Operators are processes connecting to the master API and watching for events, typically on a limited number of resource types.
When a relevant event occurs, the operator reacts and performs a specific action. This may be limited to interacting with the master API only, but will often involve performing some action on some other systems (this could be either in cluster or off cluster resources).
Operators are implemented as a collection of controllers where each controller watches a specific resource type. When a relevant event occurs on a watched resource a reconcile cycle is started.
During the reconcile cycle, the controller has the responsibility to check that current state matches the desired state described by the watched resource. Interestingly, by design, the event is not passed to the reconcile cycle, which is then forced to consider the whole state of the instance that was referenced by the event.
This approach is referred to as level-based, as opposed to edge-based. Deriving from electronic circuit design, level-based triggering is the idea of receiving an event (an interrupt for example) and reacting to a state, while edge-based triggering is the idea of receiving an event and reacting to a state variation.
Level-based triggering, while arguably less efficient because it forces to re-evaluate the entire state as opposed to just what changed, is considered more suitable in complex and unreliable environments where signals can be lost or retransmitted multiple times.
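The difference can be made concrete with a toy replica counter. This is a minimal sketch written for this note; edgeBased and levelBased are hypothetical names and the numbers are invented for the example.

```go
package main

import "fmt"

// edgeBased applies every delivered delta to a running count. A lost or
// duplicated event leaves the count wrong.
func edgeBased(start int, deliveredDeltas []int) int {
	count := start
	for _, d := range deliveredDeltas {
		count += d
	}
	return count
}

// levelBased ignores what the event said and returns the adjustment
// needed to move the actual state to the desired state.
func levelBased(actual, desired int) int {
	return desired - actual
}

func main() {
	// Desired state: 3 replicas. Three "+1" events were sent, one was lost.
	delivered := []int{1, 1}
	edge := edgeBased(0, delivered)
	fmt.Println("edge-based result:", edge)

	// Any surviving event triggers a reconcile that reads the full state.
	actual := edge
	actual += levelBased(actual, 3)
	fmt.Println("level-based result:", actual)
}
```

The edge-based count stays one short after the lost event, while the level-based reconcile converges on the desired value regardless of how many events arrived.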
This design choice influences how we write controller’s code.
Also relevant to this discussion is an understanding of the lifecycle of an API request. The following diagram provides a high level summary:
When a request is made to the API server, especially for create and delete requests, the request goes through the above phases. Notice that it is possible to specify webhooks to perform mutations and validations. If the operator introduces a new custom resource definition (CRD), we may have to also define those webhooks. Normally, the operator process would also implement the webhook endpoint by listening on a port.
This document presents a set of best practices to keep in mind when designing and developing operators using the Operator SDK.
If your operator introduces a new CRD, the Operator SDK will assist in scaffolding it. To make sure your CRD conforms to the Kubernetes best practices for extending the API, follow these conventions.
All the best practices mentioned in this article are portrayed in an example available at the operator-utils repository. This repository is also a library which you can import in your operator, giving you some useful utilities for writing your own operators.
Finally this set of best practices for writing operators represents my personal view and should not be considered an official list of best practices from Red Hat.
Creating watches
As we said, controllers watch events on resources. This is done through the abstraction of watches.
A watch is a mechanism to receive events of a certain type (either a core type or a CRD). A watch is normally created by specifying the following:
The resource type to watch.
A handler. The handler maps the events on the watched type to one or more instances for which the reconcile cycle is called. Watched type and instance type do not have to be the same.
A predicate. The predicate is a set of functions that can be customized to filter only the events we are interested in.
The diagram below captures these concepts:
In general, opening multiple watches on the same kind is acceptable because the watches are multiplexed.
You should also try to filter events as much as possible. Here, for example, is a predicate that filters events on secrets. Here we are interested only in events on secrets of type kubernetes.io/tls which have a certain annotation:
isAnnotatedSecret := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
oldSecret, ok := e.ObjectOld.(*corev1.Secret)
if !ok {
return false
}
newSecret, ok := e.ObjectNew.(*corev1.Secret)
if !ok {
return false
}
if newSecret.Type != util.TLSSecret {
return false
}
oldValue, _ := e.MetaOld.GetAnnotations()[certInfoAnnotation]
newValue, _ := e.MetaNew.GetAnnotations()[certInfoAnnotation]
old := oldValue == "true"
new := newValue == "true"
// if the content has changed we trigger if the annotation is there
if !reflect.DeepEqual(newSecret.Data[util.Cert], oldSecret.Data[util.Cert]) ||
!reflect.DeepEqual(newSecret.Data[util.CA], oldSecret.Data[util.CA]) {
return new
}
// otherwise we trigger if the annotation has changed
return old != new
},
CreateFunc: func(e event.CreateEvent) bool {
secret, ok := e.Object.(*corev1.Secret)
if !ok {
return false
}
if secret.Type != util.TLSSecret {
return false
}
value, _ := e.Meta.GetAnnotations()[certInfoAnnotation]
return value == "true"
},
}
A very common pattern is to observe events on the resources that we create (and own) and to schedule a reconcile cycle on the CR that owns those resources. To do so, you can use the EnqueueRequestForOwner handler, as follows:
err = c.Watch(&source.Kind{Type: &examplev1alpha1.MyControlledType{}}, &handler.EnqueueRequestForOwner{})
A less common situation is where an event is multicast to several destination resources. Consider the case of a controller that injects TLS secrets into routes based on an annotation. Multiple routes in the same namespace can point to the same secret. If the secret changes we need to update all the routes. So, we would need to create a watch on the secret type and the handler would look as follows:
type enqueueRequestForReferencingRoutes struct {
	client.Client
}

// Create implements EventHandler
// trigger a router reconcile event for those routes that reference this secret
func (e *enqueueRequestForReferencingRoutes) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	routes, _ := matchSecret(e.Client, types.NamespacedName{
		Name:      evt.Meta.GetName(),
		Namespace: evt.Meta.GetNamespace(),
	})
	for _, route := range routes {
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Namespace: route.GetNamespace(),
			Name:      route.GetName(),
		}})
	}
}

// Update implements EventHandler
// trigger a router reconcile event for those routes that reference this secret
func (e *enqueueRequestForReferencingRoutes) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
	routes, _ := matchSecret(e.Client, types.NamespacedName{
		Name:      evt.MetaNew.GetName(),
		Namespace: evt.MetaNew.GetNamespace(),
	})
	for _, route := range routes {
		q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
			Namespace: route.GetNamespace(),
			Name:      route.GetName(),
		}})
	}
}

// Delete and Generic (not shown) must also be defined to satisfy handler.EventHandler.
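The handler above relies on a matchSecret helper that is not shown. As a rough illustration of the fan-out idea only, the following standalone sketch selects the routes that reference a given secret. The route struct, the annotation key, and this in-memory matchSecret signature are all assumptions made for the example; the real helper takes a client and queries the API server.

```go
package main

import "fmt"

// route is a hypothetical, simplified stand-in for an OpenShift Route.
type route struct {
	Namespace, Name string
	Annotations     map[string]string
}

// secretRefAnnotation is an assumed annotation key; a real controller
// may use a different one.
const secretRefAnnotation = "example.com/certs-from-secret"

// matchSecret returns the routes in the secret's namespace whose
// annotation names the given secret. Each match would become one
// reconcile request in the handler.
func matchSecret(routes []route, secretNamespace, secretName string) []route {
	var matched []route
	for _, r := range routes {
		if r.Namespace == secretNamespace && r.Annotations[secretRefAnnotation] == secretName {
			matched = append(matched, r)
		}
	}
	return matched
}

func main() {
	routes := []route{
		{Namespace: "shop", Name: "web", Annotations: map[string]string{secretRefAnnotation: "tls-cert"}},
		{Namespace: "shop", Name: "api", Annotations: map[string]string{secretRefAnnotation: "tls-cert"}},
		{Namespace: "shop", Name: "admin"},
		{Namespace: "other", Name: "web", Annotations: map[string]string{secretRefAnnotation: "tls-cert"}},
	}
	for _, r := range matchSecret(routes, "shop", "tls-cert") {
		fmt.Println("enqueue", r.Namespace+"/"+r.Name)
	}
}
```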
Resource Reconciliation Cycle
The reconciliation cycle is where the framework gives us back control after a watch has passed up an event. As explained before, at this point we don’t have the information about the type of event because we are working on level-based triggers.
Below is a model of what a common reconciliation cycle for a controller that manages a CRD could look like. As with every model, this is not going to completely reflect any particular use case, but I hope you find it useful to help think about the problem one needs to solve when writing an operator.
As we can see from the diagram the main steps are:
Retrieve the CR instance of interest.
Manage the instance validity. We don’t want to try to do anything on an instance that does not carry valid values.
Manage instance initialization. If some values of the instance are not initialized, this section will take care of it.
Manage instance deletion. If the instance is being deleted and we need to do some specific clean up, this is where we manage it.
Manage controller business logic. If the above steps all pass we can finally manage and execute the reconciliation logic for this particular instance. This will be very controller specific.
In the rest of this section you can find some more in depth considerations on each of these steps.
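Before going into the details, the ordering of these steps can be sketched as a small standalone program. This is only a model under stated assumptions: the instance struct, its fields, and the phase names returned by reconcile are hypothetical, and a real reconciler would talk to the API server rather than mutate an in-memory value.

```go
package main

import (
	"errors"
	"fmt"
)

// instance is a hypothetical custom resource reduced to what the cycle needs.
type instance struct {
	Replicas    int
	Initialized bool
	Deleting    bool
}

// reconcile walks the phases in order and reports which phase ended the cycle.
func reconcile(inst *instance) (string, error) {
	if inst == nil {
		return "not-found", nil // the CR is gone, nothing to do
	}
	if inst.Replicas < 0 {
		return "invalid", errors.New("replicas must not be negative")
	}
	if !inst.Initialized {
		inst.Initialized = true // set defaults, persist, let the next cycle continue
		return "initialized", nil
	}
	if inst.Deleting {
		return "cleaned-up", nil // run clean-up logic, then remove the finalizer
	}
	return "reconciled", nil // controller-specific business logic goes here
}

func main() {
	inst := &instance{Replicas: 2}
	for i := 0; i < 2; i++ {
		phase, err := reconcile(inst)
		fmt.Println(phase, err)
	}
}
```

Each early return corresponds to one of the steps listed above, so a cycle never reaches the business logic with an invalid, uninitialized, or deleted instance.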
Resource Validation
Two types of validation exist: Syntactic validation and semantic validation.
Syntactic validation happens by defining OpenAPI validation rules.
Semantic validation can be done by creating a ValidatingAdmissionConfiguration (in upstream Kubernetes this resource is named ValidatingWebhookConfiguration).
Note: a controller cannot reject a CR. Once the CR is accepted by the API server it will be stored in etcd. Once it is in etcd, the controller that owns it cannot do anything to reject it, and if the CR is not valid, trying to use or process it will result in an error.
Recommendation: because we cannot guarantee that a ValidatingAdmissionConfiguration will be created and/or working, we should also validate the CR from within the controller and, if it is not valid, avoid creating an endless error loop (see also: error management).
Syntactic Validation
OpenAPI validation rules can be added as described here.
Recommendation: model as much of your validation as possible as syntactic validation. Syntactic validation is relatively straightforward and prevents badly formed CRs from being stored in etcd, so it should be used as much as possible.
Semantic Validation
Semantic validation is about making sure that fields have sensible values and that the entire resource record is meaningful. Semantic validation business logic depends on the concept that the CR represents and must be coded by the operator developer.
If semantic validation is required by the given CR, then the operator should expose a webhook and ValidatingAdmissionConfiguration should be created as part of the operator deployment.
The following limitations currently exist:
In OpenShift 3.11, ValidatingAdmissionConfigurations are in tech preview (they are supported from 4.1 on).
The Operator SDK has no support for scaffolding webhooks. This can be worked around using kubebuilder, for example: kubebuilder webhook --group crew --version v1 --kind FirstMate --type=mutating --operations=create,update
Validating a resource in the controller
It is better to reject an invalid CR rather than to accept it in etcd and then manage the error condition. That said, there could be situations in which the ValidatingAdmissionConfiguration is not deployed or not available at all. I think it is still a good practice to do semantic validation in the controller code. Code should be structured in such a way that you can share the same validation routine between the ValidatingAdmissionConfiguration and the controller.
The code of the controller calling the validation method should look like this:
if ok, err := r.IsValid(instance); !ok {
return r.ManageError(instance, err)
}
Note that if the validation fails, we manage this error as described in the error management section.
The IsValid function will look something like:
func (r *ReconcileMyCRD) IsValid(obj metav1.Object) (bool, error) {
	mycrd, ok := obj.(*examplev1alpha1.MyCRD)
	if !ok {
		return false, errors.New("not a MyCRD")
	}
	_ = mycrd // validation logic on mycrd goes here
	return true, nil
}
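One way to share the same routine between the webhook and the controller is to keep the semantic checks in a pure function that takes the spec and returns an error. The sketch below assumes a hypothetical spec with two related fields; the type and field names are invented for illustration and do not come from the original example.

```go
package main

import (
	"errors"
	"fmt"
)

// myCRDSpec is a hypothetical spec; the fields are assumptions.
type myCRDSpec struct {
	MinReplicas int
	MaxReplicas int
}

// validateSpec performs cross-field (semantic) checks. It has no
// dependency on the client or the webhook machinery, so both the
// admission webhook and the controller can call it.
func validateSpec(s myCRDSpec) error {
	if s.MinReplicas < 0 {
		return errors.New("minReplicas must not be negative")
	}
	if s.MaxReplicas < s.MinReplicas {
		return fmt.Errorf("maxReplicas (%d) must be >= minReplicas (%d)", s.MaxReplicas, s.MinReplicas)
	}
	return nil
}

func main() {
	fmt.Println(validateSpec(myCRDSpec{MinReplicas: 1, MaxReplicas: 3}))
	fmt.Println(validateSpec(myCRDSpec{MinReplicas: 5, MaxReplicas: 3}))
}
```

The relationship between the two fields is the kind of rule that is awkward to express as syntactic validation, which is why it belongs in the semantic layer.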
Resource Initialization
One of the nice conventional features of Kubernetes resources is that only the needed fields of a resource are to be initialized by the user and the others can be omitted. This is the point of view of the user, but from the point of view of the coder and anyone debugging what is happening with a resource it is actually better to have all the fields initialized. This allows writing code without always checking if a field is defined, and allows for easy troubleshooting of error situations. In order to initialize resources there are two options:
Define an initialization method in the controller.
Define a MutatingAdmissionConfiguration (the procedure is similar to the ValidatingAdmissionConfiguration).
Recommendation: define an initialization method in the controller. The code should look like this sample:
if ok := r.IsInitialized(instance); !ok {
	err := r.GetClient().Update(context.TODO(), instance)
	if err != nil {
		log.Error(err, "unable to update instance", "instance", instance)
		return r.ManageError(instance, err)
	}
	return reconcile.Result{}, nil
}
Notice that if IsInitialized returns false (meaning it had to set some fields on the instance), we update the instance and then return. The update triggers another immediate reconcile cycle. This second time IsInitialized returns true, and the logic continues to the next phase.
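The IsInitialized method itself is not shown. A minimal sketch of what it could look like follows, matching the !ok check in the snippet above: it fills in missing values and returns false whenever it had to change something. The myCR struct, the default of one replica, and the finalizer name are assumptions for illustration.

```go
package main

import "fmt"

// myCR is a hypothetical custom resource with one optional field.
type myCR struct {
	Replicas   *int
	Finalizers []string
}

// finalizerName is an assumed name for this controller's finalizer.
const finalizerName = "example.com/my-controller"

// isInitialized sets any missing defaults on cr and reports whether the
// object was already fully initialized (false means it was modified and
// must be persisted).
func isInitialized(cr *myCR) bool {
	initialized := true
	if cr.Replicas == nil {
		defaultReplicas := 1
		cr.Replicas = &defaultReplicas
		initialized = false
	}
	hasFinalizer := false
	for _, f := range cr.Finalizers {
		if f == finalizerName {
			hasFinalizer = true
		}
	}
	if !hasFinalizer {
		cr.Finalizers = append(cr.Finalizers, finalizerName)
		initialized = false
	}
	return initialized
}

func main() {
	cr := &myCR{}
	fmt.Println("first cycle:", isInitialized(cr))  // defaults were missing
	fmt.Println("second cycle:", isInitialized(cr)) // nothing left to set
}
```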
Resource Finalization
If resources are not owned by the CR controlled by your operator but action needs to be taken when that CR is deleted, you must use a finalizer.
Finalizers provide a mechanism to inform the Kubernetes control plane that an action needs to take place before the standard Kubernetes garbage collection logic can be performed.
One or more finalizers can be set on resources. Each controller should manage its own finalizer and ignore others if present.
This is the pseudo code algorithm to manage finalizers:
If needed, add finalizers during the initialization method.
If the resource is being deleted, check if the finalizer owned by this controller is present.
    If not, return.
    If yes, execute the cleanup logic.
        If successful, update the CR by removing the finalizer.
        If it fails, decide whether to retry or give up and likely leave garbage (in some situations this can be acceptable).
If your clean-up logic requires creating additional resources, keep in mind that additional resources cannot be created in a namespace that is being deleted. A to-be-deleted namespace will trigger a delete of all the resources it contains, including your CR with the finalizer.
See an example of the code here:
if util.IsBeingDeleted(instance) {
	if !util.HasFinalizer(instance, controllerName) {
		return reconcile.Result{}, nil
	}
	err := r.manageCleanUpLogic(instance)
	if err != nil {
		log.Error(err, "unable to delete instance", "instance", instance)
		return r.ManageError(instance, err)
	}
	util.RemoveFinalizer(instance, controllerName)
	err = r.GetClient().Update(context.TODO(), instance)
	if err != nil {
		log.Error(err, "unable to update instance", "instance", instance)
		return r.ManageError(instance, err)
	}
	return reconcile.Result{}, nil
}
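The util helpers used above are not shown here. The following standalone sketch models what such helpers could do, under the assumption that an object being deleted carries a non-nil deletion timestamp in its metadata. The objectMeta struct and the lower-case function names are hypothetical stand-ins, and markDeleted only simulates what the API server does when a delete is requested.

```go
package main

import (
	"fmt"
	"time"
)

// objectMeta is a hypothetical, reduced version of Kubernetes object metadata.
type objectMeta struct {
	DeletionTimestamp *time.Time
	Finalizers        []string
}

// markDeleted simulates the API server marking an object for deletion.
func markDeleted(m *objectMeta) {
	now := time.Now()
	m.DeletionTimestamp = &now
}

// isBeingDeleted reports whether deletion has been requested.
func isBeingDeleted(m *objectMeta) bool { return m.DeletionTimestamp != nil }

// hasFinalizer reports whether the named finalizer is present.
func hasFinalizer(m *objectMeta, name string) bool {
	for _, f := range m.Finalizers {
		if f == name {
			return true
		}
	}
	return false
}

// removeFinalizer drops only the named finalizer and keeps the others.
func removeFinalizer(m *objectMeta, name string) {
	kept := make([]string, 0, len(m.Finalizers))
	for _, f := range m.Finalizers {
		if f != name {
			kept = append(kept, f)
		}
	}
	m.Finalizers = kept
}

func main() {
	m := &objectMeta{Finalizers: []string{"other.io/cleanup", "my-controller"}}
	markDeleted(m)
	if isBeingDeleted(m) && hasFinalizer(m, "my-controller") {
		removeFinalizer(m, "my-controller") // only after the clean-up logic succeeded
	}
	fmt.Println(m.Finalizers)
}
```

Note that removeFinalizer leaves finalizers owned by other controllers untouched, in line with the rule that each controller manages only its own.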
Resource Ownership
Resource ownership is a native concept in Kubernetes that determines how resources are deleted. When a resource is deleted and it owns other resources, the child resources will, by default, also be deleted (you can disable this behavior by setting cascade=false).
This behavior is instrumental in guaranteeing correct garbage collection of resources, especially when resources control other resources in a multilevel hierarchy (think deployment -> replicaset -> pod).
Recommendation: if your controller creates resources and the lifecycle of those resources is tied to a resource (either core or CR), then you should set that resource as the owner of those resources. This can be done as follows:
controllerutil.SetControllerReference(owner, obj, r.GetScheme())
Some other rules around ownership are the following:
A namespaced owner must be in the same namespace as the object it owns; cross-namespace owner references are not allowed.
A cluster level resource can own a namespaced resource.
A namespaced resource cannot own a cluster level resource: according to the Kubernetes garbage collection documentation, cluster-scoped dependents can only specify cluster-scoped owners.
An object can have a list of owners. If multiple objects own the same object, each should claim ownership without overwriting the others' ownership (keep in mind that only one owner can be marked as the controller).
A cluster level object can own another cluster level object.
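The idea of claiming ownership without overwriting the other owners can be sketched with plain types. This is not controller-runtime's implementation; the ownerRef struct and the addOwner function are hypothetical and only illustrate appending to the owner list idempotently.

```go
package main

import "fmt"

// ownerRef is a hypothetical, reduced owner reference.
type ownerRef struct {
	Kind, Name, UID string
}

// addOwner appends owner to refs unless an entry with the same UID is
// already present, so existing owners are never overwritten.
func addOwner(refs []ownerRef, owner ownerRef) []ownerRef {
	for _, r := range refs {
		if r.UID == owner.UID {
			return refs
		}
	}
	return append(refs, owner)
}

func main() {
	var refs []ownerRef
	refs = addOwner(refs, ownerRef{Kind: "MyCRD", Name: "a", UID: "uid-a"})
	refs = addOwner(refs, ownerRef{Kind: "MyCRD", Name: "b", UID: "uid-b"})
	refs = addOwner(refs, ownerRef{Kind: "MyCRD", Name: "a", UID: "uid-a"}) // repeated claim
	fmt.Println(len(refs), "owners")
}
```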
Managing status
Status is a standard section of a resource. Status is obviously used to report on the status of the resource. In this document we are going to use status to report on the outcome of the last execution of the reconcile cycle. You can use status to add more information.
Under normal circumstances, if we updated our resource every time we executed the reconcile cycle, this would trigger an update event, which in turn would trigger another reconcile cycle, in an endless loop.
For this reason, Status should be modeled as a subresource as explained here.
This way we can update the status of our resources without increasing the ResourceGeneration metadata field (metadata.generation). We can update the status with this command:
err = r.Status().Update(context.Background(), instance)
Now we need to write a predicate for our watch (see the section about watches for more details on these concepts) that discards updates that did not increase the ResourceGeneration. This can be done using the GenerationChangedPredicate.
If you recall, if we are using a finalizer, the finalizer should be set up at initialization time. If the finalizer is the only item that is being initialized, since it is a portion of the metadata field, the ResourceGeneration will not be incremented. To account for that use case, the following is a modified version of the predicate:
type resourceGenerationOrFinalizerChangedPredicate struct {
	predicate.Funcs
}

// Update implements default UpdateEvent filter for validating resource version change
func (resourceGenerationOrFinalizerChangedPredicate) Update(e event.UpdateEvent) bool {
	if e.MetaNew.GetGeneration() == e.MetaOld.GetGeneration() && reflect.DeepEqual(e.MetaNew.GetFinalizers(), e.MetaOld.GetFinalizers()) {
		return false
	}
	return true
}
Now assuming your status is as follows:
type MyCRStatus struct {
	// +kubebuilder:validation:Enum=Success,Failure
	Status     string      `json:"status,omitempty"`
	LastUpdate metav1.Time `json:"lastUpdate,omitempty"`
	Reason     string      `json:"reason,omitempty"`
}
You can write a function to manage the successful execution of a reconciliation cycle:
func (r *ReconcilerBase) ManageSuccess(obj metav1.Object) (reconcile.Result, error) {
	runtimeObj, ok := (obj).(runtime.Object)
	if !ok {
		log.Error(errors.New("not a runtime.Object"), "passed object was not a runtime.Object", "object", obj)
		return reconcile.Result{}, nil
	}
	if reconcileStatusAware, updateStatus := (obj).(apis.ReconcileStatusAware); updateStatus {
		status := apis.ReconcileStatus{
			LastUpdate: metav1.Now(),
			Reason:     "",
			Status:     "Success",
		}
		reconcileStatusAware.SetReconcileStatus(status)
		err := r.GetClient().Status().Update(context.Background(), runtimeObj)
		if err != nil {
			log.Error(err, "unable to update status")
			// retry shortly if the status could not be written
			return reconcile.Result{
				RequeueAfter: time.Second,
				Requeue:      true,
			}, nil
		}
	} else {
		log.Info("object is not ReconcileStatusAware, not setting status")
	}
	return reconcile.Result{}, nil
}
Managing errors
If a controller enters an error condition and returns an error in the reconcile method, the error will be logged by the operator to standard output and a reconciliation event will be immediately rescheduled (the default scheduler should actually detect if the same error appears over and over again and increase the scheduling time, but in my experience this does not occur). If the error condition is permanent, this will generate an endless error loop. Furthermore, this error condition will not be visible to the user.
There are two ways to notify the user of an error and they can be both used at the same time:
Return the error in the status of the object.
Generate an event describing the error.
Also, if you believe the error might resolve itself, you should reschedule a reconciliation cycle after a certain period of time. Often, the period of time is increased exponentially so that at every iteration the reconciliation event is scheduled farther in the future (for example twice the amount of time every time).
We are going to build on top of status management to handle error conditions:
func (r *ReconcilerBase) ManageError(obj metav1.Object, issue error) (reconcile.Result, error) {
	runtimeObj, ok := (obj).(runtime.Object)
	if !ok {
		log.Error(errors.New("not a runtime.Object"), "passed object was not a runtime.Object", "object", obj)
		return reconcile.Result{}, nil
	}
	var retryInterval time.Duration
	r.GetRecorder().Event(runtimeObj, "Warning", "ProcessingError", issue.Error())
	if reconcileStatusAware, updateStatus := (obj).(apis.ReconcileStatusAware); updateStatus {
		lastUpdate := reconcileStatusAware.GetReconcileStatus().LastUpdate.Time
		lastStatus := reconcileStatusAware.GetReconcileStatus().Status
		status := apis.ReconcileStatus{
			LastUpdate: metav1.Now(),
			Reason:     issue.Error(),
			Status:     "Failure",
		}
		reconcileStatusAware.SetReconcileStatus(status)
		err := r.GetClient().Status().Update(context.Background(), runtimeObj)
		if err != nil {
			log.Error(err, "unable to update status")
			return reconcile.Result{
				RequeueAfter: time.Second,
				Requeue:      true,
			}, nil
		}
		// first failure (or first after a success): start from one second;
		// otherwise use the time elapsed since the previous failure
		if lastUpdate.IsZero() || lastStatus == "Success" {
			retryInterval = time.Second
		} else {
			retryInterval = status.LastUpdate.Sub(lastUpdate).Round(time.Second)
		}
	} else {
		log.Info("object is not ReconcileStatusAware, not setting status")
		retryInterval = time.Second
	}
	// double the interval, capped at six hours
	return reconcile.Result{
		RequeueAfter: time.Duration(math.Min(float64(retryInterval.Nanoseconds()*2), float64(time.Hour.Nanoseconds()*6))),
		Requeue:      true,
	}, nil
}
Notice that this function immediately sends an event, then it updates the status with the error condition. Finally, a calculation is made as to when to reschedule the next attempt. The algorithm tries to double the time every loop, up to a maximum of six hours.
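Six hours is a good cap where events are retained for about six hours, because it ensures that there is always an active event describing the current error condition. Event retention is configurable (the upstream kube-apiserver --event-ttl flag defaults to one hour), so the cap should match the retention of your cluster.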
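The doubling-with-cap calculation can be isolated into a small pure function, which makes the schedule easy to inspect. The sketch below is a simplified model of the calculation in ManageError, not a replacement for it; the nextRetry name and the maxRetryInterval constant are introduced here for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// maxRetryInterval is the six-hour cap discussed above.
const maxRetryInterval = 6 * time.Hour

// nextRetry doubles the time elapsed since the previous failure, starting
// from one second on the first failure, and never exceeds the cap.
func nextRetry(sinceLastFailure time.Duration) time.Duration {
	if sinceLastFailure <= 0 {
		sinceLastFailure = time.Second
	}
	next := 2 * sinceLastFailure
	if next > maxRetryInterval {
		return maxRetryInterval
	}
	return next
}

func main() {
	interval := time.Duration(0)
	for attempt := 1; attempt <= 5; attempt++ {
		interval = nextRetry(interval)
		fmt.Printf("attempt %d: retry in %v\n", attempt, interval)
	}
}
```

Because the function has no dependency on the client or the status object, the backoff policy can be changed or tested without touching the rest of the error handling.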
Conclusion
The practices presented in this blog deal with the most common concerns of designing a Kubernetes operator, and should allow you to write an operator that you feel confident putting in production. Very likely, this is just the beginning and it is easy to foresee that more frameworks and tools will come to life to help writing operators.
In this blog we saw many code fragments that should be immediately reusable, but for a more comprehensive example one can refer to the operator-utils repository.
Reference for the above:
https://cloud.tencent.com/developer/article/1462350
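7 Summary
1) How an operator works:
A custom (extension) resource is registered with the controller manager, which uses list and watch to monitor changes to that resource. It parses the spec of each CRD instance and, in every reconcile cycle, does whatever is needed to bring the actual state of the custom resource instance to the desired state.
2) The overall processing flow of an operator:
The operator watches the resources whose owner is the custom CRD type and enqueues the corresponding CRD instance. Objects are taken from the queue and reconciled toward the desired state. If reconciliation reaches the desired state, the object is removed from the queue; otherwise it is processed again in the next reconcile cycle, until the desired state is reached.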
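The queue-and-requeue flow described in 2) can be modeled with a short standalone sketch. It is a toy model under stated assumptions: the resource struct, the one-step-per-cycle convergence, and the drain function are hypothetical, and a real operator uses a rate-limited work queue rather than a slice.

```go
package main

import "fmt"

// resource is a hypothetical object with an actual and a desired value.
type resource struct {
	Current, Desired int
}

// drain processes keys until the queue is empty and returns the number of
// reconcile cycles. Each cycle moves an object one step toward its desired
// state and requeues the key if the object has not converged yet.
func drain(queue []string, store map[string]*resource) int {
	cycles := 0
	for len(queue) > 0 {
		key := queue[0]
		queue = queue[1:]
		cycles++
		r, ok := store[key]
		if !ok {
			continue // the object was deleted, drop the key
		}
		switch {
		case r.Current < r.Desired:
			r.Current++
		case r.Current > r.Desired:
			r.Current--
		}
		if r.Current != r.Desired {
			queue = append(queue, key) // not converged: process again later
		}
	}
	return cycles
}

func main() {
	store := map[string]*resource{
		"ns/a": {Current: 0, Desired: 3},
		"ns/b": {Current: 2, Desired: 2},
	}
	cycles := drain([]string{"ns/a", "ns/b"}, store)
	fmt.Println("cycles:", cycles, "ns/a current:", store["ns/a"].Current)
}
```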
3) Operator processing architecture:
The following shows how client-go and a controller interact, as documented by Kubernetes for developing controllers with client-go.
Interaction flow (steps 1 to 5 happen inside client-go, steps 7 to 9 inside the custom controller):
1) Reflector: List & Watch against the Kubernetes API
2) Reflector: Add Object to the Delta FIFO queue
3) Informer: Pop Object from the Delta FIFO queue
4) Informer: Add Object to the Indexer
5) Indexer: Store Object & Key in the thread-safe store
6) Informer: Dispatch Event Handler functions (send the object to the custom controller, which holds an Informer reference and the Resource Event Handlers)
7) Resource Event Handlers: Enqueue Object Key into the Workqueue
8) Process Item: Get Key from the Workqueue
9) Handle Object: Get Object for Key from the Indexer, through the Indexer reference
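client-go components
Reflector: defined in the Reflector type of the cache package. It watches the Kubernetes API for a specific resource type (Kind), which happens in the ListAndWatch method. The watched type can also be a custom resource. When the reflector learns through the watch API that a new resource instance has been created, it fetches the new object with the corresponding list API and puts it into the Delta FIFO queue in the watchHandler method.
Informer: defined in the base controller of the cache package. It pops objects from the Delta FIFO queue, which happens in the processLoop method. The job of the base controller is to save the object for later retrieval and to invoke the custom controller, passing it the object.
Indexer: provides indexing over objects and is defined in the Indexer type of the cache package. A typical indexing use case is an index based on object labels. An Indexer maintains its indexes based on several indexing functions and uses a thread-safe data store to store objects and their keys. The Store type of the cache package defines a default function named MetaNamespaceKeyFunc that generates a key of the form <namespace>/<name> for an object.
Custom controller components
Informer reference: a reference to the Informer instance that knows how to work with your custom resource objects. The custom controller needs to create the appropriate Informer.
Indexer reference: a reference to the Indexer instance. The custom controller needs to create it, and uses this reference when retrieving objects for later processing.
The base controller provides NewIndexerInformer to create the Informer and the Indexer; you can use this function or a factory method to create the informer.
Resource Event Handlers: callback functions that are called when the Informer wants to deliver an object to the controller. The typical pattern for writing them is to obtain the key of the resource object and put it into a work queue for further processing (Process Item).
Work queue: the queue you create in your controller code to decouple the delivery of an object from its processing.
Process Item: the function you create in your controller code to process items from the work queue. One or more other functions may do the actual processing. These functions typically use the Indexer reference, or a list method, to retrieve the object that corresponds to the key.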
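The division of labor between the store, the event handlers, and the work queue can be sketched in a few lines of standalone Go. This is a deliberately simplified model and not client-go code: the object and miniController types are hypothetical, the store is a plain map rather than a thread-safe indexer, and unlike the real work queue this slice does not deduplicate keys. The keyFor function follows the <namespace>/<name> key format, using the bare name for objects without a namespace.

```go
package main

import "fmt"

// object is a hypothetical resource reduced to identity plus a payload.
type object struct {
	Namespace, Name, Spec string
}

// keyFor builds the "<namespace>/<name>" key; objects without a
// namespace are keyed by name only.
func keyFor(o object) string {
	if o.Namespace == "" {
		return o.Name
	}
	return o.Namespace + "/" + o.Name
}

// miniController holds a store (the role of the indexer) and a queue of keys.
type miniController struct {
	store map[string]object
	queue []string
}

// onEvent models steps 4 to 7: save the latest object, enqueue only its key.
func (c *miniController) onEvent(o object) {
	k := keyFor(o)
	c.store[k] = o
	c.queue = append(c.queue, k)
}

// processNext models steps 8 and 9: take a key, then look up the current
// object for that key in the store.
func (c *miniController) processNext() (object, bool) {
	if len(c.queue) == 0 {
		return object{}, false
	}
	k := c.queue[0]
	c.queue = c.queue[1:]
	o, ok := c.store[k]
	return o, ok
}

func main() {
	c := &miniController{store: map[string]object{}}
	c.onEvent(object{Namespace: "ns", Name: "a", Spec: "v1"})
	c.onEvent(object{Namespace: "ns", Name: "a", Spec: "v2"})
	o, _ := c.processNext()
	fmt.Println(keyFor(o), o.Spec)
}
```

Because only the key travels through the queue, the processing function always sees the latest stored state of the object rather than the state at the time of the event.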
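4) A CRD instance is looked up by its namespace and name.
5) The resources an operator watches include the primary resource (instances of the custom resource defined by the CRD) and secondary resources (the objects created and owned by those instances).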