Spark RDD是什么?
Spark 的核心是建立在統(tǒng)一的抽象彈性分布式數(shù)據(jù)集(Resiliennt Distributed Datasets,RDD)之上的,這使得 Spark 的各個組件可以無縫地進行集成,能夠在同一個應用程序中完成大數(shù)據(jù)處理。本節(jié)將對 RDD 的基本概念及與 RDD 相關的概念做基本介紹。
RDD 的基本概念
RDD 是 Spark 提供的最重要的抽象概念,它是一種有容錯機制的特殊數(shù)據(jù)集合,可以分布在集群的結點上,以函數(shù)式操作集合的方式進行各種并行操作。
通俗點來講,可以將 RDD 理解為一個分布式對象集合,本質上是一個只讀的分區(qū)記錄集合。每個 RDD 可以分成多個分區(qū),每個分區(qū)就是一個數(shù)據(jù)集片段。一個 RDD 的不同分區(qū)可以保存到集群中的不同結點上,從而可以在集群中的不同結點上進行并行計算。
圖 1 展示了 RDD 的分區(qū)及分區(qū)與工作結點(Worker Node)的分布關系。

圖 1 RDD 分區(qū)及分區(qū)與工作節(jié)點的分布關系
RDD 具有容錯機制,并且只讀不能修改,可以執(zhí)行確定的轉換操作創(chuàng)建新的 RDD。具體來講,RDD 具有以下幾個屬性。
只讀:不能修改,只能通過轉換操作生成新的 RDD。
分布式:可以分布在多臺機器上進行并行處理。
彈性:計算過程中內(nèi)存不夠時它會和磁盤進行數(shù)據(jù)交換。
基于內(nèi)存:可以全部或部分緩存在內(nèi)存中,在多次計算間重用。
RDD 實質上是一種更為通用的迭代并行計算框架,用戶可以顯示控制計算的中間結果,然后將其自由運用于之后的計算。
在大數(shù)據(jù)實際應用開發(fā)中存在許多迭代算法,如機器學習、圖算法等,和交互式數(shù)據(jù)挖掘工具。這些應用場景的共同之處是在不同計算階段之間會重用中間結果,即一個階段的輸出結果會作為下一個階段的輸入。
RDD 正是為了滿足這種需求而設計的。雖然 MapReduce 具有自動容錯、負載平衡和可拓展性的優(yōu)點,但是其最大的缺點是采用非循環(huán)式的數(shù)據(jù)流模型,使得在迭代計算時要進行大量的磁盤 I/O 操作。
通過使用 RDD,用戶不必擔心底層數(shù)據(jù)的分布式特性,只需要將具體的應用邏輯表達為一系列轉換處理,就可以實現(xiàn)管道化,從而避免了中間結果的存儲,大大降低了數(shù)據(jù)復制、磁盤 I/O 和數(shù)據(jù)序列化的開銷。
RDD 基本操作
RDD 的操作分為轉化(Transformation)操作和行動(Action)操作。轉化操作就是從一個 RDD 產(chǎn)生一個新的 RDD,而行動操作就是進行實際的計算。
RDD 的操作是惰性的,當 RDD 執(zhí)行轉化操作的時候,實際計算并沒有被執(zhí)行,只有當 RDD 執(zhí)行行動操作時才會促發(fā)計算任務提交,從而執(zhí)行相應的計算操作。
1. 構建操作
Spark 里的計算都是通過操作 RDD 完成的,學習 RDD 的第一個問題就是如何構建 RDD,構建 RDD 的方式從數(shù)據(jù)來源角度分為以下兩類。
從內(nèi)存里直接讀取數(shù)據(jù)。
從文件系統(tǒng)里讀取數(shù)據(jù),文件系統(tǒng)的種類很多,常見的就是 HDFS 及本地文件系統(tǒng)。
第一類方式是從內(nèi)存里構造 RDD,需要使用 makeRDD 方法,代碼如下所示。
val rdd01 = sc.makeRDD(List(l,2,3,4,5,6))
這個語句創(chuàng)建了一個由“1,2,3,4,5,6”六個元素組成的 RDD。
第二類方式是通過文件系統(tǒng)構造 RDD,代碼如下所示。
val rdd:RDD[String] == sc.textFile("file:///D:/sparkdata.txt",1)
這里例子使用的是本地文件系統(tǒng),所以文件路徑協(xié)議前綴是 file://。
2. 轉換操作
RDD 的轉換操作是返回新的 RDD 的操作。轉換出來的 RDD 是惰性求值的,只有在行動操作中用到這些 RDD 時才會被計算。
許多轉換操作都是針對各個元素的,也就是說,這些轉換操作每次只會操作 RDD 中的一個元素,不過并不是所有的轉換操作都是這樣的。表 1 描述了常用的 RDD 轉換操作。
ype="table" data-size="normal" data-row-style="normal">3. 行動操作
行動操作用于執(zhí)行計算并按指定的方式輸出結果。行動操作接受 RDD,但是返回非 RDD,即輸出一個值或者結果。在 RDD 執(zhí)行過程中,真正的計算發(fā)生在行動操作。表 2 描述了常用的 RDD 行動操作。
aggregate() 函數(shù)的返回類型不需要和 RDD 中的元素類型一致,所以在使用時,需要提供所期待的返回類型的初始值,然后通過一個函數(shù)把 RDD 中的元素累加起來放入累加器。
考慮到每個結點都是在本地進行累加的,所以最終還需要提供第二個函數(shù)來將累加器兩兩合并。
aggregate(zero)(seqOp,combOp) 函數(shù)首先使用 seqOp 操作聚合各分區(qū)中的元素,然后再使用 combOp 操作把所有分區(qū)的聚合結果再次聚合,兩個操作的初始值都是 zero。
seqOp 的操作是遍歷分區(qū)中的所有元素 T,第一個 T 跟 zero 做操作,結果再作為與第二個 T 做操作的 zero,直到遍歷完整個分區(qū)。
combOp 操作是把各分區(qū)聚合的結果再聚合。aggregate() 函數(shù)會返回一個跟 RDD 不同類型的值。因此,需要 seqOp 操作來把分區(qū)中的元素 T 合并成一個 U,以及 combOp 操作把所有 U 聚合。
下面舉一個利用 aggreated() 函數(shù)求平均數(shù)的例子。
val rdd = List (1,2,3,4)
val input = sc.parallelize(rdd)
val result = input.aggregate((0,0))(
(acc,value) => (acc._1 + value,acc._2 + 1),
(acc1,acc2) => (acc1._1 + acc2._1,acc1._2 + acc2._2)
)
result:(Int,Int) = (10,4)
val avg = result._1 / result._2
avg:Int = 2.5
程序的詳細過程大概如下。
定義一個初始值 (0,0),即所期待的返回類型的初始值。代碼 (acc,value) => (acc._1 + value,acc._2 + 1) 中的 value 是函數(shù)定義里面的 T,這里是 List 里面的元素。acc._1 + value,acc._2 + 1 的過程如下。
(0+1,0+1)→(1+2,1+1)→(3+3,2+1)→(6+4,3+1),結果為(10,4)。
實際的 Spark 執(zhí)行過程是分布式計算,可能會把 List 分成多個分區(qū),假如是兩個:p1(1,2) 和 p2(3,4)。
經(jīng)過計算,各分區(qū)的結果分別為 (3,2) 和 (7,2)。這樣,執(zhí)行 (acc1,acc2) => (acc1._1 + acc2._2,acc1._2 + acc2._2) 的結果就是 (3+7,2+2),即 (10,4),然后可計算平均值。
RDD 血緣關系
RDD 的最重要的特性之一就是血緣關系(Lineage ),它描述了一個 RDD 是如何從父 RDD 計算得來的。如果某個 RDD 丟失了,則可以根據(jù)血緣關系,從父 RDD 計算得來。
圖 2 給出了一個 RDD 執(zhí)行過程的實例。系統(tǒng)從輸入中邏輯上生成了 A 和 C 兩個 RDD, 經(jīng)過一系列轉換操作,邏輯上生成了 F 這個 RDD。
Spark 記錄了 RDD 之間的生成和依賴關系。當 F 進行行動操作時,Spark 才會根據(jù) RDD 的依賴關系生成 DAG,并從起點開始真正的計算。

圖 2 RDD血緣關系
上述一系列處理稱為一個血緣關系(Lineage),即 DAG 拓撲排序的結果。在血緣關系中,下一代的 RDD 依賴于上一代的 RDD。例如,在圖 2 中,B 依賴于 A,D 依賴于 C,而 E 依賴于 B 和 D。
RDD依賴類型
根據(jù)不同的轉換操作,RDD 血緣關系的依賴分為窄依賴和寬依賴。窄依賴是指父 RDD 的每個分區(qū)都只被子 RDD 的一個分區(qū)所使用。寬依賴是指父 RDD 的每個分區(qū)都被多個子 RDD 的分區(qū)所依賴。
map、filter、union 等操作是窄依賴,而 groupByKey、reduceByKey 等操作是寬依賴,如圖 3 所示。
join 操作有兩種情況,如果 join 操作中使用的每個 Partition 僅僅和固定個 Partition 進行 join,則該 join 操作是窄依賴,其他情況下的 join 操作是寬依賴。
所以可得出一個結論,窄依賴不僅包含一對一的窄依賴,還包含一對固定個數(shù)的窄依賴,也就是說,對父 RDD 依賴的 Partition 不會隨著 RDD 數(shù)據(jù)規(guī)模的改變而改變。
1. 窄依賴
1)子 RDD 的每個分區(qū)依賴于常數(shù)個父分區(qū)(即與數(shù)據(jù)規(guī)模無關)。
2)輸入輸出一對一的算子,且結果 RDD 的分區(qū)結構不變,如 map、flatMap。
3)輸入輸出一對一的算子,但結果 RDD 的分區(qū)結構發(fā)生了變化,如 union。
4)從輸入中選擇部分元素的算子,如 filter、distinct、subtract、sample。
2. 寬依賴
1)子 RDD 的每個分區(qū)依賴于所有父 RDD 分區(qū)。

圖 3 RDD窄依賴和寬依賴
2)對單個 RDD 基于 Key 進行重組和 reduce,如 groupByKey、reduceByKey。
3)對兩個 RDD 基于 Key 進行 join 和重組,如 join。
Spark 的這種依賴關系設計,使其具有了天生的容錯性,大大加快了 Spark 的執(zhí)行速度。RDD 通過血緣關系記住了它是如何從其他 RDD 中演變過來的。當這個 RDD 的部分分區(qū)數(shù)據(jù)丟失時,它可以通過血緣關系獲取足夠的信息來重新運算和恢復丟失的數(shù)據(jù)分區(qū),從而帶來性能的提升。
相對而言,窄依賴的失敗恢復更為高效,它只需要根據(jù)父 RDD 分區(qū)重新計算丟失的分區(qū)即可,而不需要重新計算父 RDD 的所有分區(qū)。而對于寬依賴來講,單個結點失效,即使只是 RDD 的一個分區(qū)失效,也需要重新計算父 RDD 的所有分區(qū),開銷較大。
寬依賴操作就像是將父 RDD 中所有分區(qū)的記錄進行了“洗牌”,數(shù)據(jù)被打散,然后在子 RDD 中進行重組。
階段劃分
用戶提交的計算任務是一個由 RDD 構成的 DAG,如果 RDD 的轉換是寬依賴,那么這個寬依賴轉換就將這個 DAG 分為了不同的階段(Stage)。由于寬依賴會帶來“洗牌”,所以不同的 Stage 是不能并行計算的,后面 Stage 的 RDD 的計算需要等待前面 Stage 的 RDD 的所有分區(qū)全部計算完畢以后才能進行。
這點就類似于在 MapReduce 中,Reduce 階段的計算必須等待所有 Map 任務完成后才能開始一樣。
在對 Job 中的所有操作劃分 Stage 時,一般會按照倒序進行,即從 Action 開始,遇到窄依賴操作,則劃分到同一個執(zhí)行階段,遇到寬依賴操作,則劃分一個新的執(zhí)行階段。后面的 Stage 需要等待所有的前面的 Stage 執(zhí)行完之后才可以執(zhí)行,這樣 Stage 之間根據(jù)依賴關系就構成了一個大粒度的 DAG。
下面通過圖 4 詳細解釋一下階段劃分。
假設從 HDFS 中讀入數(shù)據(jù)生成 3 個不同的 RDD(A、C 和 E),通過一系列轉換操作后得到新的 RDD(G),并把結果保存到 HDFS 中。可以看到這幅 DAG 中只有 join 操作是一個寬依賴,Spark 會以此為邊界將其前后劃分成不同的階段。
同時可以注意到,在 Stage2 中,從 map 到 union 都是窄依賴,這兩步操作可以形成一個流水線操作,通過 map 操作生成的分區(qū)可以不用等待整個 RDD 計算結束,而是繼續(xù)進行 union 操作,這樣大大提高了計算的效率。

圖 4 DAG階級劃分
把一個 DAG 圖劃分成多個 Stage 以后,每個 Stage 都代表了一組由關聯(lián)的、相互之間沒有寬依賴關系的任務組成的任務集合。在運行的時候,Spark 會把每個任務集合提交給任務調度器進行處理。
RDD緩存
Spark RDD 是惰性求值的,而有時候希望能多次使用同一個 RDD。如果簡單地對 RDD 調用行動操作,Spark 每次都會重算 RDD 及它的依賴,這樣就會帶來太大的消耗。為了避免多次計算同一個 RDD,可以讓 Spark 對數(shù)據(jù)進行持久化。
Spark 可以使用 persist 和 cache 方法將任意 RDD 緩存到內(nèi)存、磁盤文件系統(tǒng)中。緩存是容錯的,如果一個 RDD 分片丟失,則可以通過構建它的轉換來自動重構。被緩存的 RDD 被使用時,存取速度會被大大加速。一般情況下,Executor 內(nèi)存的 60% 會分配給 cache,剩下的 40% 用來執(zhí)行任務。
cache 是 persist 的特例,將該 RDD 緩存到內(nèi)存中。persist 可以讓用戶根據(jù)需求指定一個持久化級別,如表 3 所示。
"block" data-draft-type="table" data-size="normal" data-row-style="normal">
對于 MEMORY_AND_DISK 和 MEMORY_AND_DISK_SER 級別,系統(tǒng)會首先把數(shù)據(jù)保存在內(nèi)存中,如果內(nèi)存不夠則把溢出部分寫入磁盤中。
另外,為了提高緩存的容錯性,可以在持久化級別名稱的后面加上“_2”來把持久化數(shù)據(jù)存為兩份,如 MEMORY_ONLY_2。
Spark 的不同 StorageLevel 的目的是為了滿足內(nèi)存使用和CPU效率權衡上的不同需求??梢酝ㄟ^以下步驟來選擇合適的持久化級別。
1)如果 RDD 可以很好地與默認的存儲級別(MEMORY_ONLY)契合,就不需要做任何修改了。這已經(jīng)是 CPU 使用效率最高的選項,它使得 RDD 的操作盡可能快。
2)如果 RDD 不能與默認的存儲級別很好契合,則嘗試使用 MEMORY_ONLY_SER,并且選擇一個快速序列化的庫使得對象在有比較高的空間使用率的情況下,依然可以較快被訪問。
3)盡可能不要將數(shù)據(jù)存儲到硬盤上,除非計算數(shù)據(jù)集函數(shù)的計算量特別大,或者它們過濾了大量的數(shù)據(jù)。否則,重新計算一個分區(qū)的速度與從硬盤中讀取的速度基本差不多。
4)如果想有快速故障恢復能力,則使用復制存儲級別。所有的存儲級別都有通過重新計算丟失數(shù)據(jù)恢復錯誤的容錯機制,但是復制存儲級別可以讓任務在 RDD 上持續(xù)運行,而不需要等待丟失的分區(qū)被重新計算。
5)在不使用 cached RDD 的時候,及時使用 unpersist 方法來釋放它。
在這里免費領取大數(shù)據(jù)資料:1127558097