最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

尚硅谷大數(shù)據(jù)Spark教程從入門到精通

2023-07-20 12:29 作者:默黨  | 我要投稿

1、SPARK

基于內(nèi)存的快速通用可擴展的大數(shù)據(jù)分析計算引擎

包含流處理的批處理框架

一次性數(shù)據(jù)計算:

處理數(shù)據(jù)時會從存儲設(shè)備中讀取數(shù)據(jù),進行邏輯操作,然后將處理的結(jié)果重新存儲到介質(zhì)中

處理復(fù)雜邏輯性能低


SPARK對該流程進行了更改,即不是放入磁盤而是放入內(nèi)存中方便后續(xù)的操作


但這么做也可能導(dǎo)致內(nèi)存資源的使用緊張


2、核心模塊

spark core:核心功能,其他功能會基于該核心實現(xiàn)與擴展


spark sql:操作結(jié)構(gòu)化數(shù)據(jù)的組件,使用該組件可以用sql語言來查詢數(shù)據(jù)


spark streaming:針對實時數(shù)據(jù)進行流式計算的組件,提供了豐富的處理數(shù)據(jù)流的API


spqrk MLlib:機器學(xué)習(xí)算法庫,不僅提供了模型評估、數(shù)據(jù)導(dǎo)入等功能,還提供了一些底層機器學(xué)習(xí)的原語


spark graphX:面向圖計算提供的框架以及算法庫


3、wordcount總結(jié)

(1)版本要對不然可能報錯

(2)匿名函數(shù)的使用(下劃線)

(3)maven下載慢就換源


4、SPARK運行環(huán)境

(1)Local模式

不需要其他任何節(jié)點資源就可以在本地執(zhí)行SPARK代碼的環(huán)境,一般用于教學(xué)、調(diào)試、演示等

(2)獨立部署模式

一種只使用自身節(jié)點進行運行的集群模式

【1】打開conf目錄修改slaves和spark-env,去掉它們的后綴并修改內(nèi)部的內(nèi)容,添加上子客戶端的IP地址

【2】使用sbin中的start-all.sh

相關(guān)參數(shù):



配置歷史服務(wù)

【1】修改conf中的spark-default.conf,輸入主機地址、端口和文件名

【2】在hadoop上脫離安全模式并創(chuàng)造對應(yīng)文件名

sbin/start-dfs.sh hadoop fs -mkdir /directory

【3】修改conf中的spark-env文件

【4】分發(fā)

【5】啟動

sbin/start-all.sh

sbin/start-history-server.sh


(3)Yarn模式

獨立部署模式無需其他框架提供資源,但也降低了同第三方資源的耦合性,獨立性強。但Spark是一種計算框架,而不是資源調(diào)度框架,這不是它的槍響,因此Yarn模式是在Yarn環(huán)境下的Spark工作



附注:

在啟動和關(guān)閉spark時要在前面加上sbin/,否則會默認執(zhí)行HADOOP的開啟關(guān)閉

SPARK MASTER內(nèi)部端口號:7077(別和HADOOP 沖突)

SPARK歷史服務(wù)器端口:18080

Hadoop YARN:8088

SPARK 查看spark-shell運行任務(wù) 4040


5、運行架構(gòu)


主從架構(gòu)


Driver


Executor

ApplicationMaster

driver 如果直接和 master進行交互,會減少耦合性,因此需要使用ApplicationMaster作為中間件,增加靈活性


6、核心概念

(1)Executor 和 Core



(2)并行度

并發(fā):多個虛擬核去搶占單個真正核的操作

并行:有多個真正核,多個虛擬核可同時操作


(3)有向無環(huán)圖

各個部件的依賴不能形成回環(huán),否則會導(dǎo)致部件無法運行


(4)SPARK YARN部署 : client&cluster

主要區(qū)別在于Driver程序的運行節(jié)點位置


7、spark 核心編程



Driver: 調(diào)度作用,用于數(shù)據(jù)、邏輯的準備

Executor: 用于執(zhí)行


8、RDD:最小的計算單元


附加:

FileInputStream()

用于讀取文件數(shù)據(jù)


BufferedInputStream()的作用

相當于開辟了一個緩沖區(qū),讀取的數(shù)據(jù)會直接放入緩沖區(qū),只有超過緩沖區(qū)閾值的時候才會進行輸出

這么做的好處是能夠提升文件的讀取效率,不再需要讀一個輸出一個,讀一個輸出一個(字節(jié)流)。類似于批處理。


InputStreamReader()的作用

當使用字符流時,由于不清楚幾個字節(jié)為一個字符,因此需要一個轉(zhuǎn)換流來進行轉(zhuǎn)換組裝成字符


9、RDD與IO關(guān)系

HadoopRDD -textFile 讀取文件數(shù)據(jù)

MapPartitionsRDD -flatMap 扁平化操作,分詞

MapPartitionsRDD -Map 轉(zhuǎn)換功能

ShuffledRDD -reduceByKey 相同key做value聚合

交給collect()輸出

RDD的數(shù)據(jù)處理方式類似于IO流,也有裝飾者設(shè)計模式

RDD的數(shù)據(jù)只有再調(diào)用collect方法時才會真正執(zhí)行業(yè)務(wù)邏輯操作。之前的全部封裝都是功能的擴展

RDD數(shù)據(jù)中間不存儲,可以臨時保存一部分數(shù)據(jù)


10、RDD特點

在讀取數(shù)據(jù)還有,為了方便切分數(shù)據(jù)(分組),數(shù)據(jù)在讀取時便會根據(jù)邏輯分配到不同的分區(qū)中

RDD叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)處理模型。是一個不可變,可分區(qū),內(nèi)部元素可并行計算的集合】

特點:

(1)彈性

【1】存儲彈性:內(nèi)存磁盤自動切換

【2】容錯彈性:數(shù)據(jù)丟失可自動恢復(fù)

【3】計算彈性:計算出錯重試機制

【4】分片彈性:根據(jù)需要重新分片

(2)分布式

數(shù)據(jù)存儲在大數(shù)據(jù)集群不同節(jié)點上

(3)數(shù)據(jù)集

RDD封裝了計算邏輯,并不保存數(shù)據(jù)

(4)數(shù)據(jù)抽象

RDD是一個抽象類,需要看具體封裝實現(xiàn)

(5)不可變

RDD封裝了計算邏輯,不可改變,若要改變只能產(chǎn)生新的RDD,在新的RDD里封裝計算邏輯

(6)可分區(qū)、并行計算


11、核心屬性

(1)分區(qū)列表

用于執(zhí)行并行計算,是實現(xiàn)分布式計算的重要屬性

(2)分區(qū)計算函數(shù)

(3)RDD依賴關(guān)系(一份依賴列表)

(4)分區(qū)器(將數(shù)據(jù)進行分區(qū)處理)

(5)計算分區(qū)的首選位置(判斷計算發(fā)生到哪個節(jié)點效率最優(yōu))


12、RDD的創(chuàng)建

(1)從內(nèi)存中

(2)從文件中

(3)從集合


13、RDD集合數(shù)據(jù)源是如何設(shè)置分區(qū)并劃分數(shù)據(jù)的

i 代表分區(qū)數(shù)遍歷,從0開始(例子:0,1,2)

start = (i * 數(shù)量長度)/分區(qū)數(shù)量

end = ((i+1)*數(shù)量長度)/分區(qū)數(shù)量

(start , end)包頭不包尾,代表的是用于劃分數(shù)據(jù)的序號,第一個是0,以此類推。假設(shè)(0,2)則包含序號0,1。意味著第一個和第二個數(shù)據(jù)被劃分在該分區(qū)。


14、RDD文件來源如何設(shè)置分區(qū)數(shù)量

默認情況下:

math.min(defaultParallelism,2)

defaultParallelism代表用戶設(shè)置的工作線程數(shù)


用戶設(shè)置的情況:

用戶設(shè)置的分區(qū)數(shù)量實際上是最小數(shù)量,具體分區(qū)公式如下:

所有文件的總字節(jié)數(shù)/(分區(qū)數(shù)量,如果分區(qū)為0就改成1)


15、RDD文件來源如何進行數(shù)據(jù)分配

(1)數(shù)據(jù)以行為單位進行分區(qū),與字節(jié)數(shù)無關(guān)

(2)數(shù)據(jù)讀取時以偏移量為單位,且偏移量不會被重新讀取

(3)數(shù)據(jù)分區(qū)的偏移量范圍計算,包頭包尾,任何行在被讀取后該行所有內(nèi)容將會被一次性全部讀取,哪怕不在偏移量范圍內(nèi)。注意讀取的內(nèi)容包括占兩個字節(jié)的回車

(4)如果數(shù)據(jù)源為多個文件,那么計算分區(qū)時以文件為單位進行分區(qū)


16、RDD方法-》RDD算子(操作)

(1)轉(zhuǎn)換:功能與補充,將舊的RDD包裝成新的RDD

flatmap,map:包裝添加新功能的函數(shù),本身不執(zhí)行什么

(2)行動:觸發(fā)任務(wù)的調(diào)度和作業(yè)的執(zhí)行

collect:用于具體行動的函數(shù)


17、RDD轉(zhuǎn)換算子

根據(jù)處理方式分類:

(1)單value類型

【1】map

def map[ClassTag](f:T->U):RDD[U]

將處理的數(shù)據(jù)按照給出的函數(shù)逐條進行映射轉(zhuǎn)換,這里轉(zhuǎn)換可以是類型的轉(zhuǎn)換也可以是值的轉(zhuǎn)換

小功能:從服務(wù)器日志數(shù)據(jù)apache.log中獲取用戶請求URL的資源路徑


附注:

rdd的計算一個分區(qū)內(nèi)的數(shù)據(jù)是一個一個執(zhí)行邏輯

不同分區(qū)數(shù)據(jù)計算是無序的


【2】mapPartitions

def map Partitions[U:ClassTag](

f:Iterator[T]=>Iterator[U],

preservesPartitioning:Boolean = false):RDD[U]

把一個分區(qū)的數(shù)據(jù)全拿到了在操作

優(yōu)點:可以以分區(qū)為單位進行數(shù)據(jù)轉(zhuǎn)換操作,數(shù)據(jù)少的時候效率高


缺點:由于需要將整個分區(qū)的數(shù)據(jù)加載到內(nèi)存進行引用,如果處理完的數(shù)據(jù)是不會被釋放掉的,存在對象的引用。內(nèi)存較小數(shù)據(jù)較大時可能導(dǎo)致內(nèi)存溢出

小功能:獲取每個分區(qū)內(nèi)的最大值

(2)雙value類型

(3)key-value類型


map和mapPartitions的區(qū)別




【3】mapPartitionsWithIndex

def mapPartitionsWithIndex[U:ClassTag](

f:(Int,Iterator[T])=>iTERATOR[U]

preservesParittioning:Boolean=false):RDD[U]

分區(qū)索引加上了分區(qū)號

將待處理的數(shù)據(jù)以分區(qū)為單位發(fā)送到計算節(jié)點進行處理,這里的處理是指可以進行任意的處理,哪怕只是過濾數(shù)據(jù),在處理時同時可以獲取當前分區(qū)的索引

小功能:獲取第二個數(shù)據(jù)分區(qū)的數(shù)據(jù)


【4】flatMap

def faltMap[U:ClassTag](f:T =>TraversableOnce[U]):RDD[U]

將處理的數(shù)據(jù)進行扁平化后再進行映射,所以也叫做扁平映射

小功能: 將LIST(LIST(1,2),3,LIST(3,4))進行扁平化操作


【5】glom

def glom():RDD[Array[T]]

將同一分區(qū)的數(shù)據(jù)直接轉(zhuǎn)換為相同類型的內(nèi)存數(shù)組進行處理,分區(qū)不變

小功能:計算所有分區(qū)最大值求和(分區(qū)內(nèi)取最大值,分區(qū)間最大值求和)


【6】groupBy

def groupBy[K](f:T=>)(implicit kt:ClassTag[K]):RDD[(K,Iterable[T])]

將數(shù)據(jù)根據(jù)指定的規(guī)則進行分組,分區(qū)默認不變,但是數(shù)據(jù)會被打亂重新組合,這樣的操作稱之為shuffle。極限情況下,數(shù)據(jù)可能被分在一個分區(qū)中

一個組的數(shù)據(jù)在一個分區(qū)中,但是并不是說一個分區(qū)中只有一個組

groupBy 會將數(shù)據(jù)打亂(打散),重新組合,這個操作我們稱之為shuffle


【7】filter

def filter(f:T => Boolean):RDD[T]

將數(shù)據(jù)根據(jù)指定的規(guī)則進行篩選過濾,復(fù)合規(guī)則的數(shù)據(jù)保留,不符合的數(shù)據(jù)丟棄,當數(shù)據(jù)進行篩選過濾后,分區(qū)不變,但是分區(qū)內(nèi)的數(shù)據(jù)可能不均衡,在生產(chǎn)環(huán)境下可能出現(xiàn)數(shù)據(jù)


【8】sample

def sample(

withReplacement:Boolean,

fraction:Double

seed:Long = Utils:random.nextLong):RDD[T]

根據(jù)指定的規(guī)則從數(shù)據(jù)集中抽取數(shù)據(jù)


【9】distinct

def distinct()(implicit ord:Ordering[T] = null):RDD[T]

def distinct(numPartitions:Int)(implicit ord:Ordering[T]=null):RDD[T]

將數(shù)據(jù)集中重復(fù)的數(shù)據(jù)去重


【10】coalesce

def coalesce(numPartitions:Int,shuffle:Boolean=false,

partitionCoalescer:Option[PartitionCoalescer] = Option.empty):RDD[T]

根據(jù)數(shù)據(jù)量縮減分區(qū),用于大數(shù)據(jù)集過濾后,提高小數(shù)據(jù)集的執(zhí)行效率

當spark程序中存在過多的小任務(wù)時,可以通過該方法收縮合并分區(qū),減少分區(qū)的個數(shù),減小任務(wù)調(diào)度的成本

coalesce方法默認情況下不會將分區(qū)的數(shù)據(jù)打亂重新組合,某些情況下縮減分區(qū)可能會導(dǎo)致數(shù)據(jù)不均衡,引發(fā)數(shù)據(jù)傾斜。如果想要讓數(shù)據(jù)均衡,可以進行shuffle處理


【11】repartitions

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

coalesce可用于擴大分區(qū),但是必須要進行shuffle操作,否則無意義

spark提供了簡化操作

縮減分區(qū):coalesce,如果想要數(shù)據(jù)均衡可以采用shuffle

擴大分區(qū):repartition,底層代碼調(diào)用的就是coalesce,并采用shuffle


【12】sortBy

def sortBy[K](

f: (T) =>

ascending: Boolean = true, numPartitions: Int = this.partitions.length)

(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]?

該操作用于排序數(shù)據(jù)。在排序之前,可以將數(shù)據(jù)通過 f 函數(shù)進行處理,之后按照 f 函數(shù)處理 的結(jié)果進行排序,默認為升序排列。排序后新產(chǎn)生的 RDD 的分區(qū)數(shù)與原 RDD 的分區(qū)數(shù)一 致。中間存在 shuffle 的過程


(2)雙value類型

【13】intersection

def intersection(other: RDD[T]): RDD[T]

對源 RDD 和參數(shù) RDD 求交集后返回一個新的 RDD


【14】union

def union(other: RDD[T]): RDD[T]

對源 RDD 和參數(shù) RDD 求并集后返回一個新的 RDD


【15】subtract

def subtract(other: RDD[T]): RDD[T]?

以一個 RDD 元素為主,去除兩個 RDD 中重復(fù)元素,將其他元素保留下來。求差集


【16】zip

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

將兩個 RDD 中的元素,以鍵值對的形式進行合并。其中,鍵值對中的 Key 為第 1 個 RDD 中的元素,Value 為第 2 個 RDD 中的相同位置的元素。


附注:

【1】

//Can't zip RDDs with unequal numbers of partitions: List(2, 4)

//zip要求數(shù)據(jù)源分區(qū)需要保持一致

//Can only zip RDDs with same number of elements in each partition

//兩個數(shù)據(jù)源分區(qū)中的數(shù)據(jù)數(shù)量要保持一致


【2】

交集、并集和差集要求兩個數(shù)據(jù)源的數(shù)據(jù)類型保持一致,但zip可以不一致


(3)key-value類型

【17】partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

將數(shù)據(jù)按照指定 Partitioner 重新進行分區(qū)。Spark 默認的分區(qū)器是 HashPartitioner


附注:

//分區(qū)數(shù)量和類型相同時會直接返回

?//除了hash分區(qū)器還有RangePartitioner,一般用于排序

?//想按照自己的規(guī)則分區(qū)怎么搞?自己寫分區(qū)器


【18】reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

可以將數(shù)據(jù)按照相同的 Key 對 Value 進行聚合

reduceByKey中key的數(shù)據(jù)如果只有一個,是不會參與運算的


【19】groupByKey

def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

將數(shù)據(jù)源的數(shù)據(jù)根據(jù)key對value進行分組


reduceByKey 和 groupByKey

從shuffle角度:

【1】groupByKey會導(dǎo)致數(shù)據(jù)打亂重組,存在shuffle操作

但是在shuffle操作中,考慮到后續(xù)例如map操作需要等待所有分區(qū)的數(shù)據(jù)重組完畢會占用大量內(nèi)存,因此這些數(shù)據(jù)必須落盤處理,也就是寫到文件中,否則會導(dǎo)致內(nèi)存溢出,盡管這會導(dǎo)致shuffle的性能降低(IO操作)

【2】在reduceByKey中,可以對相同的數(shù)據(jù)進行一個預(yù)聚合,即分區(qū)內(nèi)預(yù)先進行聚合操作,可以減少落盤的數(shù)據(jù)量,從而能夠提高shuffle性能,但是要求分區(qū)內(nèi)和分區(qū)間的計算規(guī)則是相同的

從功能角度:

rduceByKey只是進行分區(qū),不存在數(shù)據(jù)量減少的問題。GroupByKey只能分組,不能聚合,所以在分組聚合的場合下,推薦使用reduceByKey。如果僅僅是分組而不需要聚合,那么還是只能使用groupByKey


【20】aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

將數(shù)據(jù)根據(jù)不同的規(guī)則進行分區(qū)內(nèi)計算和分區(qū)間計算


【21】foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

當分區(qū)內(nèi)計算規(guī)則和分區(qū)間計算規(guī)則相同時,aggregateByKey 就可以簡化為 foldByKey


【22】combineByKey

def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

最通用的對 key-value 型 rdd 進行聚集操作的聚集函數(shù)(aggregation function)。類似于 aggregate(),combineByKey()允許用戶返回值的類型與輸入不一致

這種方式可以在初始化的時候便對數(shù)據(jù)進行轉(zhuǎn)換,從而提高了數(shù)據(jù)轉(zhuǎn)換的效率


reduceByKey、foldByKey、aggregateByKey、combineByKey 的區(qū)別?

reduceByKey: 相同 key 的第一個數(shù)據(jù)不進行任何計算,分區(qū)內(nèi)和分區(qū)間計算規(guī)則相同

FoldByKey: 相同 key 的第一個數(shù)據(jù)和初始值進行分區(qū)內(nèi)計算,分區(qū)內(nèi)和分區(qū)間計算規(guī)則相 同?

AggregateByKey:相同 key 的第一個數(shù)據(jù)和初始值進行分區(qū)內(nèi)計算,分區(qū)內(nèi)和分區(qū)間計算規(guī) 則可以不相同

CombineByKey:當計算時,發(fā)現(xiàn)數(shù)據(jù)結(jié)構(gòu)不滿足要求時,可以讓第一個數(shù)據(jù)轉(zhuǎn)換結(jié)構(gòu)。分區(qū) 內(nèi)和分區(qū)間計算規(guī)則不相同。


【23】join

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]

在一個(K,V)的 RDD 上調(diào)用,K 必須實現(xiàn) Ordered 接口(特質(zhì)),返回一個按照 key 進行排序?


【24】leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

類似于 SQL 語句的左外連接?


【25】cogroup

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

在類型為(K,V)和(K,W)的 RDD 上調(diào)用,返回一個(K,(Iterable,Iterable))類型的 RDD


18、RDD行動算子

所謂的行動算子,就是出發(fā)作業(yè)(job)執(zhí)行的方法

底層代碼調(diào)用的是環(huán)境對象的runJob方法

底層代碼中會創(chuàng)建ActiveJob,并提交執(zhí)行

【1】reduce

def reduce(f: (T, T) => T): T?

聚集 RDD 中的所有元素,先聚合分區(qū)內(nèi)數(shù)據(jù),再聚合分區(qū)間數(shù)據(jù)


【2】collect

def collect(): Array[T]

在驅(qū)動程序中,以數(shù)組 Array 的形式返回數(shù)據(jù)集的所有元素?


【3】count

def count(): Long

返回 RDD 中元素的個數(shù)


【4】first

def first(): T

返回 RDD 中的第一個元素?


【5】take

def take(num: Int): Array[T]

返回一個由 RDD 的前 n 個元素組成的數(shù)組?


【6】takeOrder

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

返回該 RDD 排序后的前 n 個元素組成的數(shù)組


【7】aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

分區(qū)的數(shù)據(jù)通過初始值和分區(qū)內(nèi)的數(shù)據(jù)進行聚合,然后再和初始值進行分區(qū)間的數(shù)據(jù)聚合

??aggregateByKey: 初始值只會參與分區(qū)內(nèi)的計算

??aggregate: 初始值會參與分區(qū)內(nèi)計算,并且會參與分區(qū)間計算


【8】fold

def fold(zeroValue: T)(op: (T, T) => T): T

aggregate 的簡化版操作?,分區(qū)內(nèi)外計算規(guī)則相同


【9】countByKey

def countByKey(): Map[K, Long]?

統(tǒng)計每種 key 的個數(shù)


【10】save算子

def saveAsTextFile(path: String): Unit

def saveAsObjectFile(path: String): Unit

def saveAsSequenceFile(

?path: String,

?codec: Option[Class[_ <: CompressionCodec]] = None): Unit

將數(shù)據(jù)保存到不同格式的文件中


【11】foreach

def foreach(f: T => Unit): Unit = withScope {

val cleanF = sc.clean(f)

sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))

}

分布式遍歷 RDD 中的每一個元素,調(diào)用指定函數(shù)


為什么RDD的方法稱之為算子:

???RDD的方法和Scala集合對象的方法不一樣

???集合對象的方法都是在同一個節(jié)點的內(nèi)存中完成的

???RDD的方法可以將計算邏輯發(fā)送到Executor(分布式節(jié)點)執(zhí)行

????為了區(qū)分不同的處理效果,所以將RDD的方法稱之為算子

?? ?RDD的方法外部的操作在driver端執(zhí)行,方法內(nèi)的邏輯代碼在executor端執(zhí)行


19、RDD序列化

1) 閉包檢查

從計算的角度, 算子以外的代碼都是在 Driver 端執(zhí)行, 算子里面的代碼都是在 Executor 端執(zhí)行。那么在 scala 的函數(shù)式編程中,就會導(dǎo)致算子內(nèi)經(jīng)常會用到算子外的數(shù)據(jù),這樣就 形成了閉包的效果,如果使用的算子外的數(shù)據(jù)無法序列化,就意味著無法傳值給 Executor 端執(zhí)行,就會發(fā)生錯誤,所以需要在執(zhí)行任務(wù)計算前,檢測閉包內(nèi)的對象是否可以進行序列 化,這個操作我們稱之為閉包檢測。Scala2.12 版本后閉包編譯方式發(fā)生了改變?


20、kryo序列化框架

Java 的序列化能夠序列化任何的類。但是比較重(字節(jié)多),序列化后,對象的提交也 比較大。Spark 出于性能的考慮,Spark2.0 開始支持另外一種 Kryo 序列化機制。Kryo 速度 是 Serializable 的 10 倍。當 RDD 在 Shuffle 數(shù)據(jù)的時候,簡單數(shù)據(jù)類型、數(shù)組和字符串類型 已經(jīng)在 Spark 內(nèi)部使用 Kryo 來序列化。


21、RDD依賴關(guān)系

(1)RDD血緣關(guān)系

相鄰兩個RDD的關(guān)系稱之為依賴關(guān)系

多個連續(xù)的RDD依賴關(guān)系稱之為血緣關(guān)系

每個RDD會保存血緣關(guān)系


RDD不保存數(shù)據(jù)

因此一旦出現(xiàn)報錯需要從頭開始讀取

RDD為了提供容錯性,需要將RDD間的關(guān)系保存下來

一旦出現(xiàn)錯誤就可以根據(jù)血緣關(guān)系將數(shù)據(jù)源重新讀取進行計算

具體血緣關(guān)系保存路程


(2)依賴關(guān)系

新的RDD的一個分區(qū)的數(shù)據(jù)依賴于舊的RDD一個分區(qū)的數(shù)據(jù),這個依賴稱之為OneToOne依賴,又名窄依賴。指代每一個父(上游)RDD的Partition 最多被子(下游)RDD 的一個 Partition 使用, 窄依賴我們形象的比喻為獨生子女。


新的RDD的一個分區(qū)的數(shù)據(jù)依賴于舊的RDD多個分區(qū)的數(shù)據(jù),這個依賴稱之為Shuffle依賴,又名寬依賴,指代同一個父(上游)RDD 的 Partition 被多個子(下游)RDD 的 Partition 依賴,會 引起 Shuffle,總結(jié):寬依賴我們形象的比喻為多生。


當RDD中存在shuffle依賴時,階段會自動增加一個

階段的數(shù)量 = shuffle依賴的數(shù)量 + 1(resultstage)

ResultStage只有一個,最后需要執(zhí)行的階段


22、RDD任務(wù)劃分



任務(wù)的數(shù)量 = 當前階段中最后一個RDD的分區(qū)數(shù)量


23、持久化 cache & persist

如果一個RDD需要重復(fù)使用,那么需要從頭再執(zhí)行來獲取數(shù)據(jù)

RDD對象可以重用,但是數(shù)據(jù)無法重用,因為RDD是不留存數(shù)據(jù)的


解決方法——持久化:在交給下一位時先放入內(nèi)存或文件中

cache 默認持久化的操作只能把數(shù)據(jù)保存在內(nèi)存中,如果想要保存到磁盤文件要用persist

持久化操作必須在行動算子執(zhí)行時完成,不然沒有數(shù)據(jù)

除了重用外,持久化操作還可以再數(shù)據(jù)執(zhí)行較長或數(shù)據(jù)比較重要時也可以使用持久化操作


checkpoint 需要落盤,需要指定檢查點保存路徑

檢查點路徑保存的文件,當作業(yè)執(zhí)行完畢后,不會被刪除

一般保存路徑都是在分布式存儲系統(tǒng):HDFS中


區(qū)別

(1)cache : 將數(shù)據(jù)臨時存儲在內(nèi)存中進行數(shù)據(jù)重用,快但數(shù)據(jù)不夠安全

會在血緣關(guān)系中添加新的依賴,一旦出現(xiàn)問題可以重頭讀取數(shù)據(jù)

(2)persist : 將數(shù)據(jù)存儲在磁盤文件中進行數(shù)據(jù)重用,

涉及磁盤IO,性能較低但數(shù)據(jù)安全,

若作業(yè)執(zhí)行完畢,臨時保存的數(shù)據(jù)文件就會丟失

(3)checkpoint : 將數(shù)據(jù)長久地保存在磁盤文件中進行數(shù)據(jù)重用

涉及磁盤IO,性能較低但數(shù)據(jù)安全,

為了保證數(shù)據(jù)安全,所以一般情況下,會獨立執(zhí)行作業(yè)

為了能夠提高效率,一般情況下是需要和cache聯(lián)合使用的

執(zhí)行過程中,會切斷血緣關(guān)系,重新建立新的血緣關(guān)系

?????????因為checkpoint相當于把計算結(jié)果保存在分布式存儲中,比較安全,相當于數(shù)據(jù)源變化


24、RDD分區(qū)器

由于有時候默認分區(qū)器無法滿足我們的需求,因此我們需要自己寫分區(qū)器

(1)自己用class構(gòu)造分區(qū)器類,并繼承Partitioner類

(2)重寫方法

(3)使用partitionBy(new MyPartitioner)進行引用


25、RDD文件讀取與保存

??rdd.saveAsTextFile("ouput1")

??rdd.saveAsObjectFile("ouput2")

??rdd.saveAsSequenceFile("ouput3")


26、核心編程


累加器:分布式共享只寫變量

在進行累加計算的時候不能直接使用for循環(huán)進行操作。因為實際上所有的累加操作都在Executor端執(zhí)行,沒有返回到Driver端進行聚合,這個時候需要使用累加器ACC


累加器重寫



27、廣播變量

閉包數(shù)據(jù)都是以Task為單位發(fā)送的,每個任務(wù)中包含閉包數(shù)據(jù)

可能會導(dǎo)致一個Executor中含有大量重復(fù)的數(shù)據(jù),并且占用大量的內(nèi)存

Executor其實就是一個JVM,所以在啟動時會自動分配內(nèi)存

完全可以將任務(wù)中的閉包數(shù)據(jù)放置在Executor的內(nèi)存中,達到共享的目的

Spark中的廣播變量就可以將閉包的數(shù)據(jù)保存到Executor的內(nèi)存中

但是它不能更改,是分布式共享只讀變量


28、MVC //三層架構(gòu)

MVC:Model View Controller 架構(gòu)模式

三層架構(gòu): Controller (控制層),service(服務(wù)層),dao(持久層)


29、線程

在對代碼進行冗余抽離過程中,sc是在Application端定義,但在DAO中需要使用,如果直接從Controller到Service進行傳輸會增加耦合度,因此需要單獨開辟線性供sc的調(diào)度

ThreadLocal可以對線程的內(nèi)存進行控制,存儲數(shù)據(jù),共享數(shù)據(jù)


30、總結(jié)

(1)hadoop VS Spark

【1】hadoop在計算時中間結(jié)果會存儲在磁盤中,而Spark會將中間結(jié)果存儲在內(nèi)存中

【2】hadoop本質(zhì)是一個分布式的系統(tǒng)基礎(chǔ)架構(gòu),負責將巨大的數(shù)據(jù)集進行多節(jié)點存儲,降低了硬件成本;spark是一個用scala語言編寫的框架,是大數(shù)據(jù)分析引擎

【3】hadoop采用MapReduce進行數(shù)據(jù)的批處理,而RDD是基于內(nèi)存的彈性數(shù)據(jù)集。

【4】spark相比于hadoop更加適合迭代式、交互式和流式計算等類型的計算


(2)常見RDD算子

【1】map & flatmap

將處理的數(shù)據(jù)按照給出的函數(shù)逐條進行映射轉(zhuǎn)換,flatmap還附帶扁平化操作,例如把輸入的List以其中的元素進行輸出,這里轉(zhuǎn)換可以是類型的轉(zhuǎn)換也可以是值的轉(zhuǎn)換。


【2】groupBy & reduceBy

同樣是對數(shù)據(jù)處理,reduceBy效率會更高,首先groupBy存在數(shù)據(jù)打亂重組的線下(shuffle操作),因此數(shù)據(jù)比較多時會占用大量內(nèi)存,需要落盤處理),而reduceBy 對數(shù)據(jù)有預(yù)處理,可以減少落盤的數(shù)據(jù)量,從而能夠提高shuffle性 能,但是要求分區(qū)內(nèi)和分區(qū)間的計算規(guī)則是相同的

適用場景:

rduceByKey只是進行分區(qū),不存在數(shù)據(jù)量減少的問題。GroupByKey只能分組,不能聚合,所以在分組聚合的場合下,推薦使用 reduceByKey。如果僅僅是分組而不需要聚合,那么還是只能使用groupByKey


【3】filter

對數(shù)據(jù)按照指定規(guī)則進行篩選,篩選過濾后的數(shù)據(jù)分區(qū)不變,但分區(qū)內(nèi)數(shù)據(jù)可能不均衡


【4】sortBy

按照一定規(guī)則進行排序,在排序前可先通過函數(shù)f處理,默認升序列


【5】

reduceByKey、foldByKey、aggregateByKey、combineByKey 的區(qū)別?

reduceByKey:

相同 key 的第一個數(shù)據(jù)不進行任何計算,分區(qū)內(nèi)和分區(qū)間計算規(guī)則相同

FoldByKey:

相同 key 的第一個數(shù)據(jù)和初始值進行分區(qū)內(nèi)計算,分區(qū)內(nèi)和分區(qū)間計算規(guī)則相 同 AggregateByKey:

相同 key 的第一個數(shù)據(jù)和初始值進行分區(qū)內(nèi)計算,分區(qū)內(nèi)和分區(qū)間計算規(guī) 則可以不相同 CombineByKey:

當計算時,發(fā)現(xiàn)數(shù)據(jù)結(jié)構(gòu)不滿足要求時,可以讓第一個數(shù)據(jù)轉(zhuǎn)換結(jié)構(gòu)。分區(qū) 內(nèi)和分區(qū)間計算規(guī)則不相同。


【6】zip

拉鏈,將兩個 RDD 中的元素,以鍵值對的形式進行合并。其中,鍵值對中的 Key 為第 1 個 RDD 中的元素,Value 為第 2 個 RDD 中的相同位置的元素。


【7】collect & foreach

collect中在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素。而foreach則是在各executor端進行數(shù)據(jù)的處理。之所以將二位放一塊,是因為需要注意一件事,那就RDD的方法(算子)和scala集合對象的方法是不一樣的。scala集合對象的方法都是在同一節(jié)點的內(nèi)存中完成,而RDD的方法是將計算邏輯分布式發(fā)送到executor上執(zhí)行,無法控制先后。為了區(qū)別因此叫算子,RDD的方法外部操作在driver執(zhí)行,內(nèi)邏輯代碼在executor執(zhí)行


(3)血緣關(guān)系 & 依賴關(guān)系

相鄰兩個RDD的關(guān)系稱之為依賴關(guān)系

多個連續(xù)的RDD依賴關(guān)系稱之為血緣關(guān)系

每個RDD會保存血緣關(guān)系


(4)寬依賴 & 窄依賴

寬依賴是一對多的依賴

窄依賴是一對一的依賴


(5)cache & persist & checkpoint

【1】cache : 將數(shù)據(jù)臨時存儲在內(nèi)存中進行數(shù)據(jù)重用,快但數(shù)據(jù)不夠安全 會在血緣關(guān)系中添加新的依賴,一旦出現(xiàn)問題可以重頭讀取數(shù)據(jù) 【2】persist : 將數(shù)據(jù)存儲在磁盤文件中進行數(shù)據(jù)重用, 涉及磁盤IO,性能較低但數(shù)據(jù)安全, 若作業(yè)執(zhí)行完畢,臨時保存的數(shù)據(jù)文件就會丟失

【3】checkpoint : 將數(shù)據(jù)長久地保存在磁盤文件中進行數(shù)據(jù)重用 涉及磁盤IO,性能較低但數(shù)據(jù)安全, 為了保證數(shù)據(jù)安全,所以一般情況下,會獨立執(zhí)行作業(yè) 為了能夠提高效率,一般情況下是需要和cache聯(lián)合使用的 執(zhí)行過程中,會切斷血緣關(guān)系,重新建立新的血緣關(guān)系 因為checkpoint相當于把計算結(jié)果保存在分布式存儲中,比較安全,相當于數(shù)據(jù)源變化?


(6)累加器 & 廣播變量

累加器:分布式共享只寫變量,在進行累加計算的時候不能直接使用for循環(huán)進行操作。因為實際上所有的累加操作都在Executor端執(zhí)行,沒有返回到Driver端進行聚合,這個時候需要使用累加器ACC?

廣播變量:閉包數(shù)據(jù)都是以Task為單位發(fā)送的,每個任務(wù)中包含閉包數(shù)據(jù),可能會導(dǎo)致一個Executor中含有大量重復(fù)的數(shù)據(jù),并且占用大量的內(nèi)存。Spark中的廣播變量就可以將閉包的數(shù)據(jù)保存到Executor的內(nèi)存中 但是它不能更改,是分布式共享只讀變量?


(7)MVC & 三層架構(gòu)

MVC:Model View Controller 架構(gòu)模式

三層架構(gòu): Controller (控制層),service(服務(wù)層),dao(持久層)


尚硅谷大數(shù)據(jù)Spark教程從入門到精通的評論 (共 條)

分享到微博請遵守國家法律
玛纳斯县| 大宁县| 曲周县| 余姚市| 克拉玛依市| 德保县| 汤原县| 佛山市| 腾冲县| 特克斯县| 大城县| 寿光市| 芒康县| 尼木县| 浪卡子县| 石嘴山市| 贺兰县| 新竹县| 佛教| 荥经县| 尚志市| 抚顺县| 屏东市| 灵台县| 定陶县| 曲阳县| 衡阳市| 浏阳市| 珲春市| 大姚县| 龙岩市| 教育| 招远市| 伊宁市| 陵水| 贵溪市| 当雄县| 蓝田县| 保靖县| 石首市| 天全县|