最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

python spark

2023-03-20 13:30 作者:小霹靂莊  | 我要投稿


當(dāng)您運(yùn)行這段代碼時(shí),它會(huì)創(chuàng)建一個(gè) Spark 應(yīng)用程序并連接到集群中的 Spark 環(huán)境。以下是每個(gè)行的作用及原理:

第一行導(dǎo)入了 SparkConf 和 SparkContext 類(lèi),這些類(lèi)是 PySpark 中用于配置和管理 Spark 應(yīng)用程序的關(guān)鍵類(lèi)。

第二行創(chuàng)建了一個(gè) SparkConf 對(duì)象 conf,它允許您設(shè)置應(yīng)用程序的配置選項(xiàng)。在這里,通過(guò) setMaster() 方法將 Spark 應(yīng)用程序連接到本地機(jī)器上的一個(gè)執(zhí)行線(xiàn)程(單機(jī)模式),local[*] 表示使用所有可用的 CPU 核心來(lái)執(zhí)行任務(wù)。setAppName() 方法設(shè)置應(yīng)用程序的名稱(chēng)。

第三行創(chuàng)建了一個(gè) SparkContext 對(duì)象 sc,它實(shí)際上是整個(gè) Spark 應(yīng)用程序的入口點(diǎn),可以用它來(lái)創(chuàng)建 RDD 以及調(diào)度任務(wù)。SparkContext 需要傳遞一個(gè) SparkConf 參數(shù)來(lái)初始化。在這里,我們將剛剛創(chuàng)建的 SparkConf 對(duì)象傳遞給了 SparkContext。因此,Spark 應(yīng)用程序現(xiàn)在已經(jīng)啟動(dòng),并且可以使用 RDD 進(jìn)行計(jì)算了。

第一行代碼告訴電腦我們要使用這個(gè)叫做 Spark 的東西。第二行代碼告訴電腦我們想要用所有電腦里面的 CPU 來(lái)處理數(shù)據(jù),同時(shí)給這個(gè)程序起了一個(gè)名字,叫做 "test_spark_app"。第三行代碼告訴電腦我們正式啟動(dòng)這個(gè)叫做 Spark 的東西,并且現(xiàn)在可以開(kāi)始用它來(lái)處理數(shù)據(jù)了。

在 Spark 應(yīng)用程序中,應(yīng)用程序名稱(chēng)通常是一個(gè)重要的元數(shù)據(jù),它可以在 Spark Web UI 界面中輕松地識(shí)別和區(qū)分不同的應(yīng)用程序。例如,在分布式集群環(huán)境中,可能會(huì)有多個(gè) Spark 應(yīng)用程序同時(shí)運(yùn)行,每個(gè)應(yīng)用程序都可能使用相同的資源,因此為每個(gè)應(yīng)用程序分配獨(dú)特的名稱(chēng)有助于更好地跟蹤和管理這些應(yīng)用程序。此外,還可以在 Spark 計(jì)算過(guò)程中使用應(yīng)用程序名稱(chēng)來(lái)記錄日志和錯(cuò)誤消息,以便更輕松地診斷問(wèn)題并進(jìn)行故障排除。

RDD(Resilient Distributed Datasets)是 Spark 中的一個(gè)核心概念,它代表了一個(gè)彈性分布式數(shù)據(jù)集合。簡(jiǎn)單來(lái)說(shuō),RDD 就是 Spark 中存儲(chǔ)和處理數(shù)據(jù)的基本抽象。

RDD 是一種可以分割和并行處理的數(shù)據(jù)結(jié)構(gòu)。它通常是從外部數(shù)據(jù)源創(chuàng)建的,如 Hadoop 文件系統(tǒng)(HDFS)、本地文件系統(tǒng)、NoSQL 數(shù)據(jù)庫(kù)等。用戶(hù)還可以從內(nèi)存中的其他 RDD 或 DataFrame / DataSet 轉(zhuǎn)換生成新的 RDD。

RDD 通常具有以下兩個(gè)特點(diǎn):

1. 分區(qū):RDD 可以被分割成多個(gè)邏輯分區(qū),以便在集群中的不同節(jié)點(diǎn)上并行處理。

2. 不可變性:RDD 是不可變的,這意味著我們無(wú)法直接修改 RDD 中的數(shù)據(jù),只能通過(guò)轉(zhuǎn)換操作創(chuàng)建新的 RDD。

RDD 提供了許多轉(zhuǎn)換操作(如 map、filter、reduce 等),用于對(duì) RDD 中的元素進(jìn)行處理和計(jì)算。此外,它還提供了許多動(dòng)作操作(如 count、collect、reduce 等),用于觸發(fā)實(shí)際的計(jì)算并返回結(jié)果。由于 RDD 具有良好的分區(qū)和容錯(cuò)性,因此它在大規(guī)模數(shù)據(jù)處理場(chǎng)景中得到了廣泛的應(yīng)用。

在計(jì)算機(jī)科學(xué)中,RDD 的功能與這個(gè)玩具箱非常類(lèi)似。它可以將大量的數(shù)據(jù)分成小塊,然后在集群上并行處理這些塊。每個(gè)節(jié)點(diǎn)可以獨(dú)立地處理它所負(fù)責(zé)的那部分?jǐn)?shù)據(jù),并將結(jié)果傳遞給其他節(jié)點(diǎn)。最后,所有節(jié)點(diǎn)的結(jié)果將被匯總起來(lái),形成一個(gè)完整的結(jié)果。

RDD 還提供了許多操作,例如篩選、轉(zhuǎn)換和匯總等,可以幫助我們對(duì)數(shù)據(jù)進(jìn)行各種處理。同時(shí),RDD 還具有良好的容錯(cuò)性,即使其中某個(gè)節(jié)點(diǎn)出現(xiàn)了故障,整個(gè)計(jì)算過(guò)程也可以繼續(xù)執(zhí)行。

在 Spark 中,可以通過(guò)多種方式將數(shù)據(jù)輸入到 RDD 中

map 算子

在上面的代碼中,我們首先通過(guò) parallelize() 方法創(chuàng)建了一個(gè)包含整數(shù)序列的 RDD。接著,我們使用 map() 方法對(duì) RDD 中的每個(gè)元素進(jìn)行平方計(jì)算,并將結(jié)果保存到新的 RDD 中。最后,我們使用 collect() 方法將新 RDD 中的所有元素收集到本地,并輸出結(jié)果。

需要注意的是,map() 算子返回的是一個(gè)新的 RDD,而不是修改原來(lái)的 RDD。這意味著我們可以對(duì)同一個(gè) RDD 應(yīng)用多個(gè)不同的算子,以創(chuàng)建更復(fù)雜的數(shù)據(jù)處理任務(wù)。

鏈?zhǔn)秸{(diào)用是一種常見(jiàn)的編程技巧,在 Spark 中也得到了廣泛應(yīng)用。它允許我們將多個(gè)轉(zhuǎn)換操作組合在一起,形成一個(gè)完整的數(shù)據(jù)處理流水線(xiàn),并且可以通過(guò)鏈?zhǔn)秸{(diào)用的方式來(lái)構(gòu)建這個(gè)流水線(xiàn)。

具體來(lái)說(shuō),Spark 中的每個(gè)轉(zhuǎn)換操作都會(huì)返回一個(gè)新的 RDD 對(duì)象,因此我們可以通過(guò)將多個(gè)轉(zhuǎn)換操作連接在一起,實(shí)現(xiàn)一系列的數(shù)據(jù)轉(zhuǎn)換。例如:

在上面的代碼中,我們首先使用 parallelize() 方法創(chuàng)建了一個(gè)包含整數(shù)序列的 RDD。接著,我們通過(guò)鏈?zhǔn)秸{(diào)用的方式對(duì) RDD 進(jìn)行了多次轉(zhuǎn)換操作,包括 filter()、map()sortBy() 等。最后,我們使用 collect() 方法將新 RDD 中的所有元素收集到本地,并輸出結(jié)果。

需要注意的是,鏈?zhǔn)秸{(diào)用中每個(gè)函數(shù)的返回值必須是一個(gè) RDD 對(duì)象,以便下一個(gè)函數(shù)能夠繼續(xù)使用這個(gè) RDD 進(jìn)行轉(zhuǎn)換。同時(shí),由于每個(gè)轉(zhuǎn)換操作都會(huì)創(chuàng)建一個(gè)新的 RDD,因此對(duì)于大規(guī)模的數(shù)據(jù)處理任務(wù),鏈?zhǔn)秸{(diào)用可能會(huì)導(dǎo)致性能問(wèn)題,需要進(jìn)行優(yōu)化和調(diào)整。

使用flatMap操作來(lái)對(duì)RDD執(zhí)行解除嵌套操作

ambda x: x是一個(gè)匿名函數(shù),也稱(chēng)為 lambda 函數(shù)。這個(gè)函數(shù)接受一個(gè)參數(shù) x,并返回它本身。在上文的例子中,這個(gè) lambda 函數(shù)被傳遞給了 flatMap() 操作,用于展開(kāi) RDD 中的元素。由于單個(gè)列表是一個(gè)可迭代對(duì)象,而 flatMap() 要求返回一個(gè)扁平化的 RDD,因此我們使用 (lambda x: x) 函數(shù)來(lái)返回每個(gè)列表本身,從而達(dá)到展開(kāi)嵌套列表的目的。

reduceByKey算子

當(dāng)調(diào)用 reduceByKey(lambda x, y: x + y) 時(shí),對(duì)于 key=1 的元素,算子會(huì)將 [10, 30] 這兩個(gè)值傳遞給 lambda 函數(shù)進(jìn)行計(jì)算,即計(jì)算 10 + 30 = 40;對(duì)于 key=2 的元素,算子會(huì)將 [20, 40] 這兩個(gè)值傳遞給 lambda 函數(shù)進(jìn)行計(jì)算,即計(jì)算 20 + 40 = 60;對(duì)于 key=3 的元素,算子會(huì)將 [50] 這一個(gè)值傳遞給 lambda 函數(shù)進(jìn)行計(jì)算,即計(jì)算 50。

`result.collect()` 是一個(gè)用于觸發(fā)算子計(jì)算并返回 RDD 中所有元素的動(dòng)作(Action)。在 Apache Spark 中,RDD 分為兩類(lèi)操作:轉(zhuǎn)換操作和動(dòng)作操作。轉(zhuǎn)換操作僅僅是定義了一系列的數(shù)據(jù)轉(zhuǎn)換規(guī)則,并不會(huì)立即執(zhí)行,而動(dòng)作操作則會(huì)觸發(fā)實(shí)際的計(jì)算過(guò)程。

在本例中,`reduceByKey` 返回的結(jié)果是一個(gè)新的 RDD,其中包含每個(gè) key 值以及對(duì)應(yīng)的聚合后的值。但是,這個(gè) RDD 并沒(méi)有真正被計(jì)算出來(lái)。只有當(dāng)我們調(diào)用動(dòng)作操作 `collect()` 時(shí),Spark 才會(huì)將該 RDD 中的元素收集到驅(qū)動(dòng)器程序中,并返回一個(gè)列表,其中包含所有的 key-value 對(duì)。

因此,`result.collect()` 的作用就是將經(jīng)過(guò) `reduceByKey` 聚合后的結(jié)果取回到本地,以便進(jìn)行后續(xù)的處理或輸出。注意,如果結(jié)果集非常大,則應(yīng)該避免使用 `collect()` 操作,因?yàn)樗鼤?huì)將所有數(shù)據(jù)都拉到驅(qū)動(dòng)器程序中,可能會(huì)導(dǎo)致內(nèi)存溢出或性能問(wèn)題。相反,可以考慮使用類(lèi)似 `take()` 或 `foreach()` 等其他動(dòng)作操作,以分批次地獲取數(shù)據(jù)。

filter() 是 Spark 中的一個(gè)轉(zhuǎn)換操作(Transformation),它用于從 RDD 中選擇滿(mǎn)足給定條件的元素,并返回一個(gè)新的 RDD,其中只包含符合條件的元素。

可以使用以下代碼過(guò)濾出一個(gè)整數(shù) RDD 中所有大于 10 的元素,因此,filtered_rdd 將只包含 11、12 和 20 這三個(gè)元素。

`distinct()` 是 Spark 中的一個(gè)轉(zhuǎn)換操作(Transformation),它用于從 RDD 中刪除重復(fù)的元素,并返回一個(gè)新的 RDD,其中只包含不同的元素。該方法會(huì)對(duì)整個(gè) RDD 進(jìn)行去重操作,返回一個(gè)新的 RDD,其中僅包含唯一的元素。

例如,以下代碼創(chuàng)建了一個(gè)包含多個(gè)重復(fù)元素的整數(shù) RDD,并應(yīng)用 `distinct()` 方法將其去重:

在這個(gè)例子中,我們創(chuàng)建了一個(gè)整數(shù) RDD(即 `rdd`),其中包含多個(gè)重復(fù)元素。然后,我們使用 `distinct()` 方法將其去重,得到一個(gè)新的 RDD(即 `distinct_rdd`)。因此,`distinct_rdd` 將只包含不同的元素,即 `[1, 2, 3, 4, 5, 6]`。

需要注意的是,`distinct()` 方法具有顯著的性能問(wèn)題,特別是在大數(shù)據(jù)集的情況下,因?yàn)樗枰獙⑺袛?shù)據(jù)傳遞到網(wǎng)絡(luò)上進(jìn)行去重操作。如果數(shù)據(jù)集非常大,建議使用其他方法來(lái)處理重復(fù)數(shù)據(jù),例如使用類(lèi)似 Bloom 過(guò)濾器等技術(shù)來(lái)快速排除可能的重復(fù)項(xiàng)。

`sortBy()` 是 Spark 中的一個(gè)轉(zhuǎn)換操作(Transformation),它用于對(duì) RDD 中的元素進(jìn)行排序,并返回一個(gè)新的 RDD,其中包含已排序的元素。該方法接受一個(gè)用于比較兩個(gè)元素大小的函數(shù),并根據(jù)該函數(shù)的比較結(jié)果將元素排序。

例如,以下代碼創(chuàng)建了一個(gè)整數(shù) RDD,并應(yīng)用 `sortBy()` 方法將其按升序排列:

在這個(gè)例子中,我們創(chuàng)建了一個(gè)整數(shù) RDD(即 `rdd`)。然后,我們使用 `sortBy()` 方法將其按升序排列,得到一個(gè)新的 RDD(即 `sorted_rdd`)。因此,`sorted_rdd` 將包含按升序排列的元素,即 `[1, 2, 3, 4, 5]`。

需要注意的是,`sortBy()` 方法會(huì)生成一個(gè)新的 RDD,而不會(huì)就地修改原始 RDD。另外,Spark 還提供了 `sortByKey()` 方法,它可以對(duì)鍵值對(duì) RDD 中的鍵進(jìn)行排序,類(lèi)似于 SQL 中的 ORDER BY 子句。如果需要按鍵進(jìn)行排序,則建議使用 `sortByKey()` 方法。

我們使用 False 參數(shù)將元素按降序排列。




python spark的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
灵台县| 田东县| 安顺市| 达州市| 陆川县| 巢湖市| 无极县| 安龙县| 青川县| 乌海市| 龙陵县| 汽车| 合肥市| 长沙市| 四子王旗| 花莲县| 灵璧县| 辽宁省| 茂名市| 曲麻莱县| 正定县| 安溪县| 库伦旗| 枣庄市| 瑞安市| 久治县| 莱阳市| 吴川市| 拉孜县| 高要市| 政和县| 游戏| 屏东县| 鄂托克旗| 准格尔旗| 任丘市| 云林县| 苍梧县| 桑植县| 喜德县| 沙雅县|