大數(shù)據(jù)離線階段 04:初識MapReduce,Apache Hadoop YARN
初識MapReduce
MapReduce計算模型介紹
理解MapReduce思想
MapReduce的思想核心是“分而治之”。
所謂“分而治之”就是把一個復(fù)雜的問題按一定的“分解”方法分為規(guī)模較小的若干部分,然后逐個解決,分別找出各部分的解,再把把各部分的解組成整個問題的解。

這種樸素的思想來源于人們生活與工作的經(jīng)驗,也完全適合于技術(shù)領(lǐng)域。諸如軟件的體系結(jié)構(gòu)設(shè)計、模塊化設(shè)計都是分而治之的具體表現(xiàn)。即使是發(fā)布過論文實現(xiàn)分布式計算的谷歌也只是實現(xiàn)了這種思想,而不是自己原創(chuàng)。
概況起來,MapReduce所包含的思想分為兩步:
Map負責(zé)“分”,即把復(fù)雜的任務(wù)分解為若干個“簡單的任務(wù)”來并行處理。可以進行拆分的前提是這些小任務(wù)可以并行計算,彼此間幾乎沒有依賴關(guān)系。
Reduce負責(zé)“合”,即對map階段的結(jié)果進行全局匯總。
這兩個階段合起來正是MapReduce思想的體現(xiàn)。

還有一個比較形象的語言解釋MapReduce:要數(shù)停車場中的所有停放車的總數(shù)量。
你數(shù)第一列,我數(shù)第二列…這就是Map階段,人越多,能夠同時數(shù)車的人就越多,速度就越快。
數(shù)完之后,聚到一起把所有人的統(tǒng)計數(shù)加在一起。這就是Reduce合并匯總階段。
Hadoop MapReduce設(shè)計構(gòu)思
MapReduce是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發(fā)運行在Hadoop集群上。
既然是做計算的框架,那么表現(xiàn)形式就是有個輸入(input),MapReduce操作這個輸入(input),通過本身定義好的計算模型,得到一個輸出(output)。
對許多開發(fā)者來說,自己完完全全實現(xiàn)一個并行計算程序難度太大,而MapReduce就是一種簡化并行計算的編程模型,降低了開發(fā)并行應(yīng)用的入門門檻。
Hadoop MapReduce構(gòu)思體現(xiàn)在如下的三個方面:
如何對付大數(shù)據(jù)處理:分而治之
對相互間不具有計算依賴關(guān)系的大數(shù)據(jù),實現(xiàn)并行最自然的辦法就是采取分而治之的策略。并行計算的第一個重要問題是如何劃分計算任務(wù)或者計算數(shù)據(jù)以便對劃分的子任務(wù)或數(shù)據(jù)塊同時進行計算。不可分拆的計算任務(wù)或相互間有依賴關(guān)系的數(shù)據(jù)無法進行并行計算!
構(gòu)建抽象模型:Map和Reduce
MapReduce借鑒了函數(shù)式語言中的思想,用Map和Reduce兩個函數(shù)提供了高層的并行編程抽象模型。 Map: 對一組數(shù)據(jù)元素進行某種重復(fù)式的處理; Reduce: 對Map的中間結(jié)果進行某種進一步的結(jié)果整理。 MapReduce中定義了如下的Map和Reduce兩個抽象的編程接口,由用戶去編程實現(xiàn): map: (k1; v1) → [(k2; v2)] reduce: (k2; [v2]) → [(k3; v3)] Map和Reduce為程序員提供了一個清晰的操作接口抽象描述。通過以上兩個編程接口,大家可以看出MapReduce處理的數(shù)據(jù)類型是<key,value>鍵值對。
統(tǒng)一構(gòu)架,隱藏系統(tǒng)層細節(jié)
如何提供統(tǒng)一的計算框架,如果沒有統(tǒng)一封裝底層細節(jié),那么程序員則需要考慮諸如數(shù)據(jù)存儲、劃分、分發(fā)、結(jié)果收集、錯誤恢復(fù)等諸多細節(jié);為此,MapReduce設(shè)計并提供了統(tǒng)一的計算框架,為程序員隱藏了絕大多數(shù)系統(tǒng)層面的處理細節(jié)。
MapReduce最大的亮點在于通過抽象模型和計算框架把需要做什么(what need to do)與具體怎么做(how to do)分開了,為程序員提供一個抽象和高層的編程接口和框架。程序員僅需要關(guān)心其應(yīng)用層的具體計算問題,僅需編寫少量的處理應(yīng)用本身計算問題的程序代碼。
如何具體完成這個并行計算任務(wù)所相關(guān)的諸多系統(tǒng)層細節(jié)被隱藏起來,交給計算框架去處理:從分布代碼的執(zhí)行,到大到數(shù)千小到單個節(jié)點集群的自動調(diào)度使用。
官方MapReduce示例
在Hadoop的安裝包中,官方提供了MapReduce程序的示例examples,以便快速上手體驗MapReduce。
該示例是使用java語言編寫的,被打包成為了一個jar文件。
/export/server/hadoop-3.3.0/share/hadoop/mapreduce

運行該jar包程序,可以傳入不同的參數(shù)實現(xiàn)不同的處理功能。
hadoop jar hadoop-mapreduce-examples-3.3.0.jar args…
示例1:評估圓周率π(PI)
圓周率π大家都不陌生,如何去估算π的值呢?
Monte Carlo方法的基本思想:
當(dāng)所求解問題是某種隨機事件出現(xiàn)的概率,或者是某個隨機變量的期望值時,通過某種“實驗”的方法,以這種事件出現(xiàn)的頻率估計這一隨機事件的概率,或者得到這個隨機變量的某些數(shù)字特征,并將其作為問題的解。

假設(shè)正方形邊長為1,圓半徑也為1,那么1/4圓的面積為:

在正方形內(nèi)隨機撒點,分布于1/4圓內(nèi)的數(shù)量假設(shè)為a ,分布于圓外的數(shù)量為b,N則是所產(chǎn)生的總數(shù):N=a+b。
那么數(shù)量a與N的比值應(yīng)與1/4圓面積及正方形面積成正比,于是:

下面來運行MapReduce程序評估一下圓周率的值,執(zhí)行中可以去YARN頁面上觀察程序的執(zhí)行的情況。
第一個參數(shù)pi:表示MapReduce程序執(zhí)行圓周率計算;
第二個參數(shù):用于指定map階段運行的任務(wù)次數(shù),并發(fā)度,這是是10;
第三個參數(shù):用于指定每個map任務(wù)取樣的個數(shù),這里是50。



示例2:單詞詞頻統(tǒng)計WordCount
WordCount算是大數(shù)據(jù)統(tǒng)計分析領(lǐng)域的經(jīng)典需求了,相當(dāng)于編程語言的HelloWorld。其背后的應(yīng)用場景十分豐富,比如統(tǒng)計頁面點擊數(shù),搜索詞排行榜等跟count相關(guān)的需求。
其最基本的應(yīng)用雛形就是統(tǒng)計文本數(shù)據(jù)中,相同單詞出現(xiàn)的總次數(shù)。用SQL的角度來理解的話,相當(dāng)于根據(jù)單詞進行g(shù)roup by分組,相同的單詞分為一組,然后每個組內(nèi)進行count聚合統(tǒng)計。
對于MapReduce乃至于大數(shù)據(jù)計算引擎來說,業(yè)務(wù)需求本身是簡單的,重點是當(dāng)數(shù)據(jù)量大了之后,如何使用分而治之的思想來處理海量數(shù)據(jù)進行單詞統(tǒng)計。

上傳課程資料中的文本文件到HDFS文件系統(tǒng)的/input目錄下,如果沒有這個目錄,使用shell創(chuàng)建:
準備好之后,執(zhí)行官方MapReduce實例,對上述文件進行單詞次數(shù)統(tǒng)計:
第一個參數(shù):wordcount表示執(zhí)行單詞統(tǒng)計
第二個參數(shù):指定輸入文件的路徑
第三個參數(shù):指定輸出結(jié)果的路徑(該路徑不能已存在)


可以在課程資料中查看java代碼的具體實現(xiàn),后續(xù)課程中也會學(xué)習(xí)如何使用java編寫MapReduce程序。
MapReduce Python接口接入
前言
雖然Hadoop是用Java編寫的一個框架, 但是并不意味著他只能使用Java語言來操作, 在Hadoop-0.14.1版本后, Hadoop支持了Python和C++語言, 在Hadoop的文檔中也表示可以使用Python進行開發(fā)。
https://hadoop.apache.org/docs/r3.3.0/hadoop-streaming/HadoopStreaming.html
在Hadoop的文檔中提到了Hadoop Streaming, 我們可以使用流的方式來操作它.語法是:
在Python中的sys包中存在, stdin和stdout,輸入輸出流, 我們可以利用這個方式來進行MapReduce的編寫.
代碼實現(xiàn)
mapper.py
reducer.py
程序執(zhí)行
方式1:本地測試Python腳本邏輯是否正確。
方式2:使用hadoop streaming提交Python腳本集群運行。
注意:不管哪種方式執(zhí)行,都需要提前在Centos系統(tǒng)上安裝好Python3.詳細安裝步驟可以參考課程資料。
本地測試
hadoop streaming提交
執(zhí)行結(jié)果:


MapReduce基本原理
整體執(zhí)行流程圖


Map階段執(zhí)行流程

第一階段是把輸入目錄下文件按照一定的標準逐個進行邏輯切片,形成切片規(guī)劃。默認情況下,Split size = Block size。每一個切片由一個MapTask處理。(getSplits)
第二階段是對切片中的數(shù)據(jù)按照一定的規(guī)則解析成<key,value>對。默認規(guī)則是把每一行文本內(nèi)容解析成鍵值對。key是每一行的起始位置(單位是字節(jié)),value是本行的文本內(nèi)容。(TextInputFormat)
第三階段是調(diào)用Mapper類中的map方法。上階段中每解析出來的一個<k,v>,調(diào)用一次map方法。每次調(diào)用map方法會輸出零個或多個鍵值對。
第四階段是按照一定的規(guī)則對第三階段輸出的鍵值對進行分區(qū)。默認是只有一個區(qū)。分區(qū)的數(shù)量就是Reducer任務(wù)運行的數(shù)量。默認只有一個Reducer任務(wù)。
第五階段是對每個分區(qū)中的鍵值對進行排序。首先,按照鍵進行排序,對于鍵相同的鍵值對,按照值進行排序。比如三個鍵值對<2,2>、<1,3>、<2,1>,鍵和值分別是整數(shù)。那么排序后的結(jié)果是<1,3>、<2,1>、<2,2>。如果有第六階段,那么進入第六階段;如果沒有,直接輸出到文件中。
第六階段是對數(shù)據(jù)進行局部聚合處理,也就是combiner處理。鍵相等的鍵值對會調(diào)用一次reduce方法。經(jīng)過這一階段,數(shù)據(jù)量會減少。本階段默認是沒有的。
Reduce階段執(zhí)行流程

第一階段是Reducer任務(wù)會主動從Mapper任務(wù)復(fù)制其輸出的鍵值對。Mapper任務(wù)可能會有很多,因此Reducer會復(fù)制多個Mapper的輸出。
第二階段是把復(fù)制到Reducer本地數(shù)據(jù),全部進行合并,即把分散的數(shù)據(jù)合并成一個大的數(shù)據(jù)。再對合并后的數(shù)據(jù)排序。
第三階段是對排序后的鍵值對調(diào)用reduce方法。鍵相等的鍵值對調(diào)用一次reduce方法,每次調(diào)用會產(chǎn)生零個或者多個鍵值對。最后把這些輸出的鍵值對寫入到HDFS文件中。
Shuffle機制map階段處理的數(shù)據(jù)如何傳遞給reduce階段,是MapReduce框架中最關(guān)鍵的一個流程,這個流程就叫shuffle。 shuffle: 洗牌、發(fā)牌——(核心機制:數(shù)據(jù)分區(qū),排序,合并)。

shuffle是Mapreduce的核心,它分布在Mapreduce的map階段和reduce階段。一般把從Map產(chǎn)生輸出開始到Reduce取得數(shù)據(jù)作為輸入之前的過程稱作shuffle。
1.Collect階段:將MapTask的結(jié)果輸出到默認大小為100M的環(huán)形緩沖區(qū),保存的是key/value,Partition分區(qū)信息等。
2.Spill階段:當(dāng)內(nèi)存中的數(shù)據(jù)量達到一定的閥值的時候,就會將數(shù)據(jù)寫入本地磁盤,在將數(shù)據(jù)寫入磁盤之前需要對數(shù)據(jù)進行一次排序的操作,如果配置了combiner,還會將有相同分區(qū)號和key的數(shù)據(jù)進行排序。
3.Merge階段:把所有溢出的臨時文件進行一次合并操作,以確保一個MapTask最終只產(chǎn)生一個中間數(shù)據(jù)文件。
4.Copy階段: ReduceTask啟動Fetcher線程到已經(jīng)完成MapTask的節(jié)點上復(fù)制一份屬于自己的數(shù)據(jù),這些數(shù)據(jù)默認會保存在內(nèi)存的緩沖區(qū)中,當(dāng)內(nèi)存的緩沖區(qū)達到一定的閥值的時候,就會將數(shù)據(jù)寫到磁盤之上。
5.Merge階段:在ReduceTask遠程復(fù)制數(shù)據(jù)的同時,會在后臺開啟兩個線程對內(nèi)存到本地的數(shù)據(jù)文件進行合并操作。
6.Sort階段:在對數(shù)據(jù)進行合并的同時,會進行排序操作,由于MapTask階段已經(jīng)對數(shù)據(jù)進行了局部的排序,ReduceTask只需保證Copy的數(shù)據(jù)的最終整體有效性即可。
Shuffle中的緩沖區(qū)大小會影響到mapreduce程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快
四、Apache Hadoop YARN
1.Yarn通俗介紹

Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協(xié)調(diào)者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統(tǒng)和調(diào)度平臺,可為上層應(yīng)用提供統(tǒng)一的資源管理和調(diào)度,它的引入為集群在利用率、資源統(tǒng)一管理和數(shù)據(jù)共享等方面帶來了巨大好處。
可以把yarn理解為相當(dāng)于一個分布式的操作系統(tǒng)平臺,而mapreduce等運算程序則相當(dāng)于運行于操作系統(tǒng)之上的應(yīng)用程序,Yarn為這些程序提供運算所需的資源(內(nèi)存、cpu)。
yarn并不清楚用戶提交的程序的運行機制
yarn只提供運算資源的調(diào)度(用戶程序向yarn申請資源,yarn就負責(zé)分配資源)
yarn中的主管角色叫ResourceManager
yarn中具體提供運算資源的角色叫NodeManager
yarn與運行的用戶程序完全解耦,意味著yarn上可以運行各種類型的分布式運算程序,比如mapreduce、storm,spark,tez ……
spark、storm等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規(guī)范的資源請求機制即可
yarn成為一個通用的資源調(diào)度平臺.企業(yè)中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數(shù)據(jù)共享

2.Yarn基本架構(gòu)
YARN是一個資源管理、任務(wù)調(diào)度的框架,主要包含三大模塊:ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。
ResourceManager負責(zé)所有資源的監(jiān)控、分配和管理;
ApplicationMaster負責(zé)每一個具體應(yīng)用程序的調(diào)度和協(xié)調(diào);
NodeManager負責(zé)每一個節(jié)點的維護。
對于所有的applications,RM擁有絕對的控制權(quán)和對資源的分配權(quán)。而每個AM則會和RM協(xié)商資源,同時和NodeManager通信來執(zhí)行和監(jiān)控task。
3.Yarn三大組件介紹
ResourceManager
ResourceManager負責(zé)整個集群的資源管理和分配,是一個全局的資源管理系統(tǒng)。
NodeManager以心跳的方式向ResourceManager匯報資源使用情況(目前主要是CPU和內(nèi)存的使用情況)。RM只接受NM的資源回報信息,對于具體的資源處理則交給NM自己處理。
YARN Scheduler根據(jù)application的請求為其分配資源,不負責(zé)application job的監(jiān)控、追蹤、運行狀態(tài)反饋、啟動等工作。
NodeManager
NodeManager是每個節(jié)點上的資源和任務(wù)管理器,它是管理這臺機器的代理,負責(zé)該節(jié)點程序的運行,以及該節(jié)點資源的管理和監(jiān)控。YARN集群每個節(jié)點都運行一個NodeManager。
NodeManager定時向ResourceManager匯報本節(jié)點資源(CPU、內(nèi)存)的使用情況和Container的運行狀態(tài)。當(dāng)ResourceManager宕機時NodeManager自動連接RM備用節(jié)點。
NodeManager接收并處理來自ApplicationMaster的Container啟動、停止等各種請求。
ApplicationMaster
用戶提交的每個應(yīng)用程序均包含一個ApplicationMaster,它可以運行在ResourceManager以外的機器上。
負責(zé)與RM調(diào)度器協(xié)商以獲取資源(用Container表示)。
將得到的任務(wù)進一步分配給內(nèi)部的任務(wù)(資源的二次分配)。
與NM通信以啟動/停止任務(wù)。
監(jiān)控所有任務(wù)運行狀態(tài),并在任務(wù)運行失敗時重新為任務(wù)申請資源以重啟任務(wù)。
當(dāng)前YARN自帶了兩個ApplicationMaster實現(xiàn),一個是用于演示AM編寫方法的實例程序DistributedShell,它可以申請一定數(shù)目的Container以并行運行一個Shell命令或者Shell腳本;另一個是運行MapReduce應(yīng)用程序的AM—MRAppMaster。
注:RM只負責(zé)監(jiān)控AM,并在AM運行失敗時候啟動它。RM不負責(zé)AM內(nèi)部任務(wù)的容錯,任務(wù)的容錯由AM完成。
4.Yarn運行流程
client向RM提交應(yīng)用程序,其中包括啟動該應(yīng)用的ApplicationMaster的必須信息,例如ApplicationMaster程序、啟動ApplicationMaster的命令、用戶程序等。
ResourceManager啟動一個container用于運行ApplicationMaster。
啟動中的ApplicationMaster向ResourceManager注冊自己,啟動成功后與RM保持心跳。
ApplicationMaster向ResourceManager發(fā)送請求,申請相應(yīng)數(shù)目的container。
ResourceManager返回ApplicationMaster的申請的containers信息。申請成功的container,由ApplicationMaster進行初始化。container的啟動信息初始化后,AM與對應(yīng)的NodeManager通信,要求NM啟動container。AM與NM保持心跳,從而對NM上運行的任務(wù)進行監(jiān)控和管理。
container運行期間,ApplicationMaster對container進行監(jiān)控。container通過RPC協(xié)議向?qū)?yīng)的AM匯報自己的進度和狀態(tài)等信息。
應(yīng)用運行期間,client直接與AM通信獲取應(yīng)用的狀態(tài)、進度更新等信息。
應(yīng)用運行結(jié)束后,ApplicationMaster向ResourceManager注銷自己,并允許屬于它的container被收回。

5.Yarn 調(diào)度器Scheduler
理想情況下,我們應(yīng)用對Yarn資源的請求應(yīng)該立刻得到滿足,但現(xiàn)實情況資源往往是有限的,特別是在一個很繁忙的集群,一個應(yīng)用資源的請求經(jīng)常需要等待一段時間才能的到相應(yīng)的資源。在Yarn中,負責(zé)給應(yīng)用分配資源的就是Scheduler。其實調(diào)度本身就是一個難題,很難找到一個完美的策略可以解決所有的應(yīng)用場景。為此,Yarn提供了多種調(diào)度器和可配置的策略供我們選擇。
在Yarn中有三種調(diào)度器可以選擇:FIFO Scheduler ,Capacity Scheduler,F(xiàn)air Scheduler。
5.1 FIFO Scheduler
FIFO Scheduler把應(yīng)用按提交的順序排成一個隊列,這是一個先進先出隊列,在進行資源分配的時候,先給隊列中最頭上的應(yīng)用進行分配資源,待最頭上的應(yīng)用需求滿足后再給下一個分配,以此類推。
FIFO Scheduler是最簡單也是最容易理解的調(diào)度器,也不需要任何配置,但它并不適用于共享集群。大的應(yīng)用可能會占用所有集群資源,這就導(dǎo)致其它應(yīng)用被阻塞。在共享集群中,更適合采用Capacity Scheduler或Fair Scheduler,這兩個調(diào)度器都允許大任務(wù)和小任務(wù)在提交的同時獲得一定的系統(tǒng)資源。
5.2 Capacity Scheduler
Capacity 調(diào)度器允許多個組織共享整個集群,每個組織可以獲得集群的一部分計算能力。通過為每個組織分配專門的隊列,然后再為每個隊列分配一定的集群資源,這樣整個集群就可以通過設(shè)置多個隊列的方式給多個組織提供服務(wù)了。除此之外,隊列內(nèi)部又可以垂直劃分,這樣一個組織內(nèi)部的多個成員就可以共享這個隊列資源了,在一個隊列內(nèi)部,資源的調(diào)度是采用的是先進先出(FIFO)策略。

容量調(diào)度器 Capacity Scheduler 最初是由 Yahoo 最初開發(fā)設(shè)計使得 Hadoop 應(yīng)用能夠被多用戶使用,且最大化整個集群資源的吞吐量,現(xiàn)被 IBM BigInsights 和 Hortonworks HDP 所采用。
Capacity Scheduler 被設(shè)計為允許應(yīng)用程序在一個可預(yù)見的和簡單的方式共享集群資源,即"作業(yè)隊列"。Capacity Scheduler 是根據(jù)租戶的需要和要求把現(xiàn)有的資源分配給運行的應(yīng)用程序。Capacity Scheduler 同時允許應(yīng)用程序訪問還沒有被使用的資源,以確保隊列之間共享其它隊列被允許的使用資源。管理員可以控制每個隊列的容量,Capacity Scheduler 負責(zé)把作業(yè)提交到隊列中。
5.3 Fair Scheduler
在Fair調(diào)度器中,我們不需要預(yù)先占用一定的系統(tǒng)資源,F(xiàn)air調(diào)度器會為所有運行的job動態(tài)的調(diào)整系統(tǒng)資源。如下圖所示,當(dāng)?shù)谝粋€大job提交時,只有這一個job在運行,此時它獲得了所有集群資源;當(dāng)?shù)诙€小任務(wù)提交后,F(xiàn)air調(diào)度器會分配一半資源給這個小任務(wù),讓這兩個任務(wù)公平的共享集群資源。
需要注意的是,在下圖Fair調(diào)度器中,從第二個任務(wù)提交到獲得資源會有一定的延遲,因為它需要等待第一個任務(wù)釋放占用的Container。小任務(wù)執(zhí)行完成之后也會釋放自己占用的資源,大任務(wù)又獲得了全部的系統(tǒng)資源。最終效果就是Fair調(diào)度器即得到了高的資源利用率又能保證小任務(wù)及時完成。

公平調(diào)度器 Fair Scheduler 最初是由 Facebook 開發(fā)設(shè)計使得 Hadoop 應(yīng)用能夠被多用戶公平地共享整個集群資源,現(xiàn)被 Cloudera CDH 所采用。
Fair Scheduler 不需要保留集群的資源,因為它會動態(tài)在所有正在運行的作業(yè)之間平衡資源。
5.4 示例:Capacity調(diào)度器配置使用
調(diào)度器的使用是通過yarn-site.xml配置文件中的
yarn.resourcemanager.scheduler.class參數(shù)進行配置的,默認采用Capacity Scheduler調(diào)度器。
假設(shè)我們有如下層次的隊列:
root
├── prod
└── dev
? ?├── mapreduce
? ?└── spark
下面是一個簡單的Capacity調(diào)度器的配置文件,文件名為capacity-scheduler.xml。在這個配置中,在root隊列下面定義了兩個子隊列prod和dev,分別占40%和60%的容量。需要注意,一個隊列的配置是通過屬性yarn.sheduler.capacity.<queue-path>.<sub-property>指定的,<queue-path>代表的是隊列的繼承樹,如root.prod隊列,<sub-property>一般指capacity和maximum-capacity。
我們可以看到,dev隊列又被分成了mapreduce和spark兩個相同容量的子隊列。dev的maximum-capacity屬性被設(shè)置成了75%,所以即使prod隊列完全空閑dev也不會占用全部集群資源,也就是說,prod隊列仍有25%的可用資源用來應(yīng)急。我們注意到,mapreduce和spark兩個隊列沒有設(shè)置maximum-capacity屬性,也就是說mapreduce或spark隊列中的job可能會用到整個dev隊列的所有資源(最多為集群的75%)。而類似的,prod由于沒有設(shè)置maximum-capacity屬性,它有可能會占用集群全部資源。
關(guān)于隊列的設(shè)置,這取決于我們具體的應(yīng)用。比如,在MapReduce中,我們可以通過mapreduce.job.queuename屬性指定要用的隊列。如果隊列不存在,我們在提交任務(wù)時就會收到錯誤。如果我們沒有定義任何隊列,所有的應(yīng)用將會放在一個default隊列中。
注意:對于Capacity調(diào)度器,我們的隊列名必須是隊列樹中的最后一部分,如果我們使用隊列樹則不會被識別。比如,在上面配置中,我們使用prod和mapreduce作為隊列名是可以的,但是如果我們用root.dev.mapreduce或者dev. mapreduce是無效的。
Hadoop High Availability
HA(High Available), 高可用,是保證業(yè)務(wù)連續(xù)性的有效解決方案,一般有兩個或兩個以上的節(jié)點,分為活動節(jié)點(Active)及備用節(jié)點(Standby)。通常把正在執(zhí)行業(yè)務(wù)的稱為活動節(jié)點,而作為活動節(jié)點的一個備份的則稱為備用節(jié)點。當(dāng)活動節(jié)點出現(xiàn)問題,導(dǎo)致正在運行的業(yè)務(wù)(任務(wù))不能正常運行時,備用節(jié)點此時就會偵測到,并立即接續(xù)活動節(jié)點來執(zhí)行業(yè)務(wù)。從而實現(xiàn)業(yè)務(wù)的不中斷或短暫中斷。
Hadoop1.X版本,NN是HDFS集群的單點故障點,每一個集群只有一個NN,如果這個機器或進程不可用,整個集群就無法使用。為了解決這個問題,出現(xiàn)了一堆針對HDFS HA的解決方案(如:Linux HA, VMware FT, shared NAS+NFS, BookKeeper, QJM/Quorum Journal Manager, BackupNode等)。
在HA具體實現(xiàn)方法不同情況下,HA框架的流程是一致的, 不一致的就是如何存儲、管理、同步edits編輯日志文件。
在Active NN和Standby NN之間要有個共享的存儲日志的地方,Active NN把edit Log寫到這個共享的存儲日志的地方,Standby NN去讀取日志然后執(zhí)行,這樣Active和Standby NN內(nèi)存中的HDFS元數(shù)據(jù)保持著同步。一旦發(fā)生主從切換Standby NN可以盡快接管Active NN的工作。
1.Namenode HA
1.1 Namenode HA詳解
hadoop2.x之后,Clouera提出了QJM/Qurom Journal Manager,這是一個基于Paxos算法(分布式一致性算法)實現(xiàn)的HDFS HA方案,它給出了一種較好的解決思路和方案,QJM主要優(yōu)勢如下:
不需要配置額外的高共享存儲,降低了復(fù)雜度和維護成本。
消除spof(單點故障)。
系統(tǒng)魯棒性(Robust)的程度可配置、可擴展。

基本原理就是用2N+1臺 JournalNode 存儲EditLog,每次寫數(shù)據(jù)操作有>=N+1返回成功時即認為該次寫成功,數(shù)據(jù)不會丟失了。當(dāng)然這個算法所能容忍的是最多有N臺機器掛掉,如果多于N臺掛掉,這個算法就失效了。這個原理是基于Paxos算法。
在HA架構(gòu)里面SecondaryNameNode已經(jīng)不存在了,為了保持standby NN時時的與Active NN的元數(shù)據(jù)保持一致,他們之間交互通過JournalNode進行操作同步。
任何修改操作在 Active NN上執(zhí)行時,JournalNode進程同時也會記錄修改log到至少半數(shù)以上的JN中,這時 Standby NN 監(jiān)測到JN 里面的同步log發(fā)生變化了會讀取 JN 里面的修改log,然后同步到自己的目錄鏡像樹里面,如下圖:

當(dāng)發(fā)生故障時,Active的 NN 掛掉后,Standby NN 會在它成為Active NN 前,讀取所有的JN里面的修改日志,這樣就能高可靠的保證與掛掉的NN的目錄鏡像樹一致,然后無縫的接替它的職責(zé),維護來自客戶端請求,從而達到一個高可用的目的。
在HA模式下,datanode需要確保同一時間有且只有一個NN能命令DN。為此:
每個NN改變狀態(tài)的時候,向DN發(fā)送自己的狀態(tài)和一個序列號。
DN在運行過程中維護此序列號,當(dāng)failover時,新的NN在返回DN心跳時會返回自己的active狀態(tài)和一個更大的序列號。DN接收到這個返回則認為該NN為新的active。
如果這時原來的active NN恢復(fù),返回給DN的心跳信息包含active狀態(tài)和原來的序列號,這時DN就會拒絕這個NN的命令。
1.2 Failover Controller
HA模式下,會將FailoverController部署在每個NameNode的節(jié)點上,作為一個單獨的進程用來監(jiān)視NN的健康狀態(tài)。FailoverController主要包括三個組件:
HealthMonitor: 監(jiān)控NameNode是否處于unavailable或unhealthy狀態(tài)。當(dāng)前通過RPC調(diào)用NN相應(yīng)的方法完成。
ActiveStandbyElector: 監(jiān)控NN在ZK中的狀態(tài)。
ZKFailoverController: 訂閱HealthMonitor 和ActiveStandbyElector 的事件,并管理NN的狀態(tài),另外zkfc還負責(zé)解決fencing(也就是腦裂問題)。
上述三個組件都在跑在一個JVM中,這個JVM與NN的JVM在同一個機器上。但是兩個獨立的進程。一個典型的HA集群,有兩個NN組成,每個NN都有自己的ZKFC進程。

ZKFailoverController主要職責(zé):
健康監(jiān)測:周期性的向它監(jiān)控的NN發(fā)送健康探測命令,從而來確定某個NameNode是否處于健康狀態(tài),如果機器宕機,心跳失敗,那么zkfc就會標記它處于一個不健康的狀態(tài)
會話管理:如果NN是健康的,zkfc就會在zookeeper中保持一個打開的會話,如果NameNode同時還是Active狀態(tài)的,那么zkfc還會在Zookeeper中占有一個類型為短暫類型的znode,當(dāng)這個NN掛掉時,這個znode將會被刪除,然后備用的NN將會得到這把鎖,升級為主NN,同時標記狀態(tài)為Active
當(dāng)宕機的NN新啟動時,它會再次注冊zookeper,發(fā)現(xiàn)已經(jīng)有znode鎖了,便會自動變?yōu)镾tandby狀態(tài),如此往復(fù)循環(huán),保證高可靠,需要注意,目前僅僅支持最多配置2個NN
master選舉:通過在zookeeper中維持一個短暫類型的znode,來實現(xiàn)搶占式的鎖機制,從而判斷那個NameNode為Active狀態(tài)
2. Yarn HA
Yarn作為資源管理系統(tǒng),是上層計算框架(如MapReduce,Spark)的基礎(chǔ)。在Hadoop 2.4.0版本之前,Yarn存在單點故障(即ResourceManager存在單點故障),一旦發(fā)生故障,恢復(fù)時間較長,且會導(dǎo)致正在運行的Application丟失,影響范圍較大。從Hadoop 2.4.0版本開始,Yarn實現(xiàn)了ResourceManager HA,在發(fā)生故障時自動failover,大大提高了服務(wù)的可靠性。
ResourceManager(簡寫為RM)作為Yarn系統(tǒng)中的主控節(jié)點,負責(zé)整個系統(tǒng)的資源管理和調(diào)度,內(nèi)部維護了各個應(yīng)用程序的ApplictionMaster信息、NodeManager(簡寫為NM)信息、資源使用等。由于資源使用情況和NodeManager信息都可以通過NodeManager的心跳機制重新構(gòu)建出來,因此只需要對ApplicationMaster相關(guān)的信息進行持久化存儲即可。
在一個典型的HA集群中,兩臺獨立的機器被配置成ResourceManger。在任意時間,有且只允許一個活動的ResourceManger,另外一個備用。切換分為兩種方式:
手動切換:在自動恢復(fù)不可用時,管理員可用手動切換狀態(tài),或是從Active到Standby,或是從Standby到Active。
自動切換:基于Zookeeper,但是區(qū)別于HDFS的HA,2個節(jié)點間無需配置額外的ZFKC守護進程來同步數(shù)據(jù)。

3. Hadoop HA集群的搭建
HA集群搭建的難度主要在于配置文件的編寫,心細,心細,心細!
詳細的搭建安裝步驟請參考附件資料。