最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

# IT明星不是夢 # 圖解kubernetes資源擴展機制

2021-08-11 13:22 作者:Vecloud_  | 我要投稿


  k8s目前主要支持CPU和內(nèi)存兩種資源,為了支持用戶需要按需分配的其他硬件類型的資源的調(diào)度分配,k8s實現(xiàn)了設(shè)備插件框架(device plugin framework)來用于其他硬件類型的資源集成,比如現(xiàn)在機器學(xué)習(xí)要使用GPU等資源,今天來看下其內(nèi)部的關(guān)鍵實現(xiàn)1. 基礎(chǔ)概念

?

  1.1 集成方式1.1.1 DaemonSet與服務(wù)當我們要集成本地硬件的資源的時候,我們可以在當前節(jié)點上通過DaemonSet來運行一個GRPC服務(wù),通過這個服務(wù)來進行本地硬件資源的上報與分配1.1.2 服務(wù)注冊設(shè)計當提供硬件服務(wù)需要與kubelet進行通信的時候,則首先需要進行注冊,注冊的方式,則是通過最原始的底層的socket文件,并且通過Linux文件系統(tǒng)的inotify機制,來實現(xiàn)服務(wù)的注冊1.2 插件服務(wù)感知

?

  1.2.1 WatcherWatcher主要是負責(zé)感知當前節(jié)點上注冊的服務(wù),當發(fā)現(xiàn)新的要注冊的插件服務(wù),則會產(chǎn)生對應(yīng)的事件,注冊到當前的kubelet中1.2.2 期望狀態(tài)與實際狀態(tài)這里的狀態(tài)主要是指的是否需要注冊,因為kubelet與對應(yīng)的插件服務(wù)是通過網(wǎng)絡(luò)進行通信的,當網(wǎng)絡(luò)出現(xiàn)問題、或者對應(yīng)的插件服務(wù)故障,則可能會導(dǎo)致服務(wù)注冊失敗,但此時對應(yīng)的服務(wù)的socket還依舊存在,即對應(yīng)的插件服務(wù)依舊存在此時就會有兩種狀態(tài):期望狀態(tài)與實際狀態(tài), 因為socket存在所以服務(wù)的期望狀態(tài)其實是需要注冊這個插件服務(wù),但是實際上因為某些原因,這個插件服務(wù)并沒有完成注冊,后續(xù)會不斷的通過期望狀態(tài),調(diào)整實際狀態(tài),從而達到一致1.2.3 協(xié)調(diào)器協(xié)調(diào)器則就是完成上述兩種狀態(tài)之間操作的核心,其通過調(diào)用對應(yīng)插件的回調(diào)函數(shù),其實就是調(diào)用對應(yīng)的grpc接口,來完成期望狀態(tài)與實際狀態(tài)的一致性1.2.4 插件控制器針對每種類型的插件,都會有對應(yīng)的控制器,其實也就是實現(xiàn)對應(yīng)設(shè)備注冊和反注冊并且完成底層資源的分配(Allocate)和收集(ListWatch)操作2. 插件服務(wù)發(fā)現(xiàn)

?

  2.1 核心數(shù)據(jù)結(jié)構(gòu)type Watcher struct { ???// 感知插件服務(wù)注冊的socket的路徑 ???path ???????????????string ???fs ?????????????????utilfs.Filesystem ???// inotify監(jiān)測插件服務(wù)socket變化 ???fsWatcher ??????????*fsnotify.Watcher ???stopped ????????????chan struct{} ???// 存儲期望狀態(tài) ???desiredStateOfWorld cache.DesiredStateOfWorld}2.2 初始化初始化其實就是創(chuàng)建對應(yīng)的目錄func (w *Watcher) init() error { ???klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path) ???if err := w.fs.MkdirAll(w.path, 0755); err != nil { ???????return fmt.Errorf("error (re-)creating root %s: %v", w.path, err) ???} ???return nil}2.3 插件服務(wù)發(fā)現(xiàn)核心 ???go func(fsWatcher *fsnotify.Watcher) { ???????defer close(w.stopped) ???????for { ???????????select { ???????????case event := <-fsWatcher.Events: ???????????????//如果發(fā)現(xiàn)對應(yīng)目錄的文件的變化,則會觸發(fā)對應(yīng)的事件 ???????????????if event.Op&fsnotify.Create == fsnotify.Create { ???????????????????err := w.handleCreateEvent(event) ???????????????????if err != nil { ???????????????????????klog.Errorf("error %v when handling create event: %s", err, event) ???????????????????} ???????????????} else if event.Op&fsnotify.Remove == fsnotify.Remove { ???????????????????w.handleDeleteEvent(event) ???????????????} ???????????????continue ???????????case err := <-fsWatcher.Errors: ???????????????if err != nil { ???????????????????klog.Errorf("fsWatcher received error: %v", err) ???????????????} ???????????????continue ???????????case <-stopCh: ???????????????// In case of plugin watcher being stopped by plugin manager, stop ???????????????// probing the creation/deletion of plugin sockets. ???????????????// Also give all pending go routines a chance to complete ???????????????select { ???????????????case <-w.stopped: ???????????????case <-time.After(11 * time.Second): ???????????????????klog.Errorf("timeout on stopping watcher") ???????????????} ???????????????w.fsWatcher.Close() ???????????????return ???????????} ???????} ???}(fsWatcher)2.4 補償機制其實補償機制主要是在重新啟動kubelet的時候,需要將之前已經(jīng)存在的socket重新注冊到當前的kubelet中func (w *Watcher) traversePluginDir(dir string) error { ???return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error { ???????if err != nil { ???????????if path == dir { ???????????????return fmt.Errorf("error accessing path: %s error: %v", path, err) ???????????} ???????????klog.Errorf("error accessing path: %s error: %v", path, err) ???????????return nil ???????} ???????switch mode := info.Mode(); { ???????case mode.IsDir(): ???????????if err := w.fsWatcher.Add(path); err != nil { ???????????????return fmt.Errorf("failed to watch %s, err: %v", path, err) ???????????} ???????case mode&os.ModeSocket != 0: ???????????event := fsnotify.Event{ ???????????????Name: path, ???????????????Op: ??fsnotify.Create, ???????????} ???????????//TODO: Handle errors by taking corrective measures ???????????if err := w.handleCreateEvent(event); err != nil { ???????????????klog.Errorf("error %v when handling create event: %s", err, event) ???????????} ???????default: ???????????klog.V(5).Infof("Ignoring file %s with mode %v", path, mode) ???????} ???????return nil ???})}2.5 注冊事件回調(diào)注冊其實就只需要感知到的socket文件路徑傳遞給期望狀態(tài)進行管理func (w *Watcher) handlePluginRegistration(socketPath string) error { ???if runtime.GOOS == "windows" { ???????socketPath = util.NormalizePath(socketPath) ???} ???// 調(diào)用期望狀態(tài)進行更新 ???klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath) ???err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath) ???if err != nil { ???????return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err) ???} ???return nil}2.6 刪除事件回調(diào)注冊其實就只需要感知到的socket文件路徑傳遞給期望狀態(tài)進行管理func (w *Watcher) handleDeleteEvent(event fsnotify.Event) { ???klog.V(6).Infof("Handling delete event: %v", event) ???socketPath := event.Name ???klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath) ???w.desiredStateOfWorld.RemovePlugin(socketPath)}3.期望狀態(tài)與實際狀態(tài)3.1 插件信息插件信息其實只是存儲了對應(yīng)socket的路徑和最近更新的時間type PluginInfo struct { ???SocketPath string ???Timestamp ?time.Time}3.2 期望狀態(tài)期望狀態(tài)與實際狀態(tài)在數(shù)據(jù)結(jié)構(gòu)上都是一樣的,因為本質(zhì)上只是為了存儲插件的當前的狀態(tài)信息,即更新時間,這里不在贅述type desiredStateOfWorld struct { ???socketFileToInfo map[string]PluginInfo ???sync.RWMutex}type actualStateOfWorld struct { ???socketFileToInfo map[string]PluginInfo ???sync.RWMutex}4.OperationExecutor目前k8s中支持兩大類的插件的管理一類是DevicePlugin即我們本文說的這些都是這種概念,一類是CSIPlugin,其中針對每一類DRiver的處理其實內(nèi)部都是不一樣的,那其實在操作之前就要先感知到當前的Driver是那種類型的OperationExecutor主要就是做這件事的,其根據(jù)不同的plugin類型,生成不同的要執(zhí)行的操作,即對應(yīng)的Plugin類型獲取對應(yīng)的handler,就生成了一個要執(zhí)行的操作4.1 生成注冊插件回調(diào)函數(shù)

?

  4.1.1 通過socket連接對應(yīng)的插件服務(wù) ???registerPluginFunc := func() error { ???????client, conn, err := dial(socketPath, dialTimeoutDuration) ???????if err != nil { ???????????return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err) ???????} ???????defer conn.Close() ???????ctx, cancel := context.WithTimeout(context.Background(), time.Second) ???????defer cancel() ???????infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{}) ???????if err != nil { ???????????return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err) ???????}4.1.2 根據(jù)插件類型驗證服務(wù) ???????handler, ok := pluginHandlers[infoResp.Type] ???????if !ok { ???????????if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil { ???????????????return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err) ???????????} ???????????return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath) ???????} ???????if infoResp.Endpoint == "" { ???????????infoResp.Endpoint = socketPath ???????} ???????if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil { ???????????if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil { ???????????????return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err) ???????????} ???????????return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed") ???????}4.1.3 注冊插件到實際狀態(tài) ???????err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{ ???????????SocketPath: socketPath, ???????????Timestamp: ?timestamp, ???????}) ???????if err != nil { ???????????klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err) ???????} ???????????// 調(diào)用插件的注冊回調(diào)函數(shù) ???????if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil { ???????????return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err)) ???????}4.1.4 通知對應(yīng)的服務(wù)注冊成功 ???????if err := og.notifyPlugin(client, true, ""); err != nil { ???????????return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err) ???????}4.2 通過socket構(gòu)建注冊clientfunc dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) { ???ctx, cancel := context.WithTimeout(context.Background(), timeout) ???defer cancel() ???c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(), ???????grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { ???????????return (&net.Dialer{}).DialContext(ctx, "unix", addr) ???????}), ???) ???if err != nil { ???????return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err) ???} ???return registerapi.NewRegistrationClient(c), c, nil}

了解更多網(wǎng)絡(luò)知識關(guān)注:http://www.vecloud.com/

# IT明星不是夢 # 圖解kubernetes資源擴展機制的評論 (共 條)

分享到微博請遵守國家法律
宝坻区| 南岸区| 株洲市| 当涂县| 昭觉县| 夏河县| 松桃| 新宾| 张北县| 清涧县| 永昌县| 卢龙县| 丹凤县| 江孜县| 墨竹工卡县| 固阳县| 宜宾县| 云浮市| 栾川县| 都昌县| 北安市| 房产| 樟树市| 闵行区| 房产| 石台县| 武乡县| 汝南县| 榆树市| 河源市| 金门县| 岫岩| 延安市| 浏阳市| 会昌县| 临城县| 南通市| 麻栗坡县| 徐闻县| 石景山区| 勃利县|