一文詳解 Flink 的分布式緩存使用步驟!
Flink提供了一個(gè)類似于Hadoop的分布式緩存,讓并行運(yùn)行實(shí)例的函數(shù)可以在本地訪問。這個(gè)功能可以被使用來分享外部靜態(tài)的數(shù)據(jù),例如:機(jī)器學(xué)習(xí)的邏輯回歸模型等!
緩存的使用流程:
使用ExecutionEnvironment實(shí)例對(duì)本地的或者遠(yuǎn)程的文件(例如:HDFS上的文件),為緩存文件指定一個(gè)名字注冊(cè)該緩存文件!當(dāng)程序執(zhí)行時(shí)候,Flink會(huì)自動(dòng)將復(fù)制文件或者目錄到所有worker節(jié)點(diǎn)的本地文件系統(tǒng)中,函數(shù)可以根據(jù)名字去該節(jié)點(diǎn)的本地文件系統(tǒng)中檢索該文件!
和廣播變量的區(qū)別:
-廣播變量廣播的是程序中的變量(DataSet)數(shù)據(jù),分布式緩存廣播的是文件
-廣播變量將數(shù)據(jù)廣播到各個(gè)TaskManager的內(nèi)存中,分布式緩存廣播到各個(gè)TaskManager的本地文件系統(tǒng)
用法
使用Flink運(yùn)行時(shí)環(huán)境的 registerCachedFile 注冊(cè)一個(gè)分布式緩存
在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )獲取分布式緩存
示例
創(chuàng)建一個(gè) 成績(jī) 數(shù)據(jù)集
請(qǐng)通過分布式緩存獲取到學(xué)生姓名,將數(shù)據(jù)轉(zhuǎn)換為
注:資料\測(cè)試數(shù)據(jù)源\distribute_cache_student 文件保存了學(xué)生ID以及學(xué)生姓名
操作步驟
1. 將 distribute_cache_student 文件上傳到HDFS / 目錄下
2. 獲取批處理運(yùn)行環(huán)境
3. 創(chuàng)建成績(jī)數(shù)據(jù)集
4. 對(duì) 成績(jī) 數(shù)據(jù)集進(jìn)行map轉(zhuǎn)換,將(學(xué)生ID, 學(xué)科, 分?jǐn)?shù))轉(zhuǎn)換為(學(xué)生姓名,學(xué)科,分?jǐn)?shù))
RichMapFunction 的 open 方法中,獲取分布式緩存數(shù)據(jù)
在 map 方法中進(jìn)行轉(zhuǎn)換
5. 實(shí)現(xiàn) open 方法
使用 getRuntimeContext.getDistributedCache.getFile 獲取分布式緩存文件
使用 Scala.fromFile 讀取文件,并獲取行
將文本轉(zhuǎn)換為元組(學(xué)生ID,學(xué)生姓名),再轉(zhuǎn)換為L(zhǎng)ist
6. 實(shí)現(xiàn) map 方法
從分布式緩存中根據(jù)學(xué)生ID過濾出來學(xué)生
獲取學(xué)生姓名
構(gòu)建最終結(jié)果元組
7. 打印測(cè)試
參考代碼
