尚硅谷大數(shù)據(jù)Spark教程從入門到精通

1、SPARK
基于內(nèi)存的快速通用可擴展的大數(shù)據(jù)分析計算引擎
包含流處理的批處理框架

一次性數(shù)據(jù)計算:
處理數(shù)據(jù)時會從存儲設(shè)備中讀取數(shù)據(jù),進行邏輯操作,然后將處理的結(jié)果重新存儲到介質(zhì)中

處理復(fù)雜邏輯性能低
SPARK對該流程進行了更改,即不是放入磁盤而是放入內(nèi)存中方便后續(xù)的操作
但這么做也可能導(dǎo)致內(nèi)存資源的使用緊張
2、核心模塊

spark core:核心功能,其他功能會基于該核心實現(xiàn)與擴展
spark sql:操作結(jié)構(gòu)化數(shù)據(jù)的組件,使用該組件可以用sql語言來查詢數(shù)據(jù)
spark streaming:針對實時數(shù)據(jù)進行流式計算的組件,提供了豐富的處理數(shù)據(jù)流的API
spqrk MLlib:機器學(xué)習(xí)算法庫,不僅提供了模型評估、數(shù)據(jù)導(dǎo)入等功能,還提供了一些底層機器學(xué)習(xí)的原語
spark graphX:面向圖計算提供的框架以及算法庫
3、wordcount總結(jié)
(1)版本要對不然可能報錯
(2)匿名函數(shù)的使用(下劃線)
(3)maven下載慢就換源
4、SPARK運行環(huán)境
(1)Local模式
不需要其他任何節(jié)點資源就可以在本地執(zhí)行SPARK代碼的環(huán)境,一般用于教學(xué)、調(diào)試、演示等
(2)獨立部署模式
一種只使用自身節(jié)點進行運行的集群模式
【1】打開conf目錄修改slaves和spark-env,去掉它們的后綴并修改內(nèi)部的內(nèi)容,添加上子客戶端的IP地址
【2】使用sbin中的start-all.sh
相關(guān)參數(shù):



配置歷史服務(wù)
【1】修改conf中的spark-default.conf,輸入主機地址、端口和文件名
【2】在hadoop上脫離安全模式并創(chuàng)造對應(yīng)文件名
sbin/start-dfs.sh hadoop fs -mkdir /directory
【3】修改conf中的spark-env文件
【4】分發(fā)
【5】啟動
sbin/start-all.sh
sbin/start-history-server.sh
(3)Yarn模式
獨立部署模式無需其他框架提供資源,但也降低了同第三方資源的耦合性,獨立性強。但Spark是一種計算框架,而不是資源調(diào)度框架,這不是它的槍響,因此Yarn模式是在Yarn環(huán)境下的Spark工作

附注:
在啟動和關(guān)閉spark時要在前面加上sbin/,否則會默認執(zhí)行HADOOP的開啟關(guān)閉
SPARK MASTER內(nèi)部端口號:7077(別和HADOOP 沖突)
SPARK歷史服務(wù)器端口:18080
Hadoop YARN:8088
SPARK 查看spark-shell運行任務(wù) 4040
5、運行架構(gòu)
主從架構(gòu)
Driver

Executor

ApplicationMaster
driver 如果直接和 master進行交互,會減少耦合性,因此需要使用ApplicationMaster作為中間件,增加靈活性
6、核心概念
(1)Executor 和 Core


(2)并行度
并發(fā):多個虛擬核去搶占單個真正核的操作
并行:有多個真正核,多個虛擬核可同時操作

(3)有向無環(huán)圖

各個部件的依賴不能形成回環(huán),否則會導(dǎo)致部件無法運行
(4)SPARK YARN部署 : client&cluster
主要區(qū)別在于Driver程序的運行節(jié)點位置
7、spark 核心編程


Driver: 調(diào)度作用,用于數(shù)據(jù)、邏輯的準備
Executor: 用于執(zhí)行
8、RDD:最小的計算單元

附加:
FileInputStream()
用于讀取文件數(shù)據(jù)
BufferedInputStream()的作用

相當于開辟了一個緩沖區(qū),讀取的數(shù)據(jù)會直接放入緩沖區(qū),只有超過緩沖區(qū)閾值的時候才會進行輸出
這么做的好處是能夠提升文件的讀取效率,不再需要讀一個輸出一個,讀一個輸出一個(字節(jié)流)。類似于批處理。
InputStreamReader()的作用

當使用字符流時,由于不清楚幾個字節(jié)為一個字符,因此需要一個轉(zhuǎn)換流來進行轉(zhuǎn)換組裝成字符
9、RDD與IO關(guān)系
HadoopRDD -textFile 讀取文件數(shù)據(jù)
MapPartitionsRDD -flatMap 扁平化操作,分詞
MapPartitionsRDD -Map 轉(zhuǎn)換功能
ShuffledRDD -reduceByKey 相同key做value聚合
交給collect()輸出

RDD的數(shù)據(jù)處理方式類似于IO流,也有裝飾者設(shè)計模式
RDD的數(shù)據(jù)只有再調(diào)用collect方法時才會真正執(zhí)行業(yè)務(wù)邏輯操作。之前的全部封裝都是功能的擴展
RDD數(shù)據(jù)中間不存儲,可以臨時保存一部分數(shù)據(jù)
10、RDD特點
在讀取數(shù)據(jù)還有,為了方便切分數(shù)據(jù)(分組),數(shù)據(jù)在讀取時便會根據(jù)邏輯分配到不同的分區(qū)中

RDD叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)處理模型。是一個不可變,可分區(qū),內(nèi)部元素可并行計算的集合】
特點:
(1)彈性
【1】存儲彈性:內(nèi)存磁盤自動切換
【2】容錯彈性:數(shù)據(jù)丟失可自動恢復(fù)
【3】計算彈性:計算出錯重試機制
【4】分片彈性:根據(jù)需要重新分片
(2)分布式
數(shù)據(jù)存儲在大數(shù)據(jù)集群不同節(jié)點上
(3)數(shù)據(jù)集
RDD封裝了計算邏輯,并不保存數(shù)據(jù)
(4)數(shù)據(jù)抽象
RDD是一個抽象類,需要看具體封裝實現(xiàn)
(5)不可變
RDD封裝了計算邏輯,不可改變,若要改變只能產(chǎn)生新的RDD,在新的RDD里封裝計算邏輯
(6)可分區(qū)、并行計算
11、核心屬性
(1)分區(qū)列表
用于執(zhí)行并行計算,是實現(xiàn)分布式計算的重要屬性
(2)分區(qū)計算函數(shù)
(3)RDD依賴關(guān)系(一份依賴列表)
(4)分區(qū)器(將數(shù)據(jù)進行分區(qū)處理)
(5)計算分區(qū)的首選位置(判斷計算發(fā)生到哪個節(jié)點效率最優(yōu))
12、RDD的創(chuàng)建
(1)從內(nèi)存中
(2)從文件中
(3)從集合
13、RDD集合數(shù)據(jù)源是如何設(shè)置分區(qū)并劃分數(shù)據(jù)的
i 代表分區(qū)數(shù)遍歷,從0開始(例子:0,1,2)
start = (i * 數(shù)量長度)/分區(qū)數(shù)量
end = ((i+1)*數(shù)量長度)/分區(qū)數(shù)量
(start , end)包頭不包尾,代表的是用于劃分數(shù)據(jù)的序號,第一個是0,以此類推。假設(shè)(0,2)則包含序號0,1。意味著第一個和第二個數(shù)據(jù)被劃分在該分區(qū)。
14、RDD文件來源如何設(shè)置分區(qū)數(shù)量
默認情況下:
math.min(defaultParallelism,2)
defaultParallelism代表用戶設(shè)置的工作線程數(shù)
用戶設(shè)置的情況:
用戶設(shè)置的分區(qū)數(shù)量實際上是最小數(shù)量,具體分區(qū)公式如下:
所有文件的總字節(jié)數(shù)/(分區(qū)數(shù)量,如果分區(qū)為0就改成1)
15、RDD文件來源如何進行數(shù)據(jù)分配
(1)數(shù)據(jù)以行為單位進行分區(qū),與字節(jié)數(shù)無關(guān)
(2)數(shù)據(jù)讀取時以偏移量為單位,且偏移量不會被重新讀取
(3)數(shù)據(jù)分區(qū)的偏移量范圍計算,包頭包尾,任何行在被讀取后該行所有內(nèi)容將會被一次性全部讀取,哪怕不在偏移量范圍內(nèi)。注意讀取的內(nèi)容包括占兩個字節(jié)的回車
(4)如果數(shù)據(jù)源為多個文件,那么計算分區(qū)時以文件為單位進行分區(qū)
16、RDD方法-》RDD算子(操作)
(1)轉(zhuǎn)換:功能與補充,將舊的RDD包裝成新的RDD
flatmap,map:包裝添加新功能的函數(shù),本身不執(zhí)行什么
(2)行動:觸發(fā)任務(wù)的調(diào)度和作業(yè)的執(zhí)行
collect:用于具體行動的函數(shù)
17、RDD轉(zhuǎn)換算子
根據(jù)處理方式分類:
(1)單value類型
【1】map
def map[ClassTag](f:T->U):RDD[U]
將處理的數(shù)據(jù)按照給出的函數(shù)逐條進行映射轉(zhuǎn)換,這里轉(zhuǎn)換可以是類型的轉(zhuǎn)換也可以是值的轉(zhuǎn)換
小功能:從服務(wù)器日志數(shù)據(jù)apache.log中獲取用戶請求URL的資源路徑
附注:
rdd的計算一個分區(qū)內(nèi)的數(shù)據(jù)是一個一個執(zhí)行邏輯
不同分區(qū)數(shù)據(jù)計算是無序的
【2】mapPartitions
def map Partitions[U:ClassTag](
f:Iterator[T]=>Iterator[U],
preservesPartitioning:Boolean = false):RDD[U]
把一個分區(qū)的數(shù)據(jù)全拿到了在操作
優(yōu)點:可以以分區(qū)為單位進行數(shù)據(jù)轉(zhuǎn)換操作,數(shù)據(jù)少的時候效率高
缺點:由于需要將整個分區(qū)的數(shù)據(jù)加載到內(nèi)存進行引用,如果處理完的數(shù)據(jù)是不會被釋放掉的,存在對象的引用。內(nèi)存較小數(shù)據(jù)較大時可能導(dǎo)致內(nèi)存溢出
小功能:獲取每個分區(qū)內(nèi)的最大值
(2)雙value類型
(3)key-value類型
map和mapPartitions的區(qū)別



【3】mapPartitionsWithIndex
def mapPartitionsWithIndex[U:ClassTag](
f:(Int,Iterator[T])=>iTERATOR[U]
preservesParittioning:Boolean=false):RDD[U]
分區(qū)索引加上了分區(qū)號
將待處理的數(shù)據(jù)以分區(qū)為單位發(fā)送到計算節(jié)點進行處理,這里的處理是指可以進行任意的處理,哪怕只是過濾數(shù)據(jù),在處理時同時可以獲取當前分區(qū)的索引
小功能:獲取第二個數(shù)據(jù)分區(qū)的數(shù)據(jù)
【4】flatMap
def faltMap[U:ClassTag](f:T =>TraversableOnce[U]):RDD[U]
將處理的數(shù)據(jù)進行扁平化后再進行映射,所以也叫做扁平映射
小功能: 將LIST(LIST(1,2),3,LIST(3,4))進行扁平化操作
【5】glom
def glom():RDD[Array[T]]
將同一分區(qū)的數(shù)據(jù)直接轉(zhuǎn)換為相同類型的內(nèi)存數(shù)組進行處理,分區(qū)不變
小功能:計算所有分區(qū)最大值求和(分區(qū)內(nèi)取最大值,分區(qū)間最大值求和)
【6】groupBy
def groupBy[K](f:T=>)(implicit kt:ClassTag[K]):RDD[(K,Iterable[T])]
將數(shù)據(jù)根據(jù)指定的規(guī)則進行分組,分區(qū)默認不變,但是數(shù)據(jù)會被打亂重新組合,這樣的操作稱之為shuffle。極限情況下,數(shù)據(jù)可能被分在一個分區(qū)中
一個組的數(shù)據(jù)在一個分區(qū)中,但是并不是說一個分區(qū)中只有一個組
groupBy 會將數(shù)據(jù)打亂(打散),重新組合,這個操作我們稱之為shuffle
【7】filter
def filter(f:T => Boolean):RDD[T]
將數(shù)據(jù)根據(jù)指定的規(guī)則進行篩選過濾,復(fù)合規(guī)則的數(shù)據(jù)保留,不符合的數(shù)據(jù)丟棄,當數(shù)據(jù)進行篩選過濾后,分區(qū)不變,但是分區(qū)內(nèi)的數(shù)據(jù)可能不均衡,在生產(chǎn)環(huán)境下可能出現(xiàn)數(shù)據(jù)
【8】sample
def sample(
withReplacement:Boolean,
fraction:Double
seed:Long = Utils:random.nextLong):RDD[T]
根據(jù)指定的規(guī)則從數(shù)據(jù)集中抽取數(shù)據(jù)
【9】distinct
def distinct()(implicit ord:Ordering[T] = null):RDD[T]
def distinct(numPartitions:Int)(implicit ord:Ordering[T]=null):RDD[T]
將數(shù)據(jù)集中重復(fù)的數(shù)據(jù)去重
【10】coalesce
def coalesce(numPartitions:Int,shuffle:Boolean=false,
partitionCoalescer:Option[PartitionCoalescer] = Option.empty):RDD[T]
根據(jù)數(shù)據(jù)量縮減分區(qū),用于大數(shù)據(jù)集過濾后,提高小數(shù)據(jù)集的執(zhí)行效率
當spark程序中存在過多的小任務(wù)時,可以通過該方法收縮合并分區(qū),減少分區(qū)的個數(shù),減小任務(wù)調(diào)度的成本
coalesce方法默認情況下不會將分區(qū)的數(shù)據(jù)打亂重新組合,某些情況下縮減分區(qū)可能會導(dǎo)致數(shù)據(jù)不均衡,引發(fā)數(shù)據(jù)傾斜。如果想要讓數(shù)據(jù)均衡,可以進行shuffle處理
【11】repartitions
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
coalesce可用于擴大分區(qū),但是必須要進行shuffle操作,否則無意義
spark提供了簡化操作
縮減分區(qū):coalesce,如果想要數(shù)據(jù)均衡可以采用shuffle
擴大分區(qū):repartition,底層代碼調(diào)用的就是coalesce,并采用shuffle
【12】sortBy
def sortBy[K](
f: (T) =>
ascending: Boolean = true, numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]?
該操作用于排序數(shù)據(jù)。在排序之前,可以將數(shù)據(jù)通過 f 函數(shù)進行處理,之后按照 f 函數(shù)處理 的結(jié)果進行排序,默認為升序排列。排序后新產(chǎn)生的 RDD 的分區(qū)數(shù)與原 RDD 的分區(qū)數(shù)一 致。中間存在 shuffle 的過程
(2)雙value類型
【13】intersection
def intersection(other: RDD[T]): RDD[T]
對源 RDD 和參數(shù) RDD 求交集后返回一個新的 RDD
【14】union
def union(other: RDD[T]): RDD[T]
對源 RDD 和參數(shù) RDD 求并集后返回一個新的 RDD
【15】subtract
def subtract(other: RDD[T]): RDD[T]?
以一個 RDD 元素為主,去除兩個 RDD 中重復(fù)元素,將其他元素保留下來。求差集
【16】zip
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
將兩個 RDD 中的元素,以鍵值對的形式進行合并。其中,鍵值對中的 Key 為第 1 個 RDD 中的元素,Value 為第 2 個 RDD 中的相同位置的元素。
附注:
【1】
//Can't zip RDDs with unequal numbers of partitions: List(2, 4)
//zip要求數(shù)據(jù)源分區(qū)需要保持一致
//Can only zip RDDs with same number of elements in each partition
//兩個數(shù)據(jù)源分區(qū)中的數(shù)據(jù)數(shù)量要保持一致
【2】
交集、并集和差集要求兩個數(shù)據(jù)源的數(shù)據(jù)類型保持一致,但zip可以不一致
(3)key-value類型
【17】partitionBy
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
將數(shù)據(jù)按照指定 Partitioner 重新進行分區(qū)。Spark 默認的分區(qū)器是 HashPartitioner
附注:
//分區(qū)數(shù)量和類型相同時會直接返回
?//除了hash分區(qū)器還有RangePartitioner,一般用于排序
?//想按照自己的規(guī)則分區(qū)怎么搞?自己寫分區(qū)器
【18】reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
可以將數(shù)據(jù)按照相同的 Key 對 Value 進行聚合
reduceByKey中key的數(shù)據(jù)如果只有一個,是不會參與運算的
【19】groupByKey
def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
將數(shù)據(jù)源的數(shù)據(jù)根據(jù)key對value進行分組
reduceByKey 和 groupByKey
從shuffle角度:
【1】groupByKey會導(dǎo)致數(shù)據(jù)打亂重組,存在shuffle操作
但是在shuffle操作中,考慮到后續(xù)例如map操作需要等待所有分區(qū)的數(shù)據(jù)重組完畢會占用大量內(nèi)存,因此這些數(shù)據(jù)必須落盤處理,也就是寫到文件中,否則會導(dǎo)致內(nèi)存溢出,盡管這會導(dǎo)致shuffle的性能降低(IO操作)
【2】在reduceByKey中,可以對相同的數(shù)據(jù)進行一個預(yù)聚合,即分區(qū)內(nèi)預(yù)先進行聚合操作,可以減少落盤的數(shù)據(jù)量,從而能夠提高shuffle性能,但是要求分區(qū)內(nèi)和分區(qū)間的計算規(guī)則是相同的
從功能角度:
rduceByKey只是進行分區(qū),不存在數(shù)據(jù)量減少的問題。GroupByKey只能分組,不能聚合,所以在分組聚合的場合下,推薦使用reduceByKey。如果僅僅是分組而不需要聚合,那么還是只能使用groupByKey
【20】aggregateByKey
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
將數(shù)據(jù)根據(jù)不同的規(guī)則進行分區(qū)內(nèi)計算和分區(qū)間計算
【21】foldByKey
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
當分區(qū)內(nèi)計算規(guī)則和分區(qū)間計算規(guī)則相同時,aggregateByKey 就可以簡化為 foldByKey
【22】combineByKey
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
最通用的對 key-value 型 rdd 進行聚集操作的聚集函數(shù)(aggregation function)。類似于 aggregate(),combineByKey()允許用戶返回值的類型與輸入不一致
這種方式可以在初始化的時候便對數(shù)據(jù)進行轉(zhuǎn)換,從而提高了數(shù)據(jù)轉(zhuǎn)換的效率
reduceByKey、foldByKey、aggregateByKey、combineByKey 的區(qū)別?
reduceByKey: 相同 key 的第一個數(shù)據(jù)不進行任何計算,分區(qū)內(nèi)和分區(qū)間計算規(guī)則相同
FoldByKey: 相同 key 的第一個數(shù)據(jù)和初始值進行分區(qū)內(nèi)計算,分區(qū)內(nèi)和分區(qū)間計算規(guī)則相 同?
AggregateByKey:相同 key 的第一個數(shù)據(jù)和初始值進行分區(qū)內(nèi)計算,分區(qū)內(nèi)和分區(qū)間計算規(guī) 則可以不相同
CombineByKey:當計算時,發(fā)現(xiàn)數(shù)據(jù)結(jié)構(gòu)不滿足要求時,可以讓第一個數(shù)據(jù)轉(zhuǎn)換結(jié)構(gòu)。分區(qū) 內(nèi)和分區(qū)間計算規(guī)則不相同。
【23】join
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
在一個(K,V)的 RDD 上調(diào)用,K 必須實現(xiàn) Ordered 接口(特質(zhì)),返回一個按照 key 進行排序?
【24】leftOuterJoin
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
類似于 SQL 語句的左外連接?
【25】cogroup
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
在類型為(K,V)和(K,W)的 RDD 上調(diào)用,返回一個(K,(Iterable,Iterable))類型的 RDD
18、RDD行動算子
所謂的行動算子,就是出發(fā)作業(yè)(job)執(zhí)行的方法
底層代碼調(diào)用的是環(huán)境對象的runJob方法
底層代碼中會創(chuàng)建ActiveJob,并提交執(zhí)行
【1】reduce
def reduce(f: (T, T) => T): T?
聚集 RDD 中的所有元素,先聚合分區(qū)內(nèi)數(shù)據(jù),再聚合分區(qū)間數(shù)據(jù)
【2】collect
def collect(): Array[T]
在驅(qū)動程序中,以數(shù)組 Array 的形式返回數(shù)據(jù)集的所有元素?
【3】count
def count(): Long
返回 RDD 中元素的個數(shù)
【4】first
def first(): T
返回 RDD 中的第一個元素?
【5】take
def take(num: Int): Array[T]
返回一個由 RDD 的前 n 個元素組成的數(shù)組?
【6】takeOrder
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
返回該 RDD 排序后的前 n 個元素組成的數(shù)組
【7】aggregate
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
分區(qū)的數(shù)據(jù)通過初始值和分區(qū)內(nèi)的數(shù)據(jù)進行聚合,然后再和初始值進行分區(qū)間的數(shù)據(jù)聚合
??aggregateByKey: 初始值只會參與分區(qū)內(nèi)的計算
??aggregate: 初始值會參與分區(qū)內(nèi)計算,并且會參與分區(qū)間計算
【8】fold
def fold(zeroValue: T)(op: (T, T) => T): T
aggregate 的簡化版操作?,分區(qū)內(nèi)外計算規(guī)則相同
【9】countByKey
def countByKey(): Map[K, Long]?
統(tǒng)計每種 key 的個數(shù)
【10】save算子
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
?path: String,
?codec: Option[Class[_ <: CompressionCodec]] = None): Unit
將數(shù)據(jù)保存到不同格式的文件中
【11】foreach
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
分布式遍歷 RDD 中的每一個元素,調(diào)用指定函數(shù)
為什么RDD的方法稱之為算子:
???RDD的方法和Scala集合對象的方法不一樣
???集合對象的方法都是在同一個節(jié)點的內(nèi)存中完成的
???RDD的方法可以將計算邏輯發(fā)送到Executor(分布式節(jié)點)執(zhí)行
????為了區(qū)分不同的處理效果,所以將RDD的方法稱之為算子
?? ?RDD的方法外部的操作在driver端執(zhí)行,方法內(nèi)的邏輯代碼在executor端執(zhí)行
19、RDD序列化
1) 閉包檢查
從計算的角度, 算子以外的代碼都是在 Driver 端執(zhí)行, 算子里面的代碼都是在 Executor 端執(zhí)行。那么在 scala 的函數(shù)式編程中,就會導(dǎo)致算子內(nèi)經(jīng)常會用到算子外的數(shù)據(jù),這樣就 形成了閉包的效果,如果使用的算子外的數(shù)據(jù)無法序列化,就意味著無法傳值給 Executor 端執(zhí)行,就會發(fā)生錯誤,所以需要在執(zhí)行任務(wù)計算前,檢測閉包內(nèi)的對象是否可以進行序列 化,這個操作我們稱之為閉包檢測。Scala2.12 版本后閉包編譯方式發(fā)生了改變?
20、kryo序列化框架
Java 的序列化能夠序列化任何的類。但是比較重(字節(jié)多),序列化后,對象的提交也 比較大。Spark 出于性能的考慮,Spark2.0 開始支持另外一種 Kryo 序列化機制。Kryo 速度 是 Serializable 的 10 倍。當 RDD 在 Shuffle 數(shù)據(jù)的時候,簡單數(shù)據(jù)類型、數(shù)組和字符串類型 已經(jīng)在 Spark 內(nèi)部使用 Kryo 來序列化。
21、RDD依賴關(guān)系
(1)RDD血緣關(guān)系
相鄰兩個RDD的關(guān)系稱之為依賴關(guān)系
多個連續(xù)的RDD依賴關(guān)系稱之為血緣關(guān)系
每個RDD會保存血緣關(guān)系

RDD不保存數(shù)據(jù)
因此一旦出現(xiàn)報錯需要從頭開始讀取
RDD為了提供容錯性,需要將RDD間的關(guān)系保存下來
一旦出現(xiàn)錯誤就可以根據(jù)血緣關(guān)系將數(shù)據(jù)源重新讀取進行計算

具體血緣關(guān)系保存路程

(2)依賴關(guān)系
新的RDD的一個分區(qū)的數(shù)據(jù)依賴于舊的RDD一個分區(qū)的數(shù)據(jù),這個依賴稱之為OneToOne依賴,又名窄依賴。指代每一個父(上游)RDD的Partition 最多被子(下游)RDD 的一個 Partition 使用, 窄依賴我們形象的比喻為獨生子女。

新的RDD的一個分區(qū)的數(shù)據(jù)依賴于舊的RDD多個分區(qū)的數(shù)據(jù),這個依賴稱之為Shuffle依賴,又名寬依賴,指代同一個父(上游)RDD 的 Partition 被多個子(下游)RDD 的 Partition 依賴,會 引起 Shuffle,總結(jié):寬依賴我們形象的比喻為多生。

當RDD中存在shuffle依賴時,階段會自動增加一個
階段的數(shù)量 = shuffle依賴的數(shù)量 + 1(resultstage)
ResultStage只有一個,最后需要執(zhí)行的階段

22、RDD任務(wù)劃分


任務(wù)的數(shù)量 = 當前階段中最后一個RDD的分區(qū)數(shù)量
23、持久化 cache & persist

如果一個RDD需要重復(fù)使用,那么需要從頭再執(zhí)行來獲取數(shù)據(jù)
RDD對象可以重用,但是數(shù)據(jù)無法重用,因為RDD是不留存數(shù)據(jù)的
解決方法——持久化:在交給下一位時先放入內(nèi)存或文件中

cache 默認持久化的操作只能把數(shù)據(jù)保存在內(nèi)存中,如果想要保存到磁盤文件要用persist
持久化操作必須在行動算子執(zhí)行時完成,不然沒有數(shù)據(jù)
除了重用外,持久化操作還可以再數(shù)據(jù)執(zhí)行較長或數(shù)據(jù)比較重要時也可以使用持久化操作
checkpoint 需要落盤,需要指定檢查點保存路徑
檢查點路徑保存的文件,當作業(yè)執(zhí)行完畢后,不會被刪除
一般保存路徑都是在分布式存儲系統(tǒng):HDFS中
區(qū)別
(1)cache : 將數(shù)據(jù)臨時存儲在內(nèi)存中進行數(shù)據(jù)重用,快但數(shù)據(jù)不夠安全
會在血緣關(guān)系中添加新的依賴,一旦出現(xiàn)問題可以重頭讀取數(shù)據(jù)
(2)persist : 將數(shù)據(jù)存儲在磁盤文件中進行數(shù)據(jù)重用,
涉及磁盤IO,性能較低但數(shù)據(jù)安全,
若作業(yè)執(zhí)行完畢,臨時保存的數(shù)據(jù)文件就會丟失
(3)checkpoint : 將數(shù)據(jù)長久地保存在磁盤文件中進行數(shù)據(jù)重用
涉及磁盤IO,性能較低但數(shù)據(jù)安全,
為了保證數(shù)據(jù)安全,所以一般情況下,會獨立執(zhí)行作業(yè)
為了能夠提高效率,一般情況下是需要和cache聯(lián)合使用的
執(zhí)行過程中,會切斷血緣關(guān)系,重新建立新的血緣關(guān)系
?????????因為checkpoint相當于把計算結(jié)果保存在分布式存儲中,比較安全,相當于數(shù)據(jù)源變化
24、RDD分區(qū)器

由于有時候默認分區(qū)器無法滿足我們的需求,因此我們需要自己寫分區(qū)器
(1)自己用class構(gòu)造分區(qū)器類,并繼承Partitioner類
(2)重寫方法
(3)使用partitionBy(new MyPartitioner)進行引用
25、RDD文件讀取與保存
??rdd.saveAsTextFile("ouput1")
??rdd.saveAsObjectFile("ouput2")
??rdd.saveAsSequenceFile("ouput3")
26、核心編程

累加器:分布式共享只寫變量

在進行累加計算的時候不能直接使用for循環(huán)進行操作。因為實際上所有的累加操作都在Executor端執(zhí)行,沒有返回到Driver端進行聚合,這個時候需要使用累加器ACC
累加器重寫


27、廣播變量
閉包數(shù)據(jù)都是以Task為單位發(fā)送的,每個任務(wù)中包含閉包數(shù)據(jù)
可能會導(dǎo)致一個Executor中含有大量重復(fù)的數(shù)據(jù),并且占用大量的內(nèi)存

Executor其實就是一個JVM,所以在啟動時會自動分配內(nèi)存
完全可以將任務(wù)中的閉包數(shù)據(jù)放置在Executor的內(nèi)存中,達到共享的目的

Spark中的廣播變量就可以將閉包的數(shù)據(jù)保存到Executor的內(nèi)存中
但是它不能更改,是分布式共享只讀變量
28、MVC //三層架構(gòu)
MVC:Model View Controller 架構(gòu)模式
三層架構(gòu): Controller (控制層),service(服務(wù)層),dao(持久層)

29、線程
在對代碼進行冗余抽離過程中,sc是在Application端定義,但在DAO中需要使用,如果直接從Controller到Service進行傳輸會增加耦合度,因此需要單獨開辟線性供sc的調(diào)度

ThreadLocal可以對線程的內(nèi)存進行控制,存儲數(shù)據(jù),共享數(shù)據(jù)
30、總結(jié)
(1)hadoop VS Spark
【1】hadoop在計算時中間結(jié)果會存儲在磁盤中,而Spark會將中間結(jié)果存儲在內(nèi)存中
【2】hadoop本質(zhì)是一個分布式的系統(tǒng)基礎(chǔ)架構(gòu),負責將巨大的數(shù)據(jù)集進行多節(jié)點存儲,降低了硬件成本;spark是一個用scala語言編寫的框架,是大數(shù)據(jù)分析引擎
【3】hadoop采用MapReduce進行數(shù)據(jù)的批處理,而RDD是基于內(nèi)存的彈性數(shù)據(jù)集。
【4】spark相比于hadoop更加適合迭代式、交互式和流式計算等類型的計算
(2)常見RDD算子
【1】map & flatmap
將處理的數(shù)據(jù)按照給出的函數(shù)逐條進行映射轉(zhuǎn)換,flatmap還附帶扁平化操作,例如把輸入的List以其中的元素進行輸出,這里轉(zhuǎn)換可以是類型的轉(zhuǎn)換也可以是值的轉(zhuǎn)換。
【2】groupBy & reduceBy
同樣是對數(shù)據(jù)處理,reduceBy效率會更高,首先groupBy存在數(shù)據(jù)打亂重組的線下(shuffle操作),因此數(shù)據(jù)比較多時會占用大量內(nèi)存,需要落盤處理),而reduceBy 對數(shù)據(jù)有預(yù)處理,可以減少落盤的數(shù)據(jù)量,從而能夠提高shuffle性 能,但是要求分區(qū)內(nèi)和分區(qū)間的計算規(guī)則是相同的
適用場景:
rduceByKey只是進行分區(qū),不存在數(shù)據(jù)量減少的問題。GroupByKey只能分組,不能聚合,所以在分組聚合的場合下,推薦使用 reduceByKey。如果僅僅是分組而不需要聚合,那么還是只能使用groupByKey
【3】filter
對數(shù)據(jù)按照指定規(guī)則進行篩選,篩選過濾后的數(shù)據(jù)分區(qū)不變,但分區(qū)內(nèi)數(shù)據(jù)可能不均衡
【4】sortBy
按照一定規(guī)則進行排序,在排序前可先通過函數(shù)f處理,默認升序列
【5】
reduceByKey、foldByKey、aggregateByKey、combineByKey 的區(qū)別?
reduceByKey:
相同 key 的第一個數(shù)據(jù)不進行任何計算,分區(qū)內(nèi)和分區(qū)間計算規(guī)則相同
FoldByKey:
相同 key 的第一個數(shù)據(jù)和初始值進行分區(qū)內(nèi)計算,分區(qū)內(nèi)和分區(qū)間計算規(guī)則相 同 AggregateByKey:
相同 key 的第一個數(shù)據(jù)和初始值進行分區(qū)內(nèi)計算,分區(qū)內(nèi)和分區(qū)間計算規(guī) 則可以不相同 CombineByKey:
當計算時,發(fā)現(xiàn)數(shù)據(jù)結(jié)構(gòu)不滿足要求時,可以讓第一個數(shù)據(jù)轉(zhuǎn)換結(jié)構(gòu)。分區(qū) 內(nèi)和分區(qū)間計算規(guī)則不相同。
【6】zip
拉鏈,將兩個 RDD 中的元素,以鍵值對的形式進行合并。其中,鍵值對中的 Key 為第 1 個 RDD 中的元素,Value 為第 2 個 RDD 中的相同位置的元素。
【7】collect & foreach
collect中在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素。而foreach則是在各executor端進行數(shù)據(jù)的處理。之所以將二位放一塊,是因為需要注意一件事,那就RDD的方法(算子)和scala集合對象的方法是不一樣的。scala集合對象的方法都是在同一節(jié)點的內(nèi)存中完成,而RDD的方法是將計算邏輯分布式發(fā)送到executor上執(zhí)行,無法控制先后。為了區(qū)別因此叫算子,RDD的方法外部操作在driver執(zhí)行,內(nèi)邏輯代碼在executor執(zhí)行
(3)血緣關(guān)系 & 依賴關(guān)系
相鄰兩個RDD的關(guān)系稱之為依賴關(guān)系
多個連續(xù)的RDD依賴關(guān)系稱之為血緣關(guān)系
每個RDD會保存血緣關(guān)系
(4)寬依賴 & 窄依賴
寬依賴是一對多的依賴
窄依賴是一對一的依賴
(5)cache & persist & checkpoint
【1】cache : 將數(shù)據(jù)臨時存儲在內(nèi)存中進行數(shù)據(jù)重用,快但數(shù)據(jù)不夠安全 會在血緣關(guān)系中添加新的依賴,一旦出現(xiàn)問題可以重頭讀取數(shù)據(jù) 【2】persist : 將數(shù)據(jù)存儲在磁盤文件中進行數(shù)據(jù)重用, 涉及磁盤IO,性能較低但數(shù)據(jù)安全, 若作業(yè)執(zhí)行完畢,臨時保存的數(shù)據(jù)文件就會丟失
【3】checkpoint : 將數(shù)據(jù)長久地保存在磁盤文件中進行數(shù)據(jù)重用 涉及磁盤IO,性能較低但數(shù)據(jù)安全, 為了保證數(shù)據(jù)安全,所以一般情況下,會獨立執(zhí)行作業(yè) 為了能夠提高效率,一般情況下是需要和cache聯(lián)合使用的 執(zhí)行過程中,會切斷血緣關(guān)系,重新建立新的血緣關(guān)系 因為checkpoint相當于把計算結(jié)果保存在分布式存儲中,比較安全,相當于數(shù)據(jù)源變化?
(6)累加器 & 廣播變量
累加器:分布式共享只寫變量,在進行累加計算的時候不能直接使用for循環(huán)進行操作。因為實際上所有的累加操作都在Executor端執(zhí)行,沒有返回到Driver端進行聚合,這個時候需要使用累加器ACC?
廣播變量:閉包數(shù)據(jù)都是以Task為單位發(fā)送的,每個任務(wù)中包含閉包數(shù)據(jù),可能會導(dǎo)致一個Executor中含有大量重復(fù)的數(shù)據(jù),并且占用大量的內(nèi)存。Spark中的廣播變量就可以將閉包的數(shù)據(jù)保存到Executor的內(nèi)存中 但是它不能更改,是分布式共享只讀變量?
(7)MVC & 三層架構(gòu)
MVC:Model View Controller 架構(gòu)模式
三層架構(gòu): Controller (控制層),service(服務(wù)層),dao(持久層)