最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

一文搞懂Spark CheckPoint

2023-06-27 10:20 作者:滌生大數(shù)據(jù)  | 我要投稿


CheckPoint主要應(yīng)用

checkpoint在spark中主要有兩方面應(yīng)用:一是在spark core中對(duì)RDD做checkpoint,可以切斷做checkpoint RDD的依賴關(guān)系,將RDD數(shù)據(jù)保存到可靠存儲(chǔ)(如HDFS)以便數(shù)據(jù)恢復(fù);另外是應(yīng)用在spark streaming中,使用checkpoint用來(lái)保存DStreamGraph以及相關(guān)配置信息,以便在Driver崩潰重啟的時(shí)候能夠接著之前進(jìn)度繼續(xù)進(jìn)行處理(如之前waiting batch的job會(huì)在重啟后繼續(xù)處理)。?RDD checkPoint實(shí)際上是利用 hdfs 的冗余來(lái)實(shí)現(xiàn)高可用;文件rdd保持的是該rdd的信息;

如果 rdd1.checkpoint(), 那么后面依賴 rdd1的 rdd2 在計(jì)算時(shí)加載 rdd1實(shí)際上是從 checkpoint產(chǎn)生的eliableCheckpointRDD而來(lái),(而不是從 rdd0->rdd1重新計(jì)算);

如果 rdd1.persist()并且checkpoint了,會(huì)優(yōu)先加載 cache 里面的,然后是 checkpoint 里面的。

Checkpoint 到底是什么和需要用 Checkpoint 解決什么問(wèn)題?

1.Spark 在生產(chǎn)環(huán)境下經(jīng)常會(huì)面臨 Transformation 的 RDD 非常多(例如一個(gè)Job 中包含1萬(wàn)個(gè)RDD) 或者是具體的 Transformation 產(chǎn)生的RDD 本身計(jì)算特別復(fù)雜和耗時(shí)(例如計(jì)算時(shí)常超過(guò)1個(gè)小時(shí)) , 可能業(yè)務(wù)比較復(fù)雜,此時(shí)我們必需考慮對(duì)計(jì)算結(jié)果的持久化。

2.Spark 是擅長(zhǎng)多步驟迭代,同時(shí)擅長(zhǎng)基于 Job 的復(fù)用。這個(gè)時(shí)候如果曾經(jīng)可以對(duì)計(jì)算的過(guò)程進(jìn)行復(fù)用,就可以極大的提升效率。因?yàn)橛袝r(shí)候有共同的步驟,就可以免卻重復(fù)計(jì)算的時(shí)間。

3.如果采用persists?把數(shù)據(jù)在內(nèi)存中的話,雖然最快速但是也是最不可靠的;如果放在磁盤上也不是完全可靠的,例如磁盤會(huì)損壞,系統(tǒng)管理員可能會(huì)清空磁盤。

4.Checkpoint 的產(chǎn)生就是為了相對(duì)而言更加可靠的持久化數(shù)據(jù),在 Checkpoint 可以指定把數(shù)據(jù)放在本地并且是多副本的方式,但是在正常生產(chǎn)環(huán)境下放在 HDFS 上,這就天然的借助HDFS 高可靠的特征來(lái)完成最大化的可靠的持久化數(shù)據(jù)的方式。

5.Checkpoint 是為了最大程度保證絕對(duì)可靠的復(fù)用 RDD?計(jì)算數(shù)據(jù)的 Spark 的高級(jí)功能,通過(guò) Checkpoint 可以把數(shù)據(jù)持久化到 HDFS 上來(lái)保證數(shù)據(jù)的最大程度的安任性。

6.Checkpoint 就是針對(duì)整個(gè)RDD 計(jì)算鏈條中特別需要數(shù)據(jù)持久化的環(huán)節(jié)(后面會(huì)反覆使用當(dāng)前環(huán)節(jié)的RDD) 開(kāi)始基于HDFS 等的數(shù)據(jù)持久化復(fù)用策略,通過(guò)對(duì) RDD 啟動(dòng) Checkpoint 機(jī)制來(lái)實(shí)現(xiàn)容錯(cuò)和高可用;

注:對(duì)于Shuffle Dependency加Checkpoint是一個(gè)值得提倡的做法

?Checkpoint運(yùn)行原理圖

Checkpoint源碼解析?

1、以rdd中的 iterator 方法為例,它會(huì)先在緩存中查看數(shù)據(jù) (內(nèi)部會(huì)查看 Checkpoint 有沒(méi)有相關(guān)數(shù)據(jù)),然后再?gòu)?CheckPoint 中查看數(shù)據(jù)。

Checkpoint 有兩種形式,一種是 reliably 和 一種是 locally

2、通過(guò)調(diào)用 SparkContext.setCheckpointDir 方法來(lái)指定進(jìn)行 Checkpoint 操作的 RDD 把數(shù)據(jù)放在那里,在生產(chǎn)集群中是放在 HDFS 上的,同時(shí)為了提高效率在進(jìn)行 Checkpoint 的時(shí)候可以指定很多目錄

3、在進(jìn)行 RDD 的 checkpoint 的時(shí)候,其所依賴的所有 RDD 都會(huì)清空掉;官方建議如果要進(jìn)行 checkpoint 時(shí),必需先緩存在內(nèi)存中。但實(shí)際可以考慮緩存在本地磁盤上或者是第三方組件如 Taychon 上。在進(jìn)行 checkpoint 之前需要通過(guò) SparkConetxt 設(shè)置 checkpoint 的文件夾

4、作為最住實(shí)際,一般在進(jìn)行 checkpoint 方法調(diào)用前通過(guò)都要進(jìn)行 persists 來(lái)把當(dāng)前 RDD 的數(shù)據(jù)持久化到內(nèi)存,這是因?yàn)?checkpoint 是 lazy 級(jí)別,必需有 Job 的執(zhí)行且在Job 執(zhí)行完成后才會(huì)從后往前回溯那個(gè) RDD 進(jìn)行了Checkpoint 標(biāo)指,然后對(duì)該標(biāo)記了要進(jìn)行 Checkpoint 的 RDD 新啟動(dòng)一個(gè)Job 執(zhí)行具體 Checkpoint 的過(guò)程

5、Checkpoint 改變了 RDD 的 Lineage6、當(dāng)我們調(diào)用了checkpoint 方法要對(duì)RDD 進(jìn)行Checkpoint 操作的話,此時(shí)框架會(huì)自動(dòng)生成 RDDCheckpointData

7、觸發(fā)時(shí)機(jī)當(dāng) RDD 上運(yùn)行一個(gè)Job 后就會(huì)立即觸發(fā) RDDCheckpointData 中的 checkpoint 方法,在其內(nèi)部會(huì)調(diào)用 doCheckpoint( )方法,實(shí)際上在生產(chǎn)環(huán)境上會(huì)調(diào)用 ReliableRDDCheckpointData 的 doCheckpoint( )方法

8、在生產(chǎn)環(huán)境下會(huì)導(dǎo)致 ReliableRDDCheckpointData 的 writeRDDToCheckpointDirectory 的調(diào)用,而在?writeRDDToCheckpointDirectory 方法內(nèi)部會(huì)觸發(fā)runJob 來(lái)執(zhí)行當(dāng)前的RDD 中的數(shù)據(jù)寫到Checkpoint 的目錄中,同時(shí)會(huì)產(chǎn)生ReliableCheckpointRDD 實(shí)例

在writeRDDToCheckpointDirectory方法中可以看到:將作為一個(gè)單獨(dú)的任務(wù)(RunJob)將RDD中每個(gè)parition的數(shù)據(jù)依次寫入到checkpoint目錄(writePartitionToCheckpointFile),此外如果該RDD中的partitioner如果不為空,則也會(huì)將該對(duì)象序列化后存儲(chǔ)到checkpoint目錄。所以,在做checkpoint的時(shí)候,寫入的hdfs中的數(shù)據(jù)主要包括:RDD中每個(gè)parition的實(shí)際數(shù)據(jù),以及可能的partitioner對(duì)象(writePartitionerToCheckpointDir)。

9、在寫完checkpoint數(shù)據(jù)到hdfs以后,將會(huì)調(diào)用rdd的markCheckpoined方法,主要斬?cái)嘣搑dd的對(duì)上游的依賴,以及將paritions置空等操作。

checkpoint 讀流程

在做完checkpoint后,獲取原來(lái)RDD的依賴以及partitions數(shù)據(jù)都將從CheckpointRDD中獲取。也就是說(shuō)獲取原來(lái)rdd中每個(gè)partition數(shù)據(jù)以及partitioner等對(duì)象,都將轉(zhuǎn)移到CheckPointRDD中。在CheckPointRDD的一個(gè)具體實(shí)現(xiàn)ReliableRDDCheckpointRDD中的compute方法中可以看到,將會(huì)從hdfs的checkpoint目錄中恢復(fù)之前寫入的partition數(shù)據(jù)。而partitioner對(duì)象(如果有)也會(huì)從之前寫入hdfs的paritioner對(duì)象恢復(fù)。

總的來(lái)說(shuō),checkpoint讀取過(guò)程是比較簡(jiǎn)單的。

持久化和checkpoint的區(qū)別:

1.cache/persist 可以說(shuō)一方面是為了提速,另一方面是為了當(dāng)某一重要步驟過(guò)長(zhǎng),后面的依賴出錯(cuò)(可能是邏輯錯(cuò)誤)情況下,可以無(wú)需從頭算起。e/p。.ersist 可以說(shuō)一方面是為

2.checkpoint:則更多的是為了高可用。其核心是 hdfs 的 replicaton。其情形是集群總某個(gè)點(diǎn)的硬件設(shè)備壞掉,例如 persist 中某個(gè)盤壞了,整個(gè)應(yīng)用仍然是可用的。Checkpoint的產(chǎn)生就是為了相對(duì)而言更加可靠的持久化數(shù)據(jù),在Checkpoint的時(shí)候可以指定把數(shù)據(jù)放在本地,并且是多副本的方式,但是在生產(chǎn)環(huán)境下是放在HDFS上,這就天然的借助了HDFS高容錯(cuò)、高可靠的特征來(lái)完成了最大化的可靠的持久化數(shù)據(jù)的方式。e/p。.ersist 可以說(shuō)一方面是為

3.checkpoint是為了最大程度保證絕對(duì)可靠的復(fù)用RDD計(jì)算數(shù)據(jù)的Spark高級(jí)功能,通過(guò)checkpoint通常把數(shù)據(jù)持久化到HDFS來(lái)保證數(shù)據(jù)最大程度的安全性。e/p。.ersist 可以說(shuō)一方面是為

4.checkpoint就是針對(duì)整個(gè)RDD計(jì)算鏈條中特別需要數(shù)據(jù)持久化的環(huán)節(jié)(后面會(huì)反復(fù)使用當(dāng)前環(huán)節(jié)的RDD)開(kāi)始基于HDFS等的數(shù)據(jù)持久化復(fù)用策略,通過(guò)對(duì)RDD啟動(dòng)checkpoint機(jī)制來(lái)實(shí)現(xiàn)容錯(cuò)和高可用。e/p。.ersist 可以說(shuō)一方面是為

5.持久化只是將數(shù)據(jù)保存在BlockManager中;但是RDD的lineage是不會(huì)變化的。Checkpoint完畢之后,RDD已經(jīng)沒(méi)有之前的lineage(血緣關(guān)系),而只有一個(gè)強(qiáng)行為其設(shè)置的CheckpointRDD, 也就是說(shuō)checkpoint之后,lineage發(fā)生了改變。e/p。.ersist 可以說(shuō)一方面是為

6.rdd.persist(StorageLevel.DISK_ONLY)雖然可以將 RDD 的 partition 持久化到磁盤,但該 partition 由 blockManager 管理。一旦 driver program 執(zhí)行結(jié)束,也就是 executor 所在進(jìn)程 CoarseGrainedExecutorBackend stop,blockManager 也會(huì) stop,被 cache 到磁盤上的 RDD 也會(huì)被清空(整個(gè) blockManager 使用的 local 文件夾被刪除)。而 checkpoint 將 RDD 持久化到 HDFS 或本地文件夾,如果不被手動(dòng) remove 掉,是一直存在的,也就是說(shuō)可以被下一個(gè) driver program 使用,而 cached RDD 不能被其他 dirver program 使用。e/p。.ersist 可以說(shuō)一方面是為

假如進(jìn)行一個(gè)1萬(wàn)個(gè)步驟,在9000個(gè)步驟的時(shí)候persist,數(shù)據(jù)還是有可能丟失的,但是如果checkpoint,數(shù)據(jù)丟失的概率幾乎為0。

注:Spark相比Hadoop的優(yōu)勢(shì)在于盡量不去持久化,所以使用 pipeline,cache 等機(jī)制。用戶如果感覺(jué) job 可能會(huì)出錯(cuò)可以手動(dòng)去 checkpoint 一些重要的RDD,job如果出錯(cuò),下次運(yùn)行時(shí)直接從 checkpoint 中讀取數(shù)據(jù)。唯一不足的是,checkpoint 需要兩次運(yùn)行 job。

Spark的容錯(cuò)的機(jī)制

Spark 采用Lineage(血統(tǒng))CheckPoint(檢查點(diǎn))兩種方式來(lái)解決分布式數(shù)據(jù)集中的容錯(cuò)問(wèn)題。Lineage本質(zhì)上類似于數(shù)據(jù)庫(kù)的重做日志(redo log),只不過(guò)這個(gè)日志粒度很大,是對(duì)整個(gè)RDD分區(qū)做重做進(jìn)而恢復(fù)數(shù)據(jù)的。

在容錯(cuò)機(jī)制中,如果集群中一個(gè)節(jié)點(diǎn)死機(jī)了,而且運(yùn)算窄依賴,則只需要把丟失的父RDD分區(qū)重算即可,不依賴于其他節(jié)點(diǎn)。但對(duì)寬依賴,則需要父RDD的所有分區(qū)都重算,這個(gè)代價(jià)就很昂貴了。因此,Spark 提供設(shè)置檢查點(diǎn)的方式來(lái)保存Shuffle前的祖先RDD數(shù)據(jù),將依賴關(guān)系刪除。當(dāng)數(shù)據(jù)丟失時(shí),直接從檢查點(diǎn)中恢復(fù)數(shù)據(jù)。為了確保檢查點(diǎn)不會(huì)因?yàn)楣?jié)點(diǎn)死機(jī)而丟失,檢查點(diǎn)數(shù)據(jù)保存在磁盤中,通常是hdfs文件。

Lineage

RDD Lineage被稱為RDD運(yùn)算圖或RDD依賴關(guān)系圖,是RDD所有父RDD的圖。它是在RDD上執(zhí)行transformations函數(shù)并創(chuàng)建邏輯執(zhí)行計(jì)劃(logical execution plan)的結(jié)果,是RDD的邏輯執(zhí)行計(jì)劃。相比其它系統(tǒng)的細(xì)顆粒度的內(nèi)存數(shù)據(jù)更新級(jí)別的備份或者LOG機(jī)制,RDD的Lineage記錄的是粗顆粒度的特定數(shù)據(jù)Transformation和Action操作。當(dāng)這個(gè)RDD的部分分區(qū)數(shù)據(jù)丟失時(shí),它可以通過(guò)Lineage找到丟失的父RDD的分區(qū)進(jìn)行局部計(jì)算來(lái)恢復(fù)丟失的數(shù)據(jù),這樣可以節(jié)省資源提高運(yùn)行效率。這種粗顆粒的數(shù)據(jù)模型,限制了Spark的運(yùn)用場(chǎng)合,但同時(shí)相比細(xì)顆粒度的數(shù)據(jù)模型,也帶來(lái)了性能的提升。

依賴關(guān)系決定Lineage的復(fù)雜程度,同時(shí)也是的RDD具有了容錯(cuò)性。因?yàn)楫?dāng)某一個(gè)分區(qū)里的數(shù)據(jù)丟失了,Spark程序會(huì)根據(jù)依賴關(guān)系進(jìn)行局部計(jì)算來(lái)恢復(fù)丟失的數(shù)據(jù)。

容錯(cuò)原理

在Spark的容錯(cuò)機(jī)制中,當(dāng)一個(gè)節(jié)點(diǎn)宕機(jī)了,進(jìn)行容錯(cuò)恢復(fù)時(shí),對(duì)于窄依賴來(lái)講,進(jìn)行重計(jì)算時(shí)只要把丟失的父RDD分區(qū)重算即可,不依賴于其他節(jié)點(diǎn)。而對(duì)于Shuffle Dependency來(lái)說(shuō),進(jìn)行重計(jì)算時(shí)需要父RDD的分區(qū)都存在,這樣計(jì)算量就太大了比較耗費(fèi)性能。

如下圖所示,如果 RDD_1 中的 Partition3 出錯(cuò)丟失,對(duì)于OneToOne的窄依賴,Spark 會(huì)回溯到 Partition3 的父分區(qū) RDD_0 的 Partition3

對(duì)于部分RangeToOne的窄依賴則會(huì)回溯到父分區(qū)RDD_0的Partition0和Partition3,對(duì) RDD_0 的 Partition0和Partition3 重算算子,得到 RDD_1 的 Partition3。其他分區(qū)丟失也是同理重算進(jìn)行容錯(cuò)恢復(fù)。

對(duì)于全RangeToOne的窄依賴,由于其父分區(qū)是 RDD_0 的所有分區(qū),所以需要回溯到 RDD_0,重算 RDD_0 的所有分區(qū),然后將 RDD_1 的 Partition3 需要的數(shù)據(jù)聚集合并為 RDD_1 的 Partition3。在這個(gè)過(guò)程中,由于 RDD_0 中不是 RDD_1 中 Partition3 需要的數(shù)據(jù)也全部進(jìn)行了重算,所以產(chǎn)生了大量冗余數(shù)據(jù)重算的開(kāi)銷。

一文搞懂Spark CheckPoint的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
顺义区| 原平市| 乳山市| 浑源县| 米林县| 离岛区| 大冶市| 乌鲁木齐县| 泰兴市| 宽甸| 达州市| 巨鹿县| 阜宁县| 蕲春县| 新野县| 右玉县| 于都县| 临泽县| 化德县| 古浪县| 金沙县| 庆云县| 新疆| 齐齐哈尔市| 双牌县| 栾川县| 江北区| 宁明县| 瓮安县| 梓潼县| 柞水县| 克什克腾旗| 阿克苏市| 龙井市| 鄯善县| 吴川市| 方城县| 郁南县| 宁都县| 柳江县| 东明县|