Spark優(yōu)化
一、spark的優(yōu)化的目的是什么?
減少內(nèi)存的使用,減少shuffle,減少節(jié)點(diǎn)之間數(shù)據(jù)傳輸?shù)臄?shù)據(jù)量。
二、spark優(yōu)化方式
2.1 資源優(yōu)化
在提交任務(wù)時(shí)可以通過給參數(shù)賦值對(duì)任務(wù)進(jìn)行優(yōu)化,比如
指定當(dāng)前application一共使用多個(gè)executor
--num-executor
指定啟動(dòng)一個(gè)executor需要用多少個(gè)core
--executor-cores?
指定啟動(dòng)一個(gè)executor需要用多少內(nèi)存,這里的core就對(duì)應(yīng)了服務(wù)器的線程數(shù),一般core的數(shù)量和任務(wù)的分區(qū)數(shù)為一個(gè)core對(duì)應(yīng)1-3個(gè)分區(qū)task,避免資源浪費(fèi)
--executor-memeory
./spark-submit --master spark://xxx? --executor-cores xxx --executor-memory xx .....
2.2 并行度優(yōu)化
除了提交任務(wù)時(shí)指定資源外,還要設(shè)置在代碼中設(shè)置相關(guān)的并行度,不然并行度很小,給再多的資源也用不了。
設(shè)置并行度的方式分以下幾種情況:
1)在算子后面直接帶上分區(qū)數(shù)個(gè)數(shù)
? ? 2)spark.default.parallelism
這個(gè)參數(shù)表示的意思是代碼中RDD默認(rèn)生成的分區(qū)數(shù),在本地環(huán)境中跟local后面帶上的數(shù)是一樣的,在集群(standalone和yarn)中是跟executor所在節(jié)點(diǎn)核的總數(shù)是一致的。
????3)sparkStreaming中direct模式
與讀取的topic的分區(qū)數(shù)保持一致。
2.3 代碼優(yōu)化
1)對(duì)代碼中頻繁用到的RDD進(jìn)行持久化,當(dāng)一個(gè)job跑完,其它job再用相同的RDD時(shí)就可以直接到內(nèi)存中讀取數(shù)據(jù)而不用再重新執(zhí)行。
持久化的方式有:
①cache:默認(rèn)調(diào)用的就是persist()=persist(StorgeLevel.memeory_level)
②persist:memory_only,memory_and_disk,memory_only_ser,memory_and_disk_ser
③checkpoint:一般在sparkStreaming中存儲(chǔ)狀態(tài)使用
2)盡量避免使用shuffle算子
大多數(shù)shuffle還是避免不了的主要就是join,有些情況可以用map類算子+廣播變量的方式代替join。
3)多使用高性能算子
比如在保存數(shù)據(jù)或者將數(shù)據(jù)插入數(shù)據(jù)庫時(shí),使用mapPartition代替map,foreachPartition代替foreach;在處理大量小文件或者在對(duì)數(shù)據(jù)進(jìn)行過濾后,可以使用coalesce算子連減小分區(qū),在數(shù)據(jù)量大,分區(qū)少的時(shí)候可以用repartition來增加分區(qū)等等
2.4 shuffle優(yōu)化
?主要可以優(yōu)化的參數(shù)有:每次shuffle過程拉取數(shù)據(jù)的緩存大小,默認(rèn)48M,數(shù)據(jù)量大的時(shí)候可以適當(dāng)增大,較少數(shù)據(jù)拉取的次數(shù)。同時(shí)還可以調(diào)節(jié)拉取任務(wù)失敗重試次數(shù)以及重試等待時(shí)間等。
2.5?內(nèi)存優(yōu)化
spark的統(tǒng)一內(nèi)存分布如下:
1)300M預(yù)留
2)(總-300M)*0.6(-- spark.memory.fraction)
?其中0.5是用于RDD緩存和廣播變量(--spark.memory.storageFraction)
? 0.5是用于shuffle。
3)(總-300M)*0.4
? ?這是task運(yùn)行內(nèi)存
所以要提高task運(yùn)行內(nèi)存的話,可以將spark.memeory.fraction的比例調(diào)低一點(diǎn)。
2.6?堆外內(nèi)存優(yōu)化
reduce端去map端拉取數(shù)據(jù)的時(shí)候,是從map端的堆外內(nèi)存中獲取的,堆外內(nèi)存的大小默認(rèn)是executor內(nèi)存大小的0.1,最小為384M。真正處理大數(shù)據(jù)的時(shí)候,這經(jīng)常出現(xiàn)問題,當(dāng)內(nèi)存不足時(shí)executor會(huì)掛掉,會(huì)報(bào)shuffle file cannot find的問題。也就是shuffleReadTask拉取數(shù)據(jù)的時(shí)候文件找不到,所以可以把堆外內(nèi)存調(diào)大點(diǎn)
--conf? spark.executor.memoryOverhead=2048M
2.7 數(shù)據(jù)傾斜處理
spark中早成數(shù)據(jù)傾斜的原因是某些分區(qū)的數(shù)據(jù)量明顯大于其它分區(qū),導(dǎo)致某些task處理時(shí)間過長。
主要分為以下兩大類
1)使用reduceByKey或者groupByKey這類算子造成數(shù)據(jù)傾斜
這類情況可以通過雙重聚合或調(diào)整并行度的方式解決,如果是并行度太低導(dǎo)致多個(gè)key落到同一個(gè)分區(qū)造成的數(shù)據(jù)傾斜,可以調(diào)大并行度解決。如果是由于少數(shù)key造成的數(shù)據(jù)傾斜也可以通過雙重聚合的方式解決,也就是通過先為每個(gè)key增加隨機(jī)數(shù)前綴先做一次聚合,減少數(shù)據(jù)量,然后再把隨機(jī)數(shù)前綴去掉聚合。
2)join過程產(chǎn)生數(shù)據(jù)傾斜
join造成數(shù)據(jù)傾斜也能分為以下幾種
①數(shù)據(jù)量大的RDDjoin數(shù)據(jù)量小的RDD
這種情況可以將數(shù)據(jù)量小的RDD進(jìn)行廣播,然后數(shù)據(jù)量較大的RDD通過map類算子結(jié)合廣播變量替換join計(jì)算,避免shuffle過程,也就避免了數(shù)據(jù)傾斜。
②數(shù)據(jù)量大的RDDjoin數(shù)據(jù)量大的RDD
如果造成數(shù)據(jù)傾斜是由其中一個(gè)RDD少部分key的數(shù)據(jù)量過大引起的,這個(gè)時(shí)候可以對(duì)數(shù)據(jù)進(jìn)行分拆,將造成數(shù)據(jù)傾斜的RDD的那些key單獨(dú)形成一個(gè)RDD,未造成數(shù)據(jù)傾斜的形成一個(gè)RDD,兩部分?jǐn)?shù)據(jù)單獨(dú)計(jì)算,其中造成數(shù)據(jù)傾斜的那一部分可以采取隨機(jī)數(shù)+數(shù)膨脹的方式解決數(shù)據(jù)傾斜問題,最后再將計(jì)算結(jié)果union即可。

如果造成數(shù)據(jù)傾斜是由很多key,那分拆就沒什么用了,這個(gè)時(shí)候可以通過隨機(jī)數(shù)+擴(kuò)容的方式解決,將其中一個(gè)RDD的每條數(shù)據(jù)的key加上一個(gè)n以內(nèi)的隨機(jī)前綴,將另外一個(gè)RDD的每條數(shù)據(jù)擴(kuò)容為n條數(shù)據(jù),并依次打上0-n的前綴。最后將兩個(gè)處理的RDDjoin即可。
