從5分鐘到60秒,袋鼠云數(shù)棧在熱重啟技術(shù)上的提效探索之路
更好地提高效率一直以來是袋鼠云數(shù)棧產(chǎn)品的主要目標(biāo)之一。當(dāng)前數(shù)棧客戶的實(shí)時(shí)任務(wù)都是基于 Per-Job 模式運(yùn)行的,客戶在進(jìn)行一些任務(wù)參數(shù)的修改之后,只能先取消當(dāng)前任務(wù),再選擇 CheckPoint 恢復(fù)或者重新運(yùn)行,整個(gè)過程需要3-5分鐘,比較浪費(fèi)時(shí)間。為了達(dá)到提高效率的目的,我們針對(duì) Per-Job 任務(wù)的整體流程分析,進(jìn)行了相關(guān)探索。
下文和大家聊聊數(shù)棧在熱重啟技術(shù)方面的探索之路。
熱重啟是什么?
熱重啟技術(shù)旨在復(fù)用當(dāng)前 Per-Job 集群的相關(guān)資源,減少重新創(chuàng)建集群以及申請(qǐng)資源的耗時(shí),同時(shí)通過 CheckPoint 機(jī)制保障數(shù)據(jù)的正確性。
Flink 的 Per-Job 模式是指每個(gè)任務(wù)都會(huì)對(duì)應(yīng)一個(gè)獨(dú)立的?Flink 集群。在任務(wù)提交的時(shí)候,會(huì)創(chuàng)建一個(gè) Flink 集群進(jìn)行任務(wù)的運(yùn)行,整個(gè)集群只為這一個(gè)任務(wù)進(jìn)行服務(wù)。同時(shí) Flink 集群不允許繼續(xù)提交任務(wù),導(dǎo)致任務(wù)修改之后,只能 Cancel 當(dāng)前任務(wù)。重新提交修改后的任務(wù),創(chuàng)建一個(gè)新的 Flink 集群進(jìn)行運(yùn)行。
經(jīng)過分析,耗時(shí)主要是由于以下兩部分原因造成:
? Client 需要在 Yarn 上啟動(dòng)一個(gè) Flink 集群,這一部分是客戶端耗時(shí)最多的部分,因?yàn)檫@一部分包括上傳 jar,上傳文件到Hdfs 上,申請(qǐng)資源啟動(dòng) Flink 集群,都是比較耗時(shí)的步驟
? 集群運(yùn)行的時(shí)候需要申請(qǐng)資源等操作也十分耗時(shí)
我們思考如果僅僅是一些任務(wù)參數(shù)或者 Sql 邏輯的修改,而不涉及代碼上的修改,那么 PerJob 任務(wù)是否可以類似 Session 模式進(jìn)行改造,支持 JobGraph 的重新提交,解決 Client 需要啟動(dòng)一個(gè) Flink 集群的耗時(shí)問題,大大提高提交效率。
同時(shí)復(fù)用了整個(gè) Flink 集群的資源,如果并行度改變,只需要申請(qǐng)新增加的資源,已有的資源不需要再重復(fù)向 Yarn 的 Resourcemanager 申請(qǐng)。
熱重啟改造后的流程
Flink 中 Per-Job 任務(wù)運(yùn)行的整體流程大概如下所示:
客戶端流程
? Client 端創(chuàng)建 JobGraph
? 上傳 JobGraph 到 hdfs 里
? 通過 YarnClient 提交一個(gè) YarnApplication,運(yùn)行一個(gè) Flink 任務(wù)
? 獲取結(jié)果
Flink 集群流程
? 啟動(dòng) Flink 集群,啟動(dòng) WebMonitor,ResourceManager,Dispatcher 組件
? Client 端上傳到遠(yuǎn)程文件服務(wù)里的 JobGraph 會(huì)被反序列出來由 DIspatcher 持有;
? DIspatcher 會(huì)根據(jù)此 JobGraph 創(chuàng)建 JobManagerRunner 對(duì)象進(jìn)行運(yùn)行;
? JobManagerRunner 會(huì)交由內(nèi)部的 ScheduleNg 進(jìn)行調(diào)度運(yùn)行任務(wù):
a.構(gòu)建 ScheduleNg 時(shí),會(huì)將 JobGraph 轉(zhuǎn)為ExecutionGraph
b. ScheduleNg 根據(jù) ExecutionGraph 進(jìn)行調(diào)度,運(yùn)行任務(wù)
? 任務(wù)運(yùn)行,等待任務(wù)運(yùn)行結(jié)束,進(jìn)行相應(yīng)的回調(diào)處理

從上圖我們可以看出,一個(gè) Per-Job 任務(wù)的運(yùn)行主要包括兩部分:一部分是客戶端上傳文件 jar 等操作后,直接上傳任務(wù)到 Yarn 上進(jìn)行 Flink 任務(wù)的啟動(dòng),第二部分是Flink集群的啟動(dòng),然后對(duì)客戶端上傳到遠(yuǎn)程文件的 JobGraph 進(jìn)行處理。因此為了優(yōu)化 Per-Job 下的效率,我們對(duì)這兩部分進(jìn)行了改造。
想法邏輯是,集群首先改造支持 JobGraph 的重新提交,然后 DIspatcher 處理 JobGraph 的時(shí)候,不會(huì)創(chuàng)建新的 JobMaster ,而是將當(dāng)前現(xiàn)有的 JobGraph 里的一些信息填充到新的 JobGraph 里,比如當(dāng)前任務(wù)的 CheckPoint 信息等。任務(wù)最終的調(diào)度運(yùn)行是 JobMaster 里的 ScheduleNg 對(duì)象。因此我們認(rèn)為只需要將 ScheduleNg 重新構(gòu)建,其余的組件都可以復(fù)用。
下圖即為我們熱重啟技術(shù)改造后的一個(gè)大致流程:

熱重啟技術(shù)改造后流程
? WebMonitor 支持任務(wù)的提交
? DIspatcher 將新的 JobGraph 緩存
? 取消當(dāng)前任務(wù),等待異步回調(diào)
? 返回結(jié)果給客戶端
? 在任務(wù)取消的異步回調(diào)里主要是熱重啟的重點(diǎn)改造部分:
a.判斷當(dāng)前是否有新的 JobGraph 緩存,有的話進(jìn)入熱重啟邏輯,無則走當(dāng)前現(xiàn)有邏輯
b.獲取取消任務(wù)的 CheckPoint 信息,填充到新的 JobGraph 里
c.將 jobGrap 更新到 JobMaster 里,清理以前 JobGraph 的緩存信息
d.把 JobMaster 里 SlotPool 管理的資源釋放掉
e.JobMaster 重新創(chuàng)建 ScheduleNg 并調(diào)度運(yùn)行,至此新的 JobGraph 就被成功調(diào)度運(yùn)行了
熱重啟改造部分詳解
JobGraph 介紹
在上述流程中,JobGraph?是整體流轉(zhuǎn)的主要對(duì)象,后續(xù)的一切操作都是圍繞著 JobGraph 進(jìn)行處理,所以這里先對(duì) JobGraph 進(jìn)行介紹。
JobGraph 是 Flink 作業(yè)的內(nèi)部表示,是一個(gè)有向無環(huán)圖(DAG),主要是將一些可以優(yōu)化的算子節(jié)點(diǎn)合并為一個(gè)節(jié)點(diǎn)。從下圖可知,一個(gè)完整的 JobGraph 圖包含了 Source Sink Transform 節(jié)點(diǎn),以及節(jié)點(diǎn)的輸出?IntermrdiateDataset?和輸入邊 JobEdge 。在除了 Application 模式外,其余的提交模式下,JobGraph 是在 Client 創(chuàng)建的,然后通過 Rest 請(qǐng)求提交給 Flink 集群進(jìn)行處理。

看完 JobGraph 此類結(jié)構(gòu),可以得出以下這些信息:
· taskVertices:上圖中的每個(gè)頂點(diǎn)對(duì)應(yīng)一個(gè)?jobVertex,taskVertices 維護(hù)了 jobGraph 圖里的各個(gè) jobVertex
· snapshotSettings:checkponit 相關(guān)的配置信息,如 CheckPoint 的間隔時(shí)間等
· savepointRestoreSettings:任務(wù)恢復(fù)的 checkpoint 文件信息,熱重啟中,新的 jobGraph 會(huì)將上一個(gè)任務(wù)的 checkPoint 位點(diǎn)信息填充到這個(gè)參數(shù)里,新的任務(wù)會(huì)在?CheckPoint?位點(diǎn)處進(jìn)行恢復(fù)運(yùn)行
· jobConfiguration:整個(gè) job 的相關(guān)配置信息
· userJars & calsspath:任務(wù)運(yùn)行過程中需要的一些 jar 以及 classpath 相關(guān)信息
其中 JobVertex 是 jobGraph 里非常重要的對(duì)象,再看下此類結(jié)構(gòu):JobVertex 主要存儲(chǔ)了JobEdge以及 IntermediateDataSet 和并行度等相關(guān)信息。對(duì)于一個(gè) JobVertex 來說,IntermediateDataSet 是作為 JobVertex 的輸出,而 JobEdge 是其輸入。

WebMonitor 改造
WebMonitor 組件是 Flink 的?Web 端點(diǎn),可以通過 Rest Api 進(jìn)行 Flink 集群的狀態(tài)、任務(wù)、指標(biāo)等信息的查詢,同時(shí)支持任務(wù)的提交、取消、觸發(fā) SavePoint 等操作。
Per-Job 模式下 Flink 集群是不支持客戶端繼續(xù)提交任務(wù)運(yùn)行的,因此需要對(duì) WebMonitor 進(jìn)行改造,類似 Session 下支持同一個(gè) Flink 集群能繼續(xù)提交 JobGraph 并運(yùn)行。
從下圖可以看出 WebMonitor 組件啟動(dòng)時(shí),其本質(zhì)是?Netty?為核心的一個(gè) Web 端點(diǎn)。啟動(dòng)時(shí)的主要流程如下:
? 創(chuàng)建 Router,管理 http 請(qǐng)求和處理器 handler 的映射關(guān)系
? initializeHandlers 初始化所有的 handler,不同的集群對(duì)應(yīng)的 WebMonitor 提供的?API 功能不同,所以 handlers 也是不同的
? 將 handlers 注冊(cè)到 router,完成 URL 以及請(qǐng)求方式(GET,POST,DELETE,PUT)和 Handler 的映射關(guān)系
? 創(chuàng)建一個(gè) Netty 的 handler,包裝下 router,然后注冊(cè)到 Netty 的 pipeline 里

WebMonitor 支持的各種 Rest 請(qǐng)求其實(shí)最終是交給一個(gè)個(gè)的 handler 進(jìn)行處理,通過 Router 對(duì)這些 handler 進(jìn)行維護(hù),其內(nèi)部維護(hù)了一個(gè) url 以及 Rest 請(qǐng)求方式與 handler 的映射關(guān)系。接收 Client 端的 Rest 請(qǐng)求之后,Router 找到對(duì)應(yīng)的處理器 handler,交由 handler 進(jìn)行最終的處理并返回結(jié)果。
因?yàn)?Per-job 集群是不支持 Client 端繼續(xù)提交任務(wù)的,所以其 initializeHandlers 方法初始化出的 handlers 不包含處理任務(wù)提交的 handler,導(dǎo)致 router 找不到對(duì)應(yīng)的 handler 報(bào)錯(cuò),因此需要在 initializeHandlers 里將處理任務(wù)提交的 handler 注冊(cè)進(jìn)去 。

JobSubmitHandler 處理請(qǐng)求的主要邏輯如下圖所示。核心是從 Rest 請(qǐng)求的 Body 里反序列化得到 JobGraph,反序列化獲取的 Jobgraph 通過?DIspatcherGateway?發(fā)送給 Dispatcher 進(jìn)行后續(xù)提交處理。

這樣 Client 端只需要重新生成 JobGraph 然后提交即可,避免了重新上傳 jar 到 hdfs,以及避免浪費(fèi)重新向 yarn 集群申請(qǐng)資源啟動(dòng) AppMaster 的時(shí)間。
Dispatcher 改造
DisPatcher 顧名思義是一個(gè)分發(fā)器,其主要功能是 Flink 集群接收到關(guān)于 Job 的提交、取消、觸發(fā) SavePoint 等操作,分發(fā)到對(duì)應(yīng)的各個(gè) JobMaster 進(jìn)行處理,或者創(chuàng)建新的 JobMaster 進(jìn)行任務(wù)的運(yùn)行。
DisPatcher 處理任務(wù)提交的核心流程是根據(jù) JobGraph 創(chuàng)建一個(gè) JobManagerRunner 對(duì)象并啟動(dòng),然后將其包裝成一個(gè) DispatcherJob 緩存在內(nèi)部。任務(wù)的具體調(diào)度執(zhí)行交由創(chuàng)建的 JobManagerRunner 進(jìn)行異步處理。
JobManagerRunner 其內(nèi)部的具體操作其實(shí)是 JobMasterService,主要實(shí)現(xiàn)類就是 JobMaster。JobMaster 內(nèi)部有兩個(gè)主要對(duì)象分別是:
· ScheduleNg: 負(fù)責(zé) JobGraph 轉(zhuǎn)為 ExecutionGraph,然后對(duì) Job 進(jìn)行調(diào)度運(yùn)行
· SlotPool:負(fù)責(zé) Slot 資源的申請(qǐng)以及管理
以上便是 Dispatcher 處理的主要流程。當(dāng)前改造之后只是支持了任務(wù)的重新提交運(yùn)行,但是新的任務(wù)仍然是對(duì)應(yīng)一個(gè)新的 JobMaster,其實(shí)就是一個(gè)類似 Session 的處理,所以為了達(dá)到熱重啟的效果,需要進(jìn)行以下的改造。
主流程的改造邏輯如下:
? 增加了一個(gè) hotRestartJobGraph 字段,將新的 JobGraph 對(duì)象賦予此字段
? Dispatcher 將緩存的正在運(yùn)行的任務(wù) cancel,對(duì)異步返回結(jié)果進(jìn)行回調(diào)處理
? 直接返回 Client 結(jié)果
因?yàn)?Flink 整體是異步處理的,源碼里充滿了大量的 CompletableFuture 回調(diào)的處理,主流程僅僅對(duì)提交的 JobGraph 進(jìn)行了一個(gè)緩存處理,熱重啟的主要步驟在任務(wù)取消的回調(diào)里進(jìn)行處理:
? 判斷 hotRestartJobGraph 是否為空,如果不為空則進(jìn)行熱重啟處理,為空則用以前的邏輯,整個(gè) Per-job 集群關(guān)閉
? 獲取取消任務(wù)的最后一個(gè) CheckPoint 位點(diǎn)
? 將 CheckPoint 位點(diǎn)信息填充到新的 Jobgraph 里
? 反射將上一個(gè) Jobgraph 生成的 JobManagerRunner 和 jobMaster 兩個(gè)對(duì)象的JobGraph 字段用新的 JobGraph 替換掉
? jobMaster 對(duì)象根據(jù) jobGraph 重新生成 scheduleNg 進(jìn)行調(diào)度運(yùn)行
? jobMaster 的 slotPool 在心跳周期內(nèi),會(huì)緩存已經(jīng)釋放掉的 slot,需要把這部分緩存清空
? MiniDispatcher 的 close 方法修改下,如果 hotRestartJobGraph 不為空則不進(jìn)行集群的關(guān)閉
? hotRestartJobGraph 置空
注意上述只是主要的一些改造地方,其余一些邊緣的細(xì)節(jié)處理就不再進(jìn)行贅述。
所以在熱重啟中,DIspatcher 是不會(huì)對(duì)每一個(gè) JobGraph 創(chuàng)建新的 JobMaster 對(duì)象。通過將新的 JobGraph 更新到 JobMaster 里,內(nèi)部僅僅 ScheduleNg 進(jìn)行了重新構(gòu)建,其余的組件都進(jìn)行了復(fù)用,比如 SlotPool。
ScheduleNg 之所以需要重新構(gòu)建是因?yàn)?JobGraph 轉(zhuǎn)為 ExecutionGraph 是需要 ScheduleNg 在構(gòu)建的時(shí)候創(chuàng)建的,因此需要重新構(gòu)建一個(gè) ScheduleNg 進(jìn)行任務(wù)的調(diào)度執(zhí)行,這樣達(dá)到了整個(gè)資源的復(fù)用性,大大提升了效率。
Slot 資源的復(fù)用
Flink 中對(duì)于資源的抽象主要是 Slot,其各個(gè)組件對(duì) Slot 的管理是由不同的組件處理的:
· Flink 的 ResourceManager 里是 SlotManager 管理,主要是任務(wù)的資源申請(qǐng)以及管理
· JobMaster 里管理 Slot 是 SlotPool ,主要是對(duì)當(dāng)前任務(wù)申請(qǐng)的 slot 進(jìn)行管理
· TaskExecutor 里則是S lotTable 對(duì) Slot 進(jìn)行管理,維護(hù) JobId 和 Slot 的關(guān)系
在熱重啟中,上一個(gè)任務(wù)取消之后,JobMaster 里 SlotPool 管理的 Slot 狀態(tài)由已分配改為可用。這樣在 JobMaster 通過新的 ScheduleNg 進(jìn)行重新調(diào)度,會(huì)復(fù)用 SlotPool 里緩存的 Slot,這個(gè)時(shí)候其實(shí)是有問題的。在 TaskExecutor 接收到任務(wù)的時(shí)候會(huì)報(bào)錯(cuò),在其內(nèi)部的 JobTable?里找不到新任務(wù)的 JobId,因?yàn)榇藭r(shí) TaskExecutor 維護(hù)的 Jobid 還是上一個(gè)任務(wù)的。
所以 JobMaster 的 SlotPool 需要釋放掉其內(nèi)部緩存信息,注意只是清理內(nèi)部緩存,此時(shí) TaskManager 的 Slot 槽資源還沒被釋放,仍然被 Resourcemanager 的 SlotManager 管理著。這樣 SlotPool 發(fā)現(xiàn)內(nèi)部沒可用的 Slot 槽就會(huì)和 ResourceManager 的 SlotManager 申請(qǐng)資源,SlotManager 就仍然復(fù)用了以前的 Slot 槽并且將新的 JobGraph 的 jobId 通過 rpc 請(qǐng)求注冊(cè)進(jìn)了 TaskExecutor。從而達(dá)到了 slot 槽資源的復(fù)用,減少了 Flink 集群的 ResourceManager 重新向 Yarn 的 ResourceManager 申請(qǐng)資源。
總結(jié)
數(shù)棧在 Per-job 模式下,為了盡快看到任務(wù)修改后的效果,在業(yè)務(wù)允許情況下,通過熱重啟技術(shù)復(fù)用相關(guān)資源,減少了大量時(shí)間,極大地提高了效率。在開發(fā)驗(yàn)證中,以前一個(gè)任務(wù)等待任務(wù)結(jié)束以及重新提交運(yùn)行總流程超過4分鐘,但是在熱重啟情況下控制在1分鐘以內(nèi)就已經(jīng)可以進(jìn)行調(diào)度執(zhí)行。
未來我們將會(huì)把熱重啟的場(chǎng)景進(jìn)一步豐富,支持更多場(chǎng)景下的熱重啟技術(shù),如 jar 的代碼修改,如何更新環(huán)境里的 jar,支持 k8s 場(chǎng)景等。
袋鼠云一直以來高度重視產(chǎn)品升級(jí)和用戶體驗(yàn),用誠心傾聽用戶需求,新的一年我們將繼續(xù)保持產(chǎn)品升級(jí)節(jié)奏,以提效為目標(biāo)滿足不同行業(yè)用戶的更多需求。為了更好的產(chǎn)品,更佳的用戶體驗(yàn),數(shù)棧一直在路上。
《數(shù)據(jù)治理行業(yè)實(shí)踐白皮書》下載地址:https://fs80.cn/380a4b
想了解或咨詢更多有關(guān)袋鼠云大數(shù)據(jù)產(chǎn)品、行業(yè)解決方案、客戶案例的朋友,瀏覽袋鼠云官網(wǎng):https://www.dtstack.com/?src=szbzhan
同時(shí),歡迎對(duì)大數(shù)據(jù)開源項(xiàng)目有興趣的同學(xué)加入「袋鼠云開源框架釘釘技術(shù) qun」,交流最新開源技術(shù)信息,qun 號(hào)碼:30537511,項(xiàng)目地址:https://github.com/DTStack