python spark
當(dāng)您運(yùn)行這段代碼時(shí),它會(huì)創(chuàng)建一個(gè) Spark 應(yīng)用程序并連接到集群中的 Spark 環(huán)境。以下是每個(gè)行的作用及原理:
第一行導(dǎo)入了 SparkConf 和 SparkContext 類(lèi),這些類(lèi)是 PySpark 中用于配置和管理 Spark 應(yīng)用程序的關(guān)鍵類(lèi)。
第二行創(chuàng)建了一個(gè) SparkConf 對(duì)象 conf,它允許您設(shè)置應(yīng)用程序的配置選項(xiàng)。在這里,通過(guò) setMaster()
方法將 Spark 應(yīng)用程序連接到本地機(jī)器上的一個(gè)執(zhí)行線(xiàn)程(單機(jī)模式),local[*]
表示使用所有可用的 CPU 核心來(lái)執(zhí)行任務(wù)。setAppName()
方法設(shè)置應(yīng)用程序的名稱(chēng)。
第三行創(chuàng)建了一個(gè) SparkContext 對(duì)象 sc,它實(shí)際上是整個(gè) Spark 應(yīng)用程序的入口點(diǎn),可以用它來(lái)創(chuàng)建 RDD 以及調(diào)度任務(wù)。SparkContext 需要傳遞一個(gè) SparkConf 參數(shù)來(lái)初始化。在這里,我們將剛剛創(chuàng)建的 SparkConf 對(duì)象傳遞給了 SparkContext。因此,Spark 應(yīng)用程序現(xiàn)在已經(jīng)啟動(dòng),并且可以使用 RDD 進(jìn)行計(jì)算了。
第一行代碼告訴電腦我們要使用這個(gè)叫做 Spark 的東西。第二行代碼告訴電腦我們想要用所有電腦里面的 CPU 來(lái)處理數(shù)據(jù),同時(shí)給這個(gè)程序起了一個(gè)名字,叫做 "test_spark_app"。第三行代碼告訴電腦我們正式啟動(dòng)這個(gè)叫做 Spark 的東西,并且現(xiàn)在可以開(kāi)始用它來(lái)處理數(shù)據(jù)了。
在 Spark 應(yīng)用程序中,應(yīng)用程序名稱(chēng)通常是一個(gè)重要的元數(shù)據(jù),它可以在 Spark Web UI 界面中輕松地識(shí)別和區(qū)分不同的應(yīng)用程序。例如,在分布式集群環(huán)境中,可能會(huì)有多個(gè) Spark 應(yīng)用程序同時(shí)運(yùn)行,每個(gè)應(yīng)用程序都可能使用相同的資源,因此為每個(gè)應(yīng)用程序分配獨(dú)特的名稱(chēng)有助于更好地跟蹤和管理這些應(yīng)用程序。此外,還可以在 Spark 計(jì)算過(guò)程中使用應(yīng)用程序名稱(chēng)來(lái)記錄日志和錯(cuò)誤消息,以便更輕松地診斷問(wèn)題并進(jìn)行故障排除。
RDD(Resilient Distributed Datasets)是 Spark 中的一個(gè)核心概念,它代表了一個(gè)彈性分布式數(shù)據(jù)集合。簡(jiǎn)單來(lái)說(shuō),RDD 就是 Spark 中存儲(chǔ)和處理數(shù)據(jù)的基本抽象。
RDD 是一種可以分割和并行處理的數(shù)據(jù)結(jié)構(gòu)。它通常是從外部數(shù)據(jù)源創(chuàng)建的,如 Hadoop 文件系統(tǒng)(HDFS)、本地文件系統(tǒng)、NoSQL 數(shù)據(jù)庫(kù)等。用戶(hù)還可以從內(nèi)存中的其他 RDD 或 DataFrame / DataSet 轉(zhuǎn)換生成新的 RDD。
RDD 通常具有以下兩個(gè)特點(diǎn):
1. 分區(qū):RDD 可以被分割成多個(gè)邏輯分區(qū),以便在集群中的不同節(jié)點(diǎn)上并行處理。
2. 不可變性:RDD 是不可變的,這意味著我們無(wú)法直接修改 RDD 中的數(shù)據(jù),只能通過(guò)轉(zhuǎn)換操作創(chuàng)建新的 RDD。
RDD 提供了許多轉(zhuǎn)換操作(如 map、filter、reduce 等),用于對(duì) RDD 中的元素進(jìn)行處理和計(jì)算。此外,它還提供了許多動(dòng)作操作(如 count、collect、reduce 等),用于觸發(fā)實(shí)際的計(jì)算并返回結(jié)果。由于 RDD 具有良好的分區(qū)和容錯(cuò)性,因此它在大規(guī)模數(shù)據(jù)處理場(chǎng)景中得到了廣泛的應(yīng)用。
在計(jì)算機(jī)科學(xué)中,RDD 的功能與這個(gè)玩具箱非常類(lèi)似。它可以將大量的數(shù)據(jù)分成小塊,然后在集群上并行處理這些塊。每個(gè)節(jié)點(diǎn)可以獨(dú)立地處理它所負(fù)責(zé)的那部分?jǐn)?shù)據(jù),并將結(jié)果傳遞給其他節(jié)點(diǎn)。最后,所有節(jié)點(diǎn)的結(jié)果將被匯總起來(lái),形成一個(gè)完整的結(jié)果。
RDD 還提供了許多操作,例如篩選、轉(zhuǎn)換和匯總等,可以幫助我們對(duì)數(shù)據(jù)進(jìn)行各種處理。同時(shí),RDD 還具有良好的容錯(cuò)性,即使其中某個(gè)節(jié)點(diǎn)出現(xiàn)了故障,整個(gè)計(jì)算過(guò)程也可以繼續(xù)執(zhí)行。
在 Spark 中,可以通過(guò)多種方式將數(shù)據(jù)輸入到 RDD 中
map 算子
在上面的代碼中,我們首先通過(guò) parallelize()
方法創(chuàng)建了一個(gè)包含整數(shù)序列的 RDD。接著,我們使用 map()
方法對(duì) RDD 中的每個(gè)元素進(jìn)行平方計(jì)算,并將結(jié)果保存到新的 RDD 中。最后,我們使用 collect()
方法將新 RDD 中的所有元素收集到本地,并輸出結(jié)果。
需要注意的是,map()
算子返回的是一個(gè)新的 RDD,而不是修改原來(lái)的 RDD。這意味著我們可以對(duì)同一個(gè) RDD 應(yīng)用多個(gè)不同的算子,以創(chuàng)建更復(fù)雜的數(shù)據(jù)處理任務(wù)。
鏈?zhǔn)秸{(diào)用是一種常見(jiàn)的編程技巧,在 Spark 中也得到了廣泛應(yīng)用。它允許我們將多個(gè)轉(zhuǎn)換操作組合在一起,形成一個(gè)完整的數(shù)據(jù)處理流水線(xiàn),并且可以通過(guò)鏈?zhǔn)秸{(diào)用的方式來(lái)構(gòu)建這個(gè)流水線(xiàn)。
具體來(lái)說(shuō),Spark 中的每個(gè)轉(zhuǎn)換操作都會(huì)返回一個(gè)新的 RDD 對(duì)象,因此我們可以通過(guò)將多個(gè)轉(zhuǎn)換操作連接在一起,實(shí)現(xiàn)一系列的數(shù)據(jù)轉(zhuǎn)換。例如:
在上面的代碼中,我們首先使用 parallelize()
方法創(chuàng)建了一個(gè)包含整數(shù)序列的 RDD。接著,我們通過(guò)鏈?zhǔn)秸{(diào)用的方式對(duì) RDD 進(jìn)行了多次轉(zhuǎn)換操作,包括 filter()
、map()
和 sortBy()
等。最后,我們使用 collect()
方法將新 RDD 中的所有元素收集到本地,并輸出結(jié)果。
需要注意的是,鏈?zhǔn)秸{(diào)用中每個(gè)函數(shù)的返回值必須是一個(gè) RDD 對(duì)象,以便下一個(gè)函數(shù)能夠繼續(xù)使用這個(gè) RDD 進(jìn)行轉(zhuǎn)換。同時(shí),由于每個(gè)轉(zhuǎn)換操作都會(huì)創(chuàng)建一個(gè)新的 RDD,因此對(duì)于大規(guī)模的數(shù)據(jù)處理任務(wù),鏈?zhǔn)秸{(diào)用可能會(huì)導(dǎo)致性能問(wèn)題,需要進(jìn)行優(yōu)化和調(diào)整。
使用flatMap操作來(lái)對(duì)RDD執(zhí)行解除嵌套操作
ambda x: x是一個(gè)匿名函數(shù),也稱(chēng)為 lambda 函數(shù)。這個(gè)函數(shù)接受一個(gè)參數(shù) x,并返回它本身。在上文的例子中,這個(gè) lambda 函數(shù)被傳遞給了 flatMap() 操作,用于展開(kāi) RDD 中的元素。由于單個(gè)列表是一個(gè)可迭代對(duì)象,而 flatMap() 要求返回一個(gè)扁平化的 RDD,因此我們使用 (lambda x: x) 函數(shù)來(lái)返回每個(gè)列表本身,從而達(dá)到展開(kāi)嵌套列表的目的。
reduceByKey算子
當(dāng)調(diào)用
reduceByKey(lambda x, y: x + y)
時(shí),對(duì)于 key=1 的元素,算子會(huì)將 [10, 30] 這兩個(gè)值傳遞給 lambda 函數(shù)進(jìn)行計(jì)算,即計(jì)算 10 + 30 = 40;對(duì)于 key=2 的元素,算子會(huì)將 [20, 40] 這兩個(gè)值傳遞給 lambda 函數(shù)進(jìn)行計(jì)算,即計(jì)算 20 + 40 = 60;對(duì)于 key=3 的元素,算子會(huì)將 [50] 這一個(gè)值傳遞給 lambda 函數(shù)進(jìn)行計(jì)算,即計(jì)算 50。
`result.collect()` 是一個(gè)用于觸發(fā)算子計(jì)算并返回 RDD 中所有元素的動(dòng)作(Action)。在 Apache Spark 中,RDD 分為兩類(lèi)操作:轉(zhuǎn)換操作和動(dòng)作操作。轉(zhuǎn)換操作僅僅是定義了一系列的數(shù)據(jù)轉(zhuǎn)換規(guī)則,并不會(huì)立即執(zhí)行,而動(dòng)作操作則會(huì)觸發(fā)實(shí)際的計(jì)算過(guò)程。
在本例中,`reduceByKey` 返回的結(jié)果是一個(gè)新的 RDD,其中包含每個(gè) key 值以及對(duì)應(yīng)的聚合后的值。但是,這個(gè) RDD 并沒(méi)有真正被計(jì)算出來(lái)。只有當(dāng)我們調(diào)用動(dòng)作操作 `collect()` 時(shí),Spark 才會(huì)將該 RDD 中的元素收集到驅(qū)動(dòng)器程序中,并返回一個(gè)列表,其中包含所有的 key-value 對(duì)。
因此,`result.collect()` 的作用就是將經(jīng)過(guò) `reduceByKey` 聚合后的結(jié)果取回到本地,以便進(jìn)行后續(xù)的處理或輸出。注意,如果結(jié)果集非常大,則應(yīng)該避免使用 `collect()` 操作,因?yàn)樗鼤?huì)將所有數(shù)據(jù)都拉到驅(qū)動(dòng)器程序中,可能會(huì)導(dǎo)致內(nèi)存溢出或性能問(wèn)題。相反,可以考慮使用類(lèi)似 `take()` 或 `foreach()` 等其他動(dòng)作操作,以分批次地獲取數(shù)據(jù)。
filter()
是 Spark 中的一個(gè)轉(zhuǎn)換操作(Transformation),它用于從 RDD 中選擇滿(mǎn)足給定條件的元素,并返回一個(gè)新的 RDD,其中只包含符合條件的元素。
可以使用以下代碼過(guò)濾出一個(gè)整數(shù) RDD 中所有大于 10 的元素,因此,filtered_rdd
將只包含 11、12 和 20 這三個(gè)元素。
`distinct()` 是 Spark 中的一個(gè)轉(zhuǎn)換操作(Transformation),它用于從 RDD 中刪除重復(fù)的元素,并返回一個(gè)新的 RDD,其中只包含不同的元素。該方法會(huì)對(duì)整個(gè) RDD 進(jìn)行去重操作,返回一個(gè)新的 RDD,其中僅包含唯一的元素。
例如,以下代碼創(chuàng)建了一個(gè)包含多個(gè)重復(fù)元素的整數(shù) RDD,并應(yīng)用 `distinct()` 方法將其去重:
在這個(gè)例子中,我們創(chuàng)建了一個(gè)整數(shù) RDD(即 `rdd`),其中包含多個(gè)重復(fù)元素。然后,我們使用 `distinct()` 方法將其去重,得到一個(gè)新的 RDD(即 `distinct_rdd`)。因此,`distinct_rdd` 將只包含不同的元素,即 `[1, 2, 3, 4, 5, 6]`。
需要注意的是,`distinct()` 方法具有顯著的性能問(wèn)題,特別是在大數(shù)據(jù)集的情況下,因?yàn)樗枰獙⑺袛?shù)據(jù)傳遞到網(wǎng)絡(luò)上進(jìn)行去重操作。如果數(shù)據(jù)集非常大,建議使用其他方法來(lái)處理重復(fù)數(shù)據(jù),例如使用類(lèi)似 Bloom 過(guò)濾器等技術(shù)來(lái)快速排除可能的重復(fù)項(xiàng)。
`sortBy()` 是 Spark 中的一個(gè)轉(zhuǎn)換操作(Transformation),它用于對(duì) RDD 中的元素進(jìn)行排序,并返回一個(gè)新的 RDD,其中包含已排序的元素。該方法接受一個(gè)用于比較兩個(gè)元素大小的函數(shù),并根據(jù)該函數(shù)的比較結(jié)果將元素排序。
例如,以下代碼創(chuàng)建了一個(gè)整數(shù) RDD,并應(yīng)用 `sortBy()` 方法將其按升序排列:
在這個(gè)例子中,我們創(chuàng)建了一個(gè)整數(shù) RDD(即 `rdd`)。然后,我們使用 `sortBy()` 方法將其按升序排列,得到一個(gè)新的 RDD(即 `sorted_rdd`)。因此,`sorted_rdd` 將包含按升序排列的元素,即 `[1, 2, 3, 4, 5]`。
需要注意的是,`sortBy()` 方法會(huì)生成一個(gè)新的 RDD,而不會(huì)就地修改原始 RDD。另外,Spark 還提供了 `sortByKey()` 方法,它可以對(duì)鍵值對(duì) RDD 中的鍵進(jìn)行排序,類(lèi)似于 SQL 中的 ORDER BY 子句。如果需要按鍵進(jìn)行排序,則建議使用 `sortByKey()` 方法。
我們使用 False
參數(shù)將元素按降序排列。