1. Introduction
If you repost this article, please credit the original source and respect the author's work!
This article analyzes cmd/kube-scheduler, pkg/scheduler/scheduler.go, pkg/scheduler/factory/factory.go, and related files. The two most important types are configFactory (factory.go) and Scheduler (scheduler.go).
Source code: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)
2. Flow Diagram
[Figure: run_1.png, the overall kube-scheduler startup flow]
3. Code Walkthrough
Let's look at how kube-scheduler starts from the code's perspective. To save space, code that is irrelevant or does not affect understanding is omitted from the listings.
3.1 cmd/kube-scheduler
// cmd/kube-scheduler/scheduler.go
func main() {
...
command := app.NewSchedulerCommand()
...
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
The NewSchedulerCommand() call takes us into cmd/kube-scheduler/app/server.go.
// cmd/kube-scheduler/app/server.go
// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
Use: "kube-scheduler",
...
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
...
return cmd
}
cobra is just a command-line library; see its official documentation for details, as I won't introduce it at length here. A short illustrative sketch of the pattern follows below.
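As a taste of the pattern, here is a minimal, self-contained cobra program (an illustrative sketch, not kube-scheduler code; the command name and flag are made up) that mirrors the Use/Run/Execute structure of NewSchedulerCommand:
// sketch: minimal cobra command in the NewSchedulerCommand style
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	var configFile string

	cmd := &cobra.Command{
		Use: "demo-scheduler",
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Printf("running with config file %q\n", configFile)
		},
	}
	// Flags are registered on the command and parsed by Execute().
	cmd.Flags().StringVar(&configFile, "config", "", "path to the configuration file")

	if err := cmd.Execute(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}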
The call that deserves attention is opts, err := options.NewOptions(), because it fills in a number of defaults. Two of them matter most:
DefaultProvider, the name of the default scheduling algorithm provider, i.e. the default scheduler.
LeaderElection is set to true, meaning kube-scheduler enables leader election for high availability; a separate post will cover this.
In addition, if the kube-scheduler command is given a --config file to define a custom scheduler configuration, that flag is parsed via Flags in cmd/kube-scheduler/app/options/options.go.
// cmd/kube-scheduler/app/options/options.go
// Flags returns flags for a specific scheduler by section name
func (o *Options) Flags() (nfs apiserverflag.NamedFlagSets) {
fs := nfs.FlagSet("misc")
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
o.Authentication.AddFlags(nfs.FlagSet("authentication"))
o.Authorization.AddFlags(nfs.FlagSet("authorization"))
o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
utilfeature.DefaultFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
return nfs
}
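The apiserverflag.NamedFlagSets used above is essentially a map from a section name to a pflag.FlagSet, so related flags can be grouped in help output. A rough sketch of the idea built directly on spf13/pflag (a simplified stand-in, not the apiserverflag implementation):
// sketch: grouping flags into named sets, then merging them for parsing
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// namedFlagSets is a simplified stand-in for apiserverflag.NamedFlagSets:
// it lazily creates one pflag.FlagSet per section name.
type namedFlagSets struct {
	sets map[string]*pflag.FlagSet
}

func (n *namedFlagSets) FlagSet(name string) *pflag.FlagSet {
	if n.sets == nil {
		n.sets = map[string]*pflag.FlagSet{}
	}
	if _, ok := n.sets[name]; !ok {
		n.sets[name] = pflag.NewFlagSet(name, pflag.ExitOnError)
	}
	return n.sets[name]
}

func main() {
	var nfs namedFlagSets
	var configFile string

	// Each option group registers flags on its own named set,
	// just as Options.Flags does with "misc", "leader election", etc.
	nfs.FlagSet("misc").StringVar(&configFile, "config", "", "path to the configuration file")

	// All named sets are ultimately merged into one set for parsing.
	all := pflag.NewFlagSet("demo", pflag.ExitOnError)
	for _, fs := range nfs.sets {
		all.AddFlagSet(fs)
	}
	_ = all.Parse([]string{"--config", "/etc/scheduler.yaml"})
	fmt.Println("config =", configFile)
}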
Back in the NewSchedulerCommand method above: with opts ready, runCommand is invoked.
// cmd/kube-scheduler/app/server.go
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
...
// validate the options
if errs := opts.Validate(); len(errs) > 0 {
fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
os.Exit(1)
}
// if requested, persist opts.ComponentConfig to a file
if len(opts.WriteConfigTo) > 0 {
if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
}
// build a scheduler Config object from opts
c, err := opts.Config()
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
stopCh := make(chan struct{})
// Get the completed config
// derive the completed config from the scheduler config
cc := c.Complete()
// apply the enabled feature gates
algorithmprovider.ApplyFeatureGates()
// register the component configuration with configz
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// everything above exists to obtain a completed config
return Run(cc, stopCh)
}
This whole sequence of operations exists to produce a completed config, which is then handed to Run. One call worth a closer look is opts.Config().
// cmd/kube-scheduler/app/options/options.go
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
// if kube-scheduler was started without --config, use the defaults in o.ComponentConfig
if len(o.ConfigFile) == 0 {
c.ComponentConfig = o.ComponentConfig
// only apply deprecated flags if no config file is loaded (this is the old behaviour).
if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
return err
}
if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
return err
}
} else {
// if kube-scheduler was started with --config, load the configuration from the file
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
// use the loaded config file only, with the exception of --address and --port. This means that
// none of the deprecated flags in o.Deprecated are taken into consideration. This is the old
// behaviour of the flags we have to keep.
c.ComponentConfig = *cfg
if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
return err
}
}
...
return nil
}
func (o *Options) Config() (*schedulerappconfig.Config, error) {
if o.SecureServing != nil {
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
}
c := &schedulerappconfig.Config{}
if err := o.ApplyTo(c); err != nil {
return nil, err
}
// Prepare kube clients.
// create the clients used to call the api-server
client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
if err != nil {
return nil, err
}
...
// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
// the default is true, so unless the user sets it to false this branch runs;
// in other words, kube-scheduler supports high availability out of the box
if c.ComponentConfig.LeaderElection.LeaderElect {
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
if err != nil {
return nil, err
}
}
c.Client = client
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
c.PodInformer = factory.NewPodInformer(client, 0)
c.EventClient = eventClient
c.Recorder = recorder
c.Broadcaster = eventBroadcaster
c.LeaderElection = leaderElectionConfig
return c, nil
}
ApplyTo: decides where the configuration comes from; if a config file was given, the configuration is read from that file.
Config: mainly creates the clients used to talk to the api-server, plus the leaderElectionConfig that backs kube-scheduler high availability.
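For reference, here is a minimal client-go snippet showing how such an api-server client is typically constructed (an illustrative sketch only, not the createClients implementation; the kubeconfig path is hypothetical):
// sketch: building a clientset that talks to the api-server
package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a rest.Config: an empty master URL means the kubeconfig's
	// current context decides which api-server to talk to.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	// kube-scheduler builds several such clients: the main client,
	// one for leader election, and one for events.
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("created clientset: %T\n", client)
}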
Now return to runCommand in cmd/kube-scheduler/app/server.go, which proceeds to call Run(cc, stopCh). Since Run is the real core method, we analyze it in pieces, starting with how the Scheduler object in pkg/scheduler/scheduler.go is created.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
...
// Create the scheduler.
// build the Scheduler object defined in pkg/scheduler/scheduler.go
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
...
}
As you can see, the whole preceding sequence of operations exists to assemble the configuration Scheduler needs, including cc.Client, cc.ComponentConfig.AlgorithmSource, and so on.
3.2 pkg/scheduler/scheduler.go
The entire Scheduler struct has a single field: the Config struct from pkg/scheduler/factory/factory.go.
type Scheduler struct {
config *factory.Config
}
Now look at the New method.
func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
/**
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
*/
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
// Set up the configurator which can create schedulers from configs.
// build factory's configFactory
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: options.schedulerName,
Client: client,
NodeInformer: nodeInformer,
PodInformer: podInformer,
PvInformer: pvInformer,
PvcInformer: pvcInformer,
ReplicationControllerInformer: replicationControllerInformer,
ReplicaSetInformer: replicaSetInformer,
StatefulSetInformer: statefulSetInformer,
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: options.enableEquivalenceClassCache,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
})
var config *factory.Config
source := schedulerAlgorithmSource
switch {
case source.Provider != nil:
// the default scheduler takes this branch: *source.Provider == DefaultProvider
// Create the config from a named algorithm provider.
sc, err := configurator.CreateFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
config = sc
case source.Policy != nil:
// a custom scheduler configured via a policy takes this branch
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
return nil, err
}
case source.Policy.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
return nil, err
}
}
sc, err := configurator.CreateFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
config = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
config.StopEverything = stopCh
// Create the scheduler.
sched := NewFromConfig(config)
return sched, nil
}
1. Build options from the opts functions passed in. The defaults are the fields below; anything that needs to change is changed through one of the opts functions, as the sketch after the defaults illustrates.
var defaultSchedulerOptions = schedulerOptions{
schedulerName: v1.DefaultSchedulerName,
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: false,
disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds,
}
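This is Go's functional options pattern: each scheduler.WithXXX call returns a closure that overrides one field of the defaults. A minimal standalone sketch of the same mechanism (simplified names and defaults, not the actual scheduler code):
// sketch: the functional options pattern behind opts ...func(o *schedulerOptions)
package main

import "fmt"

type schedulerOptions struct {
	schedulerName      string
	disablePreemption  bool
	bindTimeoutSeconds int64
}

// Option mutates one field of schedulerOptions, like scheduler.WithName etc.
type Option func(o *schedulerOptions)

func WithName(name string) Option {
	return func(o *schedulerOptions) { o.schedulerName = name }
}

func WithPreemptionDisabled(disabled bool) Option {
	return func(o *schedulerOptions) { o.disablePreemption = disabled }
}

func New(opts ...Option) schedulerOptions {
	// Start from the defaults, then let each option override one field.
	options := schedulerOptions{
		schedulerName:      "default-scheduler",
		bindTimeoutSeconds: 100,
	}
	for _, opt := range opts {
		opt(&options)
	}
	return options
}

func main() {
	fmt.Printf("%+v\n", New(WithName("my-scheduler"), WithPreemptionDisabled(true)))
}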
2. Build factory's configFactory object, named configurator, from the arguments; it is studied in detail in a later section.
3. Depending on source, choose how to produce the Scheduler's Config. Here we only discuss the default scheduler (custom schedulers will get a dedicated post), so execution enters sc, err := configurator.CreateFromProvider(*source.Provider) to produce the required Config.
// pkg/scheduler/factory/factory.go
// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
return nil, err
}
return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}
The earlier post [k8s源码分析][kube-scheduler]scheduler/algorithmprovider之注册default-scheduler already covered GetAlgorithmProvider(providerName); with providerName="DefaultProvider" it returns the keys of all the default scheduler's predicate and priority functions. A sketch of the registry's shape follows below.
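Conceptually, the algorithm provider registry is just a map from a provider name to the sets of predicate and priority keys registered under it. A minimal sketch of that shape (simplified types and illustrative key names, not the real registry):
// sketch: a name -> (predicate keys, priority keys) registry
package main

import "fmt"

// algorithmProvider mirrors the idea of provider.FitPredicateKeys /
// provider.PriorityFunctionKeys, using plain string slices instead of sets.String.
type algorithmProvider struct {
	fitPredicateKeys     []string
	priorityFunctionKeys []string
}

var providers = map[string]algorithmProvider{}

func registerAlgorithmProvider(name string, p algorithmProvider) {
	providers[name] = p
}

func getAlgorithmProvider(name string) (algorithmProvider, error) {
	p, ok := providers[name]
	if !ok {
		return algorithmProvider{}, fmt.Errorf("provider %q is not registered", name)
	}
	return p, nil
}

func main() {
	registerAlgorithmProvider("DefaultProvider", algorithmProvider{
		fitPredicateKeys:     []string{"PodFitsResources", "PodFitsHostPorts"},
		priorityFunctionKeys: []string{"LeastRequestedPriority", "BalancedResourceAllocation"},
	})
	p, err := getAlgorithmProvider("DefaultProvider")
	if err != nil {
		panic(err)
	}
	fmt.Println(p.fitPredicateKeys, p.priorityFunctionKeys)
}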
Then configFactory's CreateFromKeys takes all the predicate functions, priority functions, and extenders (empty here) and generates the Config that Scheduler needs. CreateFromKeys is explained later together with configFactory.
4. Wrap the Config in a Scheduler object, sched.
// pkg/scheduler/scheduler.go
// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *factory.Config) *Scheduler {
metrics.Register()
return &Scheduler{
config: config,
}
}
3.3 pkg/scheduler/factory/factory.go
This section analyzes the configurator := factory.NewConfigFactory call mentioned in 3.2 pkg/scheduler/scheduler.go.
3.3.1 configFactory
Here is the definition of the configFactory struct:
type configFactory struct {
// client for talking to the api-server
client clientset.Interface
// queue for pods that need scheduling
// holds the pods that need to be scheduled
podQueue internalqueue.SchedulingQueue
// a means to list all known scheduled pods.
// lists all pods that have already been scheduled
scheduledPodLister corelisters.PodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
// lists all scheduled pods plus the assumed pods
podLister algorithm.PodLister
// a means to list all nodes
nodeLister corelisters.NodeLister
// a means to list all PersistentVolumes
pVLister corelisters.PersistentVolumeLister
// a means to list all PersistentVolumeClaims
pVCLister corelisters.PersistentVolumeClaimLister
// a means to list all services
serviceLister corelisters.ServiceLister
// a means to list all controllers
controllerLister corelisters.ReplicationControllerLister
// a means to list all replicasets
replicaSetLister appslisters.ReplicaSetLister
// a means to list all statefulsets
statefulSetLister appslisters.StatefulSetLister
// a means to list all PodDisruptionBudgets
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
// Close this to stop all reflectors
StopEverything <-chan struct{}
scheduledPodsHasSynced cache.InformerSynced
schedulerCache schedulerinternalcache.Cache
// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on pods's "spec.schedulerName".
// the scheduler's name, default-scheduler by default
schedulerName string
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
// corresponding to every RequiredDuringScheduling affinity rule.
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
hardPodAffinitySymmetricWeight int32
// Equivalence class cache
// equivalence class cache that speeds up the predicate phase
equivalencePodCache *equivalence.Cache
// Enable equivalence class cache
enableEquivalenceClassCache bool
// Handles volume binding decisions
volumeBinder *volumebinder.VolumeBinder
// Always check all predicates even if the middle of one predicate fails.
alwaysCheckAllPredicates bool
// Disable pod preemption or not.
// whether preemption is disabled
disablePreemption bool
// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
percentageOfNodesToScore int32
}
The NewConfigFactory method:
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := args.StopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
if args.StorageClassInformer != nil {
storageClassLister = args.StorageClassInformer.Lister()
}
c := &configFactory{
client: args.Client,
podLister: schedulerCache,
podQueue: internalqueue.NewSchedulingQueue(stopEverything),
nodeLister: args.NodeInformer.Lister(),
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(),
controllerLister: args.ReplicationControllerInformer.Lister(),
replicaSetLister: args.ReplicaSetInformer.Lister(),
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
storageClassLister: storageClassLister,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: args.SchedulerName,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: args.EnableEquivalenceClassCache,
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
}
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// scheduled pod cache
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return assignedPod(pod)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToCache,
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
},
)
...
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
...
ch := make(chan os.Signal, 1)
signal.Notify(ch, compareSignal)
go func() {
for {
select {
case <-c.StopEverything:
c.podQueue.Close()
return
case <-ch:
debugger.Comparer.Compare()
debugger.Dumper.DumpAll()
}
}
}()
return c
}
The main points to note here:
1. schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything) instantiates a schedulerCache; its implementation and structure were analyzed in [k8s源码分析][kube-scheduler]scheduler/internal/cache之node_tree和cache, so here we mainly care about where it is used.
2. configFactory's podLister and schedulerCache refer to the same schedulerCache object. Since podLister is defined as listing all scheduled pods plus assumed pods, using schedulerCache makes perfect sense.
3. configFactory's scheduledPodLister is defined as listing all pods that have already been scheduled, while args.PodInformer.Lister() returns all pods (informers will get dedicated posts in the client-go series, so no details here). Clearly, assignedPodLister just wraps args.PodInformer.Lister() with a filter that keeps only pods that have already been scheduled.
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
type assignedPodLister struct {
corelisters.PodLister
}
// List lists all Pods in the indexer for a given namespace.
func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
list, err := l.PodLister.List(selector)
if err != nil {
return nil, err
}
filtered := make([]*v1.Pod, 0, len(list))
for _, pod := range list {
// keep only pods that have already been assigned to a node
if len(pod.Spec.NodeName) > 0 {
filtered = append(filtered, pod)
}
}
return filtered, nil
}
// Pods returns a namespace-scoped lister that applies the same filtering.
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
}
4. podQueue is defined as holding the pods that need scheduling, hence internalqueue.NewSchedulingQueue(stopEverything); scheduling_queue was analyzed in detail in [k8s源码分析][kube-scheduler]scheduler/internal/queue之优先队列scheduling_queue(1) and [k8s源码分析][kube-scheduler]scheduler/internal/queue之优先队列scheduling_queue(2).
5. The various informers (podInformer, serviceInformer, NodeInformer, PvInformer, PvcInformer, StorageClassInformer, and so on) get their EventHandler processing logic attached; that part is analyzed in the next topic, and a small sketch of the filtering handler follows below.
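The cache.FilteringResourceEventHandler used above simply gates an ordinary handler behind a predicate, so only assigned pods reach the cache handlers. A rough standalone sketch of that dispatch logic (simplified, not the client-go implementation):
// sketch: a filter in front of an event handler, in the FilteringResourceEventHandler spirit
package main

import "fmt"

// handler mirrors cache.ResourceEventHandlerFuncs in shape (Add only, for brevity).
type handler struct {
	addFunc func(obj interface{})
}

// filteringHandler mirrors the idea of cache.FilteringResourceEventHandler:
// events only reach the inner handler when filter(obj) is true.
type filteringHandler struct {
	filter  func(obj interface{}) bool
	handler handler
}

func (f filteringHandler) onAdd(obj interface{}) {
	if !f.filter(obj) {
		return
	}
	f.handler.addFunc(obj)
}

type pod struct{ name, nodeName string }

func main() {
	h := filteringHandler{
		// Plays the role of assignedPod(t): only pods already bound to a node pass.
		filter: func(obj interface{}) bool {
			p, ok := obj.(pod)
			return ok && p.nodeName != ""
		},
		handler: handler{addFunc: func(obj interface{}) {
			fmt.Println("add to cache:", obj.(pod).name)
		}},
	}
	h.onAdd(pod{name: "assigned", nodeName: "node-1"}) // reaches the handler
	h.onAdd(pod{name: "pending"})                      // filtered out
}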
3.3.2 Config
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
// the schedulerCache, i.e. configFactory's schedulerCache
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
// i.e. configFactory's equivalencePodCache
Ecache *equivalence.Cache
// lister for all Nodes
NodeLister algorithm.NodeLister
// the algorithm used for scheduling
Algorithm algorithm.ScheduleAlgorithm
// returns the binder used to bind a pod
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
// the preemptor
PodPreemptor PodPreemptor
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
// pops the next pod that needs scheduling;
// if there is none, it blocks until one is available
NextPod func() *v1.Pod
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
VolumeBinder *volumebinder.VolumeBinder
DisablePreemption bool
// queues the pods waiting to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
}
Note in particular: NextPod is a function; every pod that needs scheduling passes through it and comes out one at a time to be scheduled.
Next, look at configFactory's CreateFromKeys, mentioned above: given the predicate, priority, and extender functions, it generates a Config object as defined in factory.go.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
}
// resolve all predicate functions from the predicate keys
predicateFuncs, err := c.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}
// resolve all priority function configs from the priority keys
priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}
// priorityMetaProducer is used during scoring
priorityMetaProducer, err := c.GetPriorityMetadataProducer()
if err != nil {
return nil, err
}
// predicateMetaProducer is used during the actual predicate phase
predicateMetaProducer, err := c.GetPredicateMetadataProducer()
if err != nil {
return nil, err
}
// init the equivalence class cache (which speeds up predicates) if it is enabled
if c.enableEquivalenceClassCache {
c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
klog.Info("Created equivalence class cache")
}
// build the Algorithm (algorithm.ScheduleAlgorithm) that does the real scheduling work
algo := core.NewGenericScheduler(
c.schedulerCache,
c.equivalencePodCache,
c.podQueue,
predicateFuncs,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
extenders,
c.volumeBinder,
c.pVCLister,
c.pdbLister,
c.alwaysCheckAllPredicates,
c.disablePreemption,
c.percentageOfNodesToScore,
)
podBackoff := util.CreateDefaultPodBackoff()
return &Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: &nodeLister{c.nodeLister},
Algorithm: algo,
GetBinder: c.getBinderFunc(extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
NextPod: func() *v1.Pod {
return c.getNextPod()
},
Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
}, nil
}
Points to note here:
1. The predicate and priority keys are resolved into their corresponding predicate and priority functions, and the registered priorityMetaProducer and predicateMetaProducer are fetched.
2. The algorithm.ScheduleAlgorithm interface instance that performs the real scheduling computation is created; the returned implementation is a genericScheduler (pkg/scheduler/core/generic_scheduler.go) object.
// pkg/scheduler/algorithm/scheduler_interface.go
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for
// testing.
Prioritizers() []PriorityConfig
}
// pkg/scheduler/core/generic_scheduler.go
func NewGenericScheduler(
cache schedulerinternalcache.Cache,
eCache *equivalence.Cache,
podQueue internalqueue.SchedulingQueue,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.PriorityMetadataProducer,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
pdbLister algorithm.PDBLister,
alwaysCheckAllPredicates bool,
disablePreemption bool,
percentageOfNodesToScore int32,
) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
equivalenceCache: eCache,
schedulingQueue: podQueue,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,
alwaysCheckAllPredicates: alwaysCheckAllPredicates,
disablePreemption: disablePreemption,
percentageOfNodesToScore: percentageOfNodesToScore,
}
}
3. GetBinder is built: getBinderFunc returns, for a given pod, either an extender binder interested in that pod or the default binder.
// pkg/scheduler/factory/factory.go
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
var extenderBinder algorithm.SchedulerExtender
for i := range extenders {
if extenders[i].IsBinder() {
extenderBinder = extenders[i]
break
}
}
defaultBinder := &binder{c.client}
return func(pod *v1.Pod) Binder {
if extenderBinder != nil && extenderBinder.IsInterested(pod) {
return extenderBinder
}
return defaultBinder
}
}
4. PodConditionUpdater and PodPreemptor are built; both wrap the client that talks to the api-server.
5. NextPod is the most central function here, because every pod that needs scheduling comes out of it.
NextPod: func() *v1.Pod {
return c.getNextPod()
}
func (c *configFactory) getNextPod() *v1.Pod {
pod, err := c.podQueue.Pop()
if err == nil {
klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
return pod
}
klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil
}
As you can see, every pod comes out of podQueue, so where pods enter podQueue matters a great deal. That involves the informers mentioned above, so it is left for the next post. A sketch of the blocking-pop behaviour follows below.
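To make the blocking behaviour of Pop concrete, here is a tiny blocking FIFO in the same spirit (a sketch only, not the SchedulingQueue implementation):
// sketch: a FIFO whose Pop blocks while empty, as getNextPod relies on
package main

import (
	"fmt"
	"sync"
	"time"
)

type blockingQueue struct {
	lock  sync.Mutex
	cond  *sync.Cond
	items []string
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.lock)
	return q
}

func (q *blockingQueue) Add(item string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.items = append(q.items, item)
	q.cond.Signal() // wake up one blocked Pop
}

func (q *blockingQueue) Pop() string {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // releases the lock and blocks until Signal
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item
}

func main() {
	q := newBlockingQueue()
	go func() {
		time.Sleep(100 * time.Millisecond)
		q.Add("pod-1") // informer event handlers play this role for podQueue
	}()
	fmt.Println("popped:", q.Pop()) // blocks until pod-1 arrives
}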
3.4 Back to Run
Section 3.3 analyzed how the Run method of cmd/kube-scheduler/app/server.go (from 3.1) generates the Scheduler object in pkg/scheduler/scheduler.go. So what happens once that object is created? Return to the Run method in cmd/kube-scheduler/app/server.go.
// cmd/kube-scheduler/app/server.go
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
...
// Create the scheduler.
// build the Scheduler object from pkg/scheduler/scheduler.go
sched, err := scheduler.New
...
// Start all informers.
go cc.PodInformer.Informer().Run(stopCh)
cc.InformerFactory.Start(stopCh)
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)
// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
// If leader election is enabled, runCommand via LeaderElector until done and exit.
// start in high-availability mode
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
// invoke the run function once leadership is acquired
OnStartedLeading: run,
OnStoppedLeading: func() {
utilruntime.HandleError(fmt.Errorf("lost master"))
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}
1. All informers are started.
2. Because high availability is supported by default, sched.Run() is started through the leader-election machinery.
Next, look at the sched.Run method.
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
...
}
You can see that scheduleOne is executed with a period of 0 seconds, i.e. back to back; each scheduleOne call uses sched.config.NextPod() to pop a pod from its podQueue and schedule it.
In plain terms, the loop keeps taking one pod at a time out of podQueue for scheduling; if podQueue is empty, it blocks there. A sketch of these semantics follows below.
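wait.Until(f, 0, stopCh) re-runs f as soon as it returns, until stopCh is closed. A stripped-down version of that loop showing the semantics (a sketch, not the apimachinery implementation):
// sketch: the wait.Until loop behind sched.Run
package main

import (
	"fmt"
	"time"
)

// until re-runs f every period until stopCh is closed; with period == 0 the
// next iteration starts as soon as f returns, as in sched.Run.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	i := 0
	go until(func() {
		i++
		fmt.Println("scheduleOne iteration", i)
		if i == 3 {
			close(stopCh) // stand-in for StopEverything being closed
		}
	}, 0, stopCh)
	time.Sleep(50 * time.Millisecond)
}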
4. Summary
Having walked through the whole process, we can see:
1. Parse the config file, or fall back to the defaults, to produce a completed config.
2. Start the pod-related informers, which watch changes in the cluster and feed pods into a scheduling_queue (the podQueue) according to the relevant rules.
3. Start an endless loop that keeps reading from podQueue and scheduling pods.

