Kubernetes Scheduling Concepts and Workflow
Overview [1]
In a Kubernetes cluster, the scheduler, kube-scheduler, watches for newly created Pods that have no node assigned and finds the best (specific) node for each such Pod to run on. How are these actions, or rather these mechanisms, implemented? Let's dig into that below.
For newly created Pods or other unscheduled Pods, kube-scheduler selects an optimal node for them to run on. However, every container in a Pod has different resource requirements, and every Pod also has different requirements, so the existing nodes must be filtered according to the Pod's specific scheduling requirements.
In a Kubernetes cluster, the nodes that meet a Pod's scheduling requirements are called feasible nodes (FN). If no node is suitable, the Pod remains unscheduled until the scheduler is able to place it. In other words, if a Pod you created stays in the Pending state for a long time, you should check whether the cluster's scheduler has run out of suitable nodes for some reason.
After the scheduler finds the FNs for a Pod, it runs a set of functions to score them and picks the FN with the highest score to run the Pod.
Factors that the scheduling policy takes into account include individual and collective resource requirements, hardware/software/policy constraints, affinity and anti-affinity specifications, data locality, inter-workload interference, and so on.
How is a node selected for a Pod?
kube-scheduler selects a node for a Pod in two steps:
- Filtering
- Scoring
Filtering is also known as Predicates. This step finds the set of schedulable nodes by checking whether each node can satisfy the Pod's specific resource requests; for example, the PodFitsResources filter checks whether a candidate node has enough free resources to meet the Pod's resource requests. After this step you get a list of suitable nodes (usually more than one); if the list is empty, the Pod is unschedulable.
Scoring is also known as Priorities. In this step, the output of the previous step is scored: the scheduler computes a score for every node that passed the Filtering step according to the scoring rules.
After these two steps, kube-scheduler assigns the Pod to the node with the highest score; if several nodes have the same score, one of them is chosen at random.
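To pin down the two steps, here is a minimal, self-contained sketch of "filter, then score, then pick the highest (ties broken randomly)". The Node and Pod types and the scoring rule are simplified placeholders, not the real kube-scheduler types:

package main

import (
	"fmt"
	"math/rand"
)

// Simplified placeholders; the real scheduler works on *v1.Pod and framework.NodeInfo.
type Node struct {
	Name            string
	FreeCPUMilli    int64
	FreeMemoryBytes int64
}

type Pod struct {
	Name        string
	CPUMilli    int64
	MemoryBytes int64
}

// filter keeps only the feasible nodes (the "Predicates" step).
func filter(pod Pod, nodes []Node) []Node {
	feasible := make([]Node, 0, len(nodes))
	for _, n := range nodes {
		if n.FreeCPUMilli >= pod.CPUMilli && n.FreeMemoryBytes >= pod.MemoryBytes {
			feasible = append(feasible, n)
		}
	}
	return feasible
}

// score gives each feasible node a score (the "Priorities" step); here: more free CPU is better.
func score(pod Pod, node Node) int64 {
	return node.FreeCPUMilli - pod.CPUMilli
}

// schedule returns the best node, breaking ties randomly, or false if the Pod is unschedulable.
func schedule(pod Pod, nodes []Node) (Node, bool) {
	feasible := filter(pod, nodes)
	if len(feasible) == 0 {
		return Node{}, false // the Pod stays Pending
	}
	best := []Node{feasible[0]}
	bestScore := score(pod, feasible[0])
	for _, n := range feasible[1:] {
		switch s := score(pod, n); {
		case s > bestScore:
			best, bestScore = []Node{n}, s
		case s == bestScore:
			best = append(best, n)
		}
	}
	return best[rand.Intn(len(best))], true
}

func main() {
	nodes := []Node{{"node-a", 4000, 8 << 30}, {"node-b", 500, 8 << 30}}
	if n, ok := schedule(Pod{"web", 1000, 1 << 30}, nodes); ok {
		fmt.Println("scheduled to", n.Name)
	}
}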
Kubernetes scheduling policies
In Kubernetes versions before 1.21, the corresponding registration pattern can be seen in kubernetes\pkg\scheduler\algorithmprovider\registry.go; in 1.22 the scheduler moved this path, and the registry file now lives at kubernetes\pkg\scheduler\framework\plugins\registry.go. In the official Kubernetes wording, scheduling policies are used for "Predicates" (filtering) and for "Priorities" (scoring).
Note: the official Kubernetes documentation does not define "pre-selection" and "preferred selection" as separate concepts; Predicates and filtering are the terms for the pre-selection phase, while Priorities and scoring are the terms for the preferred-selection phase. Below, PF and PS are used as shorthand for these two phases.
Pre-selecting nodes for a Pod [2]
As mentioned above, the purpose of filtering is to exclude (filter out) nodes that do not meet the Pod's requirements. For example, if a node's free resources are smaller than what the Pod needs, that node is not considered, i.e. it is filtered out. The filtering policies implemented in the "Predicates" phase include:
- NoDiskConflict: evaluates whether the volumes the Pod requests can be satisfied without conflicting with volumes already in use on the node.
- NoVolumeZoneConflict: evaluates, given zone restrictions, whether the volumes the Pod requests are available on the node.
- PodFitsResources: checks whether the node's free resources (CPU, memory) satisfy the Pod's requests.
- PodFitsHostPorts: checks whether the ports the Pod needs are already occupied on the node.
- HostName: filters out all nodes other than the one named in the NodeName field of the PodSpec.
- MatchNodeSelector: checks whether the node's labels match the labels specified in the Pod's nodeSelector field and, since Kubernetes v1.2, also evaluates nodeAffinity if present.
- CheckNodeMemoryPressure: checks whether a Pod can be scheduled onto a node that is already reporting memory pressure.
- CheckNodeDiskPressure: checks whether a Pod can be scheduled onto a node that is reporting disk pressure.
The plugins that implement these policies can be seen in kubernetes\pkg\scheduler\framework\plugins\registry.go; conceptually the registry is just a map from plugin name to plugin factory, as sketched below.
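The following is a simplified, self-contained sketch of that idea (plain Go, not the real runtime.Registry and PluginFactory types): plugin names map to factory functions, and the scheduler builds plugins from this map when it assembles a profile.

package main

import "fmt"

// Plugin is a simplified stand-in for the scheduler framework's plugin interface.
type Plugin interface {
	Name() string
}

// Factory builds a plugin; the real registry passes plugin args and a framework handle.
type Factory func() (Plugin, error)

// Registry maps a plugin name to its factory, mirroring the idea behind
// pkg/scheduler/framework/plugins/registry.go.
type Registry map[string]Factory

type nodePorts struct{}

func (nodePorts) Name() string { return "NodePorts" }

func main() {
	registry := Registry{
		"NodePorts": func() (Plugin, error) { return nodePorts{}, nil },
		// "NodeResourcesFit", "NodeAffinity", ... would be registered the same way.
	}
	p, _ := registry["NodePorts"]()
	fmt.Println("built plugin:", p.Name())
}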
Scoring the pre-selected nodes [2]
The list produced by the filtering step contains the nodes that are suitable for hosting the Pod, and it usually has more than one entry. Choosing the best node to schedule onto is the job of the scoring step that follows.
For example, Kubernetes ranks the remaining nodes by priority, where the priority is computed by a set of functions. Each priority function gives the remaining nodes a score from 0 to 10, with 10 being the best and 0 the worst, and each priority function has a positive weight; a node's score is the sum of all weighted scores. Given two priority functions, priorityFunc1 and priorityFunc2, with weight factors weight1 and weight2, the node's final score is \(finalScore = (weight1 \times priorityFunc1) + (weight2 \times priorityFunc2)\). Once the scores are computed, the node with the highest score is chosen as the Pod's host; if several nodes tie for the highest score, one of them is chosen at random.
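As a worked example of the formula (with illustrative numbers): if weight1 = 1, weight2 = 2, and a node scores 8 on priorityFunc1 and 5 on priorityFunc2, the final score is 1 × 8 + 2 × 5 = 18. The same arithmetic in Go:

package main

import "fmt"

// weightedScore implements finalScore = Σ (weight_i × priorityFunc_i(node)).
func weightedScore(scores, weights []int64) int64 {
	var final int64
	for i := range scores {
		final += weights[i] * scores[i]
	}
	return final
}

func main() {
	// priorityFunc1 returned 8, priorityFunc2 returned 5 for this node (illustrative values).
	fmt.Println(weightedScore([]int64{8, 5}, []int64{1, 2})) // prints 18
}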
Kubernetes currently provides several algorithms for the Scoring phase:
- LeastRequestedPriority: the node's priority is based on its free fraction, \(\frac{capacity\ -\ \text{requests of Pods already on the node}\ -\ \text{request of the Pod being scheduled}}{capacity}\); the node with the highest computed score is preferred (see the sketch after this list).
- BalancedResourceAllocation: places the Pod on a node such that the node's CPU and memory utilization are balanced after the Pod is deployed.
- SelectorSpreadPriority: spreads Pods by minimizing the number of Pods belonging to the same service, controller, or replica set on a single node. If the nodes have zones, the priority is also adjusted so that Pods are spread across zones.
- CalculateAntiAffinityPriority: spreads Pods according to labels, distributing Pods of the same service that carry the same label values.
- ImageLocalityPriority: scores nodes based on the images already present on them; a node that already has the images the Pod needs gets a higher priority.
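As a rough illustration of the LeastRequestedPriority idea referenced above (a simplified sketch only; the in-tree plugin combines CPU and memory and scales to the framework's maximum node score):

package main

import "fmt"

// leastRequestedScore scores a single resource dimension: the larger the free
// fraction after placing the pod, the higher the score (scaled to 0..10 here,
// matching the 0-10 range described above).
func leastRequestedScore(capacity, alreadyRequested, podRequest int64) int64 {
	if capacity == 0 {
		return 0
	}
	free := capacity - alreadyRequested - podRequest
	if free < 0 {
		return 0
	}
	return free * 10 / capacity
}

func main() {
	// A node with 4000m CPU, 1500m already requested, scheduling a pod asking for 500m.
	fmt.Println(leastRequestedScore(4000, 1500, 500)) // 2000/4000 of the node is free -> 5
}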
Viewing these algorithms in the code
Taking the PodFitsHostPorts algorithm as an example: since it is a node-level check, it lives in kubernetes\pkg\scheduler\framework\plugins\nodeports.
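A heavily simplified version of what that check amounts to is sketched below (assuming a conflict means the same protocol, host IP, and host port; the real nodeports plugin also has to treat 0.0.0.0 as matching any host IP):

package main

import "fmt"

// HostPort identifies a port reservation on a node.
type HostPort struct {
	Protocol string
	HostIP   string
	HostPort int32
}

// fitsHostPorts reports whether none of the host ports the pod wants are already in use on the node.
func fitsHostPorts(wanted []HostPort, inUse map[HostPort]bool) bool {
	for _, p := range wanted {
		if p.HostPort == 0 {
			continue // no host port requested, nothing to conflict with
		}
		if inUse[p] {
			return false
		}
	}
	return true
}

func main() {
	used := map[HostPort]bool{{Protocol: "TCP", HostIP: "0.0.0.0", HostPort: 8080}: true}
	want := []HostPort{{Protocol: "TCP", HostIP: "0.0.0.0", HostPort: 8080}}
	fmt.Println(fitsHostPorts(want, used)) // false: the node is filtered out
}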
The scheduling framework [3]
The scheduling framework (SF) is a pluggable architecture that Kubernetes designed for the scheduler. The SF turns the scheduler into a set of plugin-style APIs, and the series of scheduling policies mentioned above are implemented as plugins.
The SF defines a number of extension points (EP), and scheduling logic implemented as a plugin is registered at one or more EPs; in other words, a plugin registered at several EPs is invoked at each of those EPs as they are executed.
Each scheduling attempt is split into two phases: the scheduling cycle (SC) and the binding cycle (BC).
The SC selects a node for the Pod; scheduling cycles run serially.
The BC applies the SC's decision to the cluster; binding cycles may run concurrently.
Together, the scheduling cycle and the binding cycle are called the scheduling context; the figure below shows its workflow.
Note: if the decision is that no node is available for the Pod, or an internal error occurs, the SC or BC is aborted and the Pod is put back into the queue to be retried.

Figure 1: The scheduling context of a Pod
Source:https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework
Extension points [4]
Extension points are the extensible APIs within the scheduling context, shown in Figure 1. Among them, Filter corresponds to Predicate and Scoring corresponds to Priority.
The scheduling phases pass through the following extension points (a minimal plugin sketch follows the list):
- Sort: provides sorting for the pending Pods in the scheduling queue. Only one queue-sort plugin can be enabled at a time.
- preFilter: pre-processes or checks information about the Pod or the cluster before filtering; if a preFilter plugin fails, the scheduling cycle is aborted.
- filter: the equivalent of Predicates in the scheduling context, used to rule out nodes that cannot run the Pod. Filters are called in the configured order; if a filter marks a node as infeasible, the Pod cannot be scheduled onto it (the remaining filters are not run for that node).
- postFilter: called in the configured order when no feasible node was found for the Pod. If any postFilter plugin marks the Pod as schedulable, the remaining plugins are not called. In other words, this step does not run if filter succeeded.
- preScore: can be used for preparatory scoring work (an informational extension point).
- score: gives a score to every node that passed the filter phase; the scheduler then selects the node with the highest weighted sum of scores.
- reserve: because binding happens asynchronously, this plugin prevents new Pods from being scheduled onto a node before the current Pod is bound to it, which could push the node's usage past its available resources. If a later phase fails or errors, UnReserve is triggered to roll back (an informational extension point). Reserve is also the last state of the scheduling cycle: either the Pod eventually makes it through to postBind, or a failure triggers UnReserve.
- permit: can block or delay the binding of a Pod. This step generally does one of three things:
  - approve: the scheduler continues the binding process.
  - deny: if any Permit plugin denies binding the Pod to the node, UnReserve is triggered and the Pod is re-queued.
  - wait: if a Permit plugin returns Wait, the Pod stays in an internal list of waiting Pods until it is approved. If a timeout occurs, wait becomes deny, the Pod is put back into the scheduling queue, and UnReserve is triggered to roll back.
- preBind: performs any work required before the Pod is bound, for example provisioning a network volume and mounting it on the target node. If any plugin in this step returns an error, the Pod is denied and placed back into the scheduling queue.
- bind: after all preBind plugins complete, this plugin binds the Pod to the node. Bind plugins are called in order, and once one of them handles the binding the rest are skipped.
- postBind: called after the Pod is bound; can be used to clean up associated resources (an informational extension point).
- multiPoint: a configuration-only field that allows plugins to be enabled or disabled for all applicable extension points at once.
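To make the extension points concrete, below is a minimal out-of-tree plugin sketch that registers at the filter and score extension points. It is written against the framework interfaces as they look in the versions discussed here (the Filter/Score signatures, framework.MaxNodeScore, framework.Handle); the plugin name ZonePreference, the zone label, the pod annotation, and the scoring rule are all invented for illustration.

package zonepreference

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// ZonePreference is a hypothetical plugin: it filters out nodes without a
// zone label and scores nodes preferred by the Pod higher.
type ZonePreference struct{}

// Name is the hypothetical plugin name used when registering it.
const Name = "ZonePreference"

var _ framework.FilterPlugin = &ZonePreference{}
var _ framework.ScorePlugin = &ZonePreference{}

func (pl *ZonePreference) Name() string { return Name }

// Filter runs at the "filter" extension point (the Predicates equivalent).
func (pl *ZonePreference) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}
	if _, ok := node.Labels["topology.kubernetes.io/zone"]; !ok {
		return framework.NewStatus(framework.Unschedulable, "node has no zone label")
	}
	return nil // feasible
}

// Score runs at the "score" extension point (the Priorities equivalent).
func (pl *ZonePreference) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	// Illustrative rule only: prefer the node named in a pod annotation.
	if pod.Annotations["example.com/preferred-node"] == nodeName {
		return framework.MaxNodeScore, nil
	}
	return 0, nil
}

func (pl *ZonePreference) ScoreExtensions() framework.ScoreExtensions { return nil }

// New is the plugin factory handed to the scheduler's plugin registry.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
	return &ZonePreference{}, nil
}

A plugin like this is then enabled per profile in the plugins section of the KubeSchedulerConfiguration.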
Analysis of the kube-scheduler workflow
The analysis of the kube-scheduler component covers both the kube-scheduler startup flow and the scheduler's scheduling flow. Here the focus is on the startup flow; the scheduling flow is picked up later in the sections on algorithms and custom development.
The kube-scheduler we deploy lives under cmd/kube-scheduler. The scheduling framework was introduced as Alpha in 1.16 and reached Stable in 1.19, with a similar code structure throughout; the code style changed after 1.22.
Start with the kube-scheduler entry point in cmd/kube-scheduler. It does two main things: building the options and starting the server. Strictly speaking, kube-scheduler is a server, and the scheduling framework and related parts are separate from it.
func main() {
	command := app.NewSchedulerCommand()
	code := cli.Run(command)
	os.Exit(code)
}
cli.Run provides the cobra-based command-line CLI; logs are written to standard output.
// This is the Run invoked from main
func Run(cmd *cobra.Command) int {
if logsInitialized, err := run(cmd); err != nil {
if !logsInitialized {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
} else {
klog.ErrorS(err, "command failed")
}
return 1
}
return 0
}
// run initializes logging and then executes the cobra command
func run(cmd *cobra.Command) (logsInitialized bool, err error) {
rand.Seed(time.Now().UnixNano())
defer logs.FlushLogs()
cmd.SetGlobalNormalizationFunc(cliflag.WordSepNormalizeFunc)
if !cmd.SilenceUsage {
cmd.SilenceUsage = true
cmd.SetFlagErrorFunc(func(c *cobra.Command, err error) error {
// Re-enable usage printing.
c.SilenceUsage = false
return err
})
}
// In all cases error printing is done below.
cmd.SilenceErrors = true
// This is idempotent.
logs.AddFlags(cmd.PersistentFlags())
// Inject logs.InitLogs after command line parsing into one of the
// PersistentPre* functions.
switch {
case cmd.PersistentPreRun != nil:
pre := cmd.PersistentPreRun
cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
logs.InitLogs()
logsInitialized = true
pre(cmd, args)
}
case cmd.PersistentPreRunE != nil:
pre := cmd.PersistentPreRunE
cmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {
logs.InitLogs()
logsInitialized = true
return pre(cmd, args)
}
default:
cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
logs.InitLogs()
logsInitialized = true
}
}
err = cmd.Execute()
return
}
As you can see, it ultimately calls command.Execute() to run the command it built; what actually gets executed is the app.NewSchedulerCommand() shown above, so let's look at what that is.
app.NewSchedulerCommand() builds a cobra.Command object with runCommand() wrapped inside it; runCommand is the function that starts the scheduler.
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
opts := options.NewOptions()
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
RunE: func(cmd *cobra.Command, args []string) error {
return runCommand(cmd, opts, registryOptions...)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
nfs := opts.Flags
verflag.AddFlags(nfs.FlagSet("global"))
globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
fs := cmd.Flags()
for _, f := range nfs.FlagSets {
fs.AddFlagSet(f)
}
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)
if err := cmd.MarkFlagFilename("config", "yaml", "yml", "json"); err != nil {
klog.ErrorS(err, "Failed to mark flag filename")
}
return cmd
}
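Because NewSchedulerCommand accepts registryOptions, a custom scheduler binary can reuse this whole command and only add its own plugins. Below is a minimal sketch reusing the hypothetical ZonePreference plugin from the extension-points section; app.WithPlugin is the option kube-scheduler exposes for out-of-tree plugins (the factory signature here assumes the pre-1.28 two-argument form), and the import path of the plugin package is made up.

package main

import (
	"os"

	"k8s.io/component-base/cli"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	// Hypothetical out-of-tree plugin package from the sketch in the
	// extension-points section above.
	"example.com/scheduler-plugins/zonepreference"
)

// A custom scheduler binary: identical to kube-scheduler, plus one extra
// plugin registered through the registryOptions seen in NewSchedulerCommand.
func main() {
	command := app.NewSchedulerCommand(
		app.WithPlugin(zonepreference.Name, zonepreference.New),
	)
	code := cli.Run(command)
	os.Exit(code)
}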
Now let's look at what runCommand() does when it starts the scheduler.
In recent versions the algorithmprovider concept no longer exists, so all runCommand really does is start the scheduler. As a Kubernetes component, the scheduler also watches resources, so informers are naturally involved; and since kube-scheduler shares the same operating characteristics as controller-manager, it is also based on leader election.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
// To help debugging, immediately log version
klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// Start events processing pipeline.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()
// Setup healthz checks.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
// Start up the healthz server.
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// Start all informers.
cc.InformerFactory.Start(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.Start(ctx.Done())
}
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
}
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
close(waitingForLeader)
sched.Run(ctx)
},
OnStoppedLeading: func() {
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
klog.InfoS("Requested to terminate, exiting")
os.Exit(0)
default:
// We lost the lock.
klog.ErrorS(nil, "Leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
close(waitingForLeader)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
We saw above that runCommand's job is to start the scheduler; building a scheduler from the command-line options is done in Setup.
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if cfg, err := latest.Default(); err != nil {
return nil, nil, err
} else {
opts.ComponentConfig = cfg
}
// validate the options
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
// build a Config object
c, err := opts.Config()
if err != nil {
return nil, nil, err
}
// return a completed config containing everything the scheduler needs, e.g. informers and leader election
cc := c.Complete()
outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
recorderFactory := getRecorderFactory(&cc)
completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
// create the scheduler
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.DynInformerFactory,
recorderFactory,
ctx.Done(),
scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
scheduler.WithKubeConfig(cc.KubeConfig),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
completedProfiles = append(completedProfiles, profile)
}),
)
if err != nil {
return nil, nil, err
}
if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
return nil, nil, err
}
return &cc, sched, nil
}
Now that we know how the scheduler is built, let's look at how the parameters are passed in: the Options object is the configuration structure that is needed, and ApplyTo converts the flags passed at startup into the configuration required to build the scheduler.
For the deprecated flags, see the official documentation of kube-scheduler's startup flags [5].
For how to write a scheduler config, see [6] and [7].
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
if len(o.ConfigFile) == 0 {
// when --config is not specified, fall back to the deprecated flags;
// kube-scheduler --help lists these deprecated flags
o.ApplyDeprecated()
o.ApplyLeaderElectionTo(o.ComponentConfig)
c.ComponentConfig = *o.ComponentConfig
} else {
// this branch handles the case where --config was specified
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
// attach the leader-election options to the configuration
o.ApplyLeaderElectionTo(cfg)
if err := validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
return err
}
c.ComponentConfig = *cfg
}
if err := o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {
return err
}
if o.SecureServing != nil && (o.SecureServing.BindPort != 0 || o.SecureServing.Listener != nil) {
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
}
if err := o.Authorization.ApplyTo(&c.Authorization); err != nil {
return err
}
}
o.Metrics.Apply()
// Apply value independently instead of using ApplyDeprecated() because it can't be configured via ComponentConfig.
if o.Deprecated != nil {
c.PodMaxInUnschedulablePodsDuration = o.Deprecated.PodMaxInUnschedulablePodsDuration
}
return nil
}
After Setup, a new Scheduler is created; New does exactly that, and inside it you can see that it sets up informers, the Pod lister, and related machinery.
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
stopEverything := stopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
options := defaultSchedulerOptions // the default scheduler options, e.g. percentageOfNodesToScore
for _, opt := range opts {
opt(&options) // each opt is a passed-in function that applies the corresponding setting to schedulerOptions
}
if options.applyDefaultProfile { // a bool; the default scheduler takes this branch
// a Profile contains the scheduler name and the plugins it uses in the two phases
var versionedCfg v1beta3.KubeSchedulerConfiguration
scheme.Scheme.Default(&versionedCfg)
cfg := schedulerapi.KubeSchedulerConfiguration{} // initialize a configuration of the type that --config is loaded into
// Convert converts in to out, i.e. versionedCfg into cfg
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
options.profiles = cfg.Profiles
}
registry := frameworkplugins.NewInTreeRegistry() // register the in-tree scheduling framework plugins
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
metrics.Register() // register metrics
extenders, err := buildExtenders(options.extenders, options.profiles)
if err != nil {
return nil, fmt.Errorf("couldn't build extenders: %w", err)
}
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
// The nominator will be passed all the way to framework instantiation.
nominator := internalqueue.NewPodNominator(podLister)
snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithPodNominator(nominator),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
}
if len(profiles) == 0 {
return nil, errors.New("at least one profile is required")
}
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)
// Setup cache debugger.
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(stopEverything)
sched := newScheduler(
schedulerCache,
extenders,
internalqueue.MakeNextPodFunc(podQueue),
MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
stopEverything,
podQueue,
profiles,
client,
snapshot,
options.percentageOfNodesToScore,
)
// these are the onAdd/onUpdate/onDelete event handlers required by any controller
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
return sched, nil
}
Next the scheduler is started. We saw above that NewSchedulerCommand builds a cobra.Command object and that runCommand() ultimately ends up calling Run, which is what starts the scheduler.
The Run below is the Scheduler's own run loop: it runs and watches resources until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	// We need to start scheduleOne loop in a dedicated goroutine,
	// because scheduleOne function hangs on getting the next item
	// from the SchedulingQueue.
	// If there are no new pods to schedule, it will be hanging there
	// and if done in this goroutine it will be blocking closing
	// SchedulingQueue, in effect causing a deadlock on shutdown.
	go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	<-ctx.Done()
	sched.SchedulingQueue.Close()
}
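For orientation, each iteration of that loop (sched.scheduleOne) pops one Pod from the queue and drives it through the scheduling context described earlier. Below is a heavily simplified, self-contained sketch of the shape of one iteration; every helper in it is a placeholder, not the real implementation.

package main

import (
	"context"
	"fmt"
)

// Placeholder types and helpers; the real scheduleOne works against the
// scheduling queue, the framework's extension points, and the scheduler cache.
type pod struct{ name string }
type node struct{ name string }

func nextPod() pod                            { return pod{name: "demo"} }        // pop from the scheduling queue
func findFeasibleNodes(p pod) []node          { return []node{{name: "node-a"}} } // filter (Predicates)
func prioritize(p pod, ns []node) node        { return ns[0] }                    // score (Priorities)
func assume(p pod, n node)                    { fmt.Println("assumed", p.name) }  // reserve in the scheduler cache
func requeue(p pod)                           { fmt.Println("requeued", p.name) }
func bind(ctx context.Context, p pod, n node) { fmt.Println("bound", p.name, "to", n.name) }

// scheduleOneSketch mirrors the shape of one iteration of the loop started above.
func scheduleOneSketch(ctx context.Context) {
	p := nextPod()
	feasible := findFeasibleNodes(p)
	if len(feasible) == 0 {
		requeue(p) // the Pod stays Pending and is retried later
		return
	}
	best := prioritize(p, feasible)
	assume(p, best)
	// In the real scheduler, binding runs in its own goroutine (the binding cycle).
	bind(ctx, p, best)
}

func main() {
	scheduleOneSketch(context.Background())
}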
The caller of this Run method is the server-level Run function of kube-scheduler shown earlier: it starts the informers, waits for the caches to sync, and then calls sched.Run either directly or from the leader-election OnStartedLeading callback.
That server-level Run is invoked from runCommand, which is wired up when NewSchedulerCommand is created and is executed from the kube-scheduler entry point:
cc, sched, err := Setup(ctx, opts, registryOptions...)
if err != nil {
	return err
}
return Run(ctx, cc, sched)
With that, the analysis of the entire kube-scheduler startup flow is complete; the flow can be summarised by the figure below.

Figure 2: The scheduler server startup flow
Source: https://www.dianjilingqu.com/436088.html