我是靠谱客的博主 干净巨人,最近开发中收集的这篇文章主要介绍Kubernetes源码阅读笔记——Scheduler(之一),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Scheduler是集群中Master节点的重要组件,其功能是根据集群中各Pod的资源需求、亲和性等指标,将Pod合理调度到Kubernetes集群中的各个节点上。

一、入口函数

入口函数与Controller Manager的入口函数结构相同,同样是应用了cobra包,在命令行中注册了kube-scheduler命令。

cmd/kube-scheduler/scheduler.go

func main() { rand.Seed(time.Now().UnixNano()) command := app.NewSchedulerCommand() pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc) logs.InitLogs() defer logs.FlushLogs() if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "%vn", err) os.Exit(1) } }

这里核心的方法仍然是NewSchedulerCommand。该方法位于app/server.go中,结构与Controller Manager几乎一样,因此不贴上来了。核心的部分仍然是在cobra.Command结构体的Run字段中调用runCommand方法。

runCommand方法为Scheduler配置Config,最终返回的是Run方法,将Scheduler运行起来。

runCommand方法中间有一行值得注意:

func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
...
algorithmprovider.ApplyFeatureGates()
...
return Run(cc, stopCh)
}

这一行的作用是调用ApplyFeatureGates方法,并根据Feature Gate的配置,注册或者删除相应的预选策略。

进入ApplyFeatureGates方法,发现方法就一行,而整个包就这一个方法:

pkg/scheduler/algorithmprovider/plugin.go
package algorithmprovider
import
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
defaults.ApplyFeatureGates()
}

事实上,在pkg/scheduler/algorirhmprovider/defaults/defaults.go中,有一个init方法:

pkg/scheduler/algorirhmprovider/defaults/defaults.go

func init() { registerAlgorithmProvider(defaultPredicates(), defaultPriorities()) }

因此,在导入defaults包时,就已经执行了registerAlgorithmProvider方法,对一些预选与优选方法进行了注册。再配合ApplyFeatureGates方法,根据k8s中一些feature的开启情况,增加或删除一些预选和优选方法。这些feature的位置在pkg/features/kube_features.go中。

详细的预选和优选方法的定义位于pkg/scheduler/algorithm和pkg/scheduler/algorithmprovider中,这里不详细展开。

二、Run

看一下Run方法:

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
cc.InformerFactory.Storage().V1().StorageClasses(),
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
if err != nil {
return err
}
// Prepare the event broadcaster.

...
// Setup healthz checks.

...// Start all informers.
go cc.PodInformer.Informer().Run(stopCh)
cc.InformerFactory.Start(stopCh)
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)
// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here

defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
// If leader election is enabled, runCommand via LeaderElector until done and exit.

...
// Leader election is disabled, so runCommand inline until done.

run(ctx)
return fmt.Errorf("finished without leader elect")
}

Run方法主要包含下面几件事:

(1)创建Scheduler。

Run方法的前几行代码调用了New方法,创建了一个Scheduler对象。这个New方法位于pkg/scheduler/scheduler.go中:

pkg/scheduler/scheduler.go
func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName:
options.schedulerName,
Client:
client,
NodeInformer:
nodeInformer,
PodInformer:
podInformer,
PvInformer:
pvInformer,
PvcInformer:
pvcInformer,
ReplicationControllerInformer:
replicationControllerInformer,
ReplicaSetInformer:
replicaSetInformer,
StatefulSetInformer:
statefulSetInformer,
ServiceInformer:
serviceInformer,
PdbInformer:
pdbInformer,
StorageClassInformer:
storageClassInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
DisablePreemption:
options.disablePreemption,
PercentageOfNodesToScore:
options.percentageOfNodesToScore,
BindTimeoutSeconds:
options.bindTimeoutSeconds,
})
var config *factory.Config
source := schedulerAlgorithmSource
switch {
case source.Provider != nil:
// Create the config from a named algorithm provider.

...
case source.Policy != nil:
// Create the config from a user specified policy source.

...
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
config.StopEverything = stopCh
// Create the scheduler.
sched := NewFromConfig(config)
return sched, nil
}

New方法逻辑相对清晰,其本质就是根据传入的Informer、算法等参数,实例化一个Config,然后调用NewFromConfig方法,通过这个Config创建一个scheduler实例并返回。可以看到,scheduler中也用到了包括nodeInformer、podInformer等在内的大量Informer,因为scheduler也需要及时掌握资源的变化,从而调整调度的策略。

中间switch一段代码会判断config的调度算法源是用户自定义的还是给定的provider。如果使用默认的provider,则会将前面注册过的预选、优选方法加载进来。

创建config的NewConfigFactory方法位于pkg/scheduler/factory/factory.go中,进入方法:

pkg/scheduler/factory/factory.go
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := args.StopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate

var storageClassLister storagelisters.StorageClassLister
if args.StorageClassInformer != nil {
storageClassLister = args.StorageClassInformer.Lister()
}
c := &configFactory{
client:
args.Client,
podLister:
schedulerCache,
podQueue:
internalqueue.NewSchedulingQueue(stopEverything),
nodeLister:
args.NodeInformer.Lister(),
pVLister:
args.PvInformer.Lister(),
pVCLister:
args.PvcInformer.Lister(),
serviceLister:
args.ServiceInformer.Lister(),
controllerLister:
args.ReplicationControllerInformer.Lister(),
replicaSetLister:
args.ReplicaSetInformer.Lister(),
statefulSetLister:
args.StatefulSetInformer.Lister(),
pdbLister:
args.PdbInformer.Lister(),
storageClassLister:
storageClassLister,
schedulerCache:
schedulerCache,
StopEverything:
stopEverything,
schedulerName:
args.SchedulerName,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
disablePreemption:
args.DisablePreemption,
percentageOfNodesToScore:
args.PercentageOfNodesToScore,
}
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// scheduled pod cache

args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return assignedPod(pod)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc:
c.addPodToCache,
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
},
)
// unscheduled pod queue

args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc:
c.addPodToSchedulingQueue,
UpdateFunc: c.updatePodInSchedulingQueue,
DeleteFunc: c.deletePodFromSchedulingQueue,
},
},
)
// ScheduledPodLister is something we provide to plug-in functions that
// they may need to call.
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
args.NodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc:
c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
)
args.PvInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.

AddFunc:
c.onPvAdd,
UpdateFunc: c.onPvUpdate,
},
)
// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.

args.PvcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc:
c.onPvcAdd,
UpdateFunc: c.onPvcUpdate,
},
)
// This is for ServiceAffinity: affected by the selector of the service is updated.

args.ServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc:
c.onServiceAdd,
UpdateFunc: c.onServiceUpdate,
DeleteFunc: c.onServiceDelete,
},
)
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
args.StorageClassInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onStorageClassAdd,
},
)
// Setup cache debugger

...
go func() {
<-c.StopEverything
c.podQueue.Close()
}()
return c
}

该方法为一系列Informer初始化了回调函数。其中最重要的是PodInformer的两个回调函数。

可以看到,方法调用了两次AddEventHandler方法,都经过了过滤。第一次只处理已调度的Pod,第二次只处理未调度的Pod,并定义了对两种Pod的增、改、删方法,分别在缓存和队列中对这两种Pod进行更新。这样,就将已调度和未调度的Pod区分开。

后面为其他informer添加的回调函数,除了NodeInformer的回调函数会在缓存中更新node信息,其他回调函数最终都会调用MoveAllToActiveQueue方法,将待调度的Pod添加进队列。

此外,可以看到,在ConfigFactory中,有一个podQueue字段,维护了一个队列,用于存放待调度的Pod。

(2)运行广播和健康检查。

中间有几行是为Scheduler配置广播和健康检查相关内容,与Controller Manager类似,不提。

(3)Informer启动。

值得注意的是,Scheduler将PodInformer从其他的Informer中独立出来,因为对Pod的调度才是Scheduler的核心。

(4)运行Scheduler。

这是整个方法的核心。通过调用Scheduler的Run方法,将Scheduler运行起来。

进入Run方法,我们发现方法非常简洁,就做了2件事:

pkg/scheduler/scheduler.go
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

第一件事是等待各informer的缓存同步,第二件事是调用scheduleOne方法,执行Pod的调度操作。wait.Until的作用是每隔一段时间执行一次sched.scheduleOne方法,除非sched.config.StopEverything被关闭。这里时间段被设置为0,所以scheduleOne方法会一个接一个不停地被调用。

scheduleOne方法的具体逻辑我们下一篇文章再继续分析。https://www.cnblogs.com/00986014w/p/10320844.html

三、总结

总结Scheduler的逻辑,大体上是通过cobra注册一个kube-scheduler命令并运行。命令运行时,首先应用给定的调度算法,然后基于ConfigFactory,创建一个Scheduler的实例,启动相关的Informer,然后开始执行调度。

转载于:https://www.cnblogs.com/00986014w/p/10305425.html

最后

以上就是干净巨人为你收集整理的Kubernetes源码阅读笔记——Scheduler(之一)的全部内容,希望文章能够帮你解决Kubernetes源码阅读笔记——Scheduler(之一)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部