工作經(jīng)驗(yàn)分享:Spark調(diào)優(yōu)【效率提升!建議收藏】
1.優(yōu)化背景
業(yè)務(wù)數(shù)據(jù)不斷增大, Spark運(yùn)行時(shí)間越來越長, 從最初的半小時(shí)到6個(gè)多小時(shí)
某日Spark程序運(yùn)行6.5個(gè)小時(shí)后, 報(bào)“Too large frame...”的異常:
org.apache.spark.shuffle.FetchFailedException: Too large frame: 2624680416
2.原因分析
2.1 拋出異常的原因
根本原因: 源數(shù)據(jù)的某一列(或某幾列)分布不均勻,當(dāng)某個(gè)shuffle操作是根據(jù)此列數(shù)據(jù)進(jìn)行shuffle時(shí),就會(huì)造成整個(gè)數(shù)據(jù)集發(fā)生傾斜,即某些partition包含了大量數(shù)據(jù),超出了2G的限制。
異常,就是發(fā)生在業(yè)務(wù)數(shù)據(jù)處理的最后一步left join操作。
?
2.2 臨時(shí)解決方案
增大partition數(shù), 讓partition中的數(shù)據(jù)量<2g
由于是left join觸發(fā)了shuffle操作, 而spark默認(rèn)join時(shí)的分區(qū)數(shù)為200(即spark.sql.shuffle.partitions=200), 所以增大這個(gè)分區(qū)數(shù), 即調(diào)整該參數(shù)為800, 即spark.sql.shuffle.partitions=800
2.3 臨時(shí)解決方案
Spark不再報(bào)錯(cuò),而且“艱難”的跑完了, 跑了近6個(gè)小時(shí)!
通過Spark UI頁面的監(jiān)控發(fā)現(xiàn), 由于數(shù)據(jù)傾斜導(dǎo)致, 整個(gè)Spark任務(wù)的運(yùn)行時(shí)間是被少數(shù)的幾個(gè)Task “拖累的”。

3.思考優(yōu)化
3.1?確認(rèn)數(shù)據(jù)傾斜
方法一: 通過sample算子對DataSet/DataFrame/RDD進(jìn)行采樣, 找出top n的key值及數(shù)量;
方法二: 源數(shù)據(jù)/中間數(shù)據(jù)落到存儲(chǔ)中(如HIVE), 直接查詢觀察。
3.2 可選優(yōu)化方法
1.HIVE ETL 數(shù)據(jù)預(yù)處理
把數(shù)據(jù)傾斜提前到 HIVE ETL中, 避免Spark發(fā)生數(shù)據(jù)傾斜
這個(gè)其實(shí)很有用,如果不是自己負(fù)責(zé),請友好提醒你的上游負(fù)責(zé)同事

2.過濾無效的數(shù)據(jù) (where / filter)
a.NULL值數(shù)據(jù)
b.“臟數(shù)據(jù)”(非法數(shù)據(jù))
c.業(yè)務(wù)無關(guān)的數(shù)據(jù)
無效數(shù)據(jù)需要結(jié)合自己負(fù)責(zé)的業(yè)務(wù)和場景去判斷哈,慎重處理業(yè)務(wù)的無關(guān)數(shù)據(jù)??!以免造成客戶Bug
3.分析join操作, 左右表的特征, 判斷是否可以進(jìn)行小表廣播 broadcast
a.這樣可避免shuffle操作,特別是當(dāng)大表特別大;
b.默認(rèn)情況下, join時(shí)候, 如果表的數(shù)據(jù)量低于;
spark.sql.autoBroadcastJoinThreshold參數(shù)值時(shí)(默認(rèn)值為10 MB), spark會(huì)自動(dòng)進(jìn)行broadcast, 但也可以通過強(qiáng)制手動(dòng)指定廣播;
業(yè)務(wù)數(shù)據(jù)量是100MB
c.Driver上有一個(gè)campaign_df全量的副本, 每個(gè)Executor上也會(huì)有一個(gè)campaign_df的副本;
d.JOIN操作, Spark默認(rèn)都會(huì)進(jìn)行 merge_sort (也需要避免傾斜)。
4.數(shù)據(jù)打散, 擴(kuò)容join
分散傾斜的數(shù)據(jù), 給key加上隨機(jī)數(shù)前綴

1)提高shuffle操作并行度
2)多階段
aggregate操作: 先局部聚合, 再全局聚合;
給key打隨機(jī)值, 如打上1-10, 先分別針對10個(gè)組做聚合;
最后再統(tǒng)一聚合;
join操作: 切成多個(gè)部分, 分開join, 最后union。
判斷出造成數(shù)據(jù)傾斜的一些key值 (可通過觀察或者sample取樣);
如主號,單獨(dú)拎出來上述key值的記錄做join, 剩余記錄再做join;
獨(dú)立做優(yōu)化, 如broadcast;
結(jié)果數(shù)據(jù)union即可。
3.3?實(shí)際優(yōu)化方法
示例:
4.優(yōu)化后效果
優(yōu)化處理思維導(dǎo)圖

文末給大家分享一份經(jīng)典的Spark性能優(yōu)化文檔,該文從開發(fā)、資源、數(shù)據(jù)傾斜和shuffle階段四個(gè)角度分析了如何優(yōu)化Spark程序,真實(shí)實(shí)用!

