遠程調(diào)試+頁面監(jiān)控:用最熟悉的方式開發(fā)Spark應用
大家好。 我是樓蘭,持續(xù)分享最純粹的技術內(nèi)容。
大數(shù)據(jù)技術已經(jīng)大行其道,但是很多人對大數(shù)據(jù)組件依然會覺得很陌生,很不順手。對大部分人來說,環(huán)境部署、API使用其實問題都不大,技術人員最不欠缺的就是學習能力。而陌生的根源就在于這些遠程執(zhí)行的代碼很難像本地應用一樣進行靠譜的調(diào)試。并且這些遠程組件又很難像我們熟悉的數(shù)據(jù)庫之類的產(chǎn)品一樣集成進來。這里就簡單總結(jié)一下Spark的遠程調(diào)試以及應用監(jiān)控的思路,讓Spark不再那么陌生。
以往我們開發(fā)熟悉的J2EE應用,在本地開發(fā)調(diào)試好了之后,放到服務器上運行效果差別一般不會太大,所以我們也習慣了這種本地調(diào)試,服務端運行的開發(fā)模式。但是在大數(shù)據(jù)場景下,MapReduce計算,Spark計算等這些大型的計算,就很難使用這種熟悉的開發(fā)模式了。雖然MapReduce,Spark都提供了Local的運行模式,可以在本地運行。但是當計算規(guī)模比較大,計算任務比較復雜時,本地調(diào)試就顯得有點捉襟見肘了。本地調(diào)試完美的應用,放到集群上,往往漏洞百出,這也讓很多人頭疼不已。
這時,就可以引入遠程調(diào)試模式,讓程序在遠程Spark集群上運行,而在本地IDEA中打斷點調(diào)試。用我們熟悉的方式開發(fā),并且在真實環(huán)境中驗證,這樣就回到了我們熟悉的開發(fā)模式。
步驟也不復雜,但是非常實用。秘密就在最常用的JDK中。Java提供了JDWP遠程調(diào)試機制,可以對遠程應用進行本地調(diào)試。JDWP全稱Java Debug Wire Protocol,是JAVA提供的一個非常有用的功能。像Hadoop的MapReduce計算,Spark計算這些基于JVM的應用程序,都可以進行遠程調(diào)試。大大釋放本地資源。
接下來使用JDK1.8版本 , spark-3.1.1-bin-hadoop3.2版本 和 IDEA 2021.1.3旗艦版一起來嘗試一下遠程調(diào)試功能把。
二、Spark集群中提交任務
1、普通提交方式
正常任務提交方式會通過Spark提供的spark-submit腳本進行提交。需要一大堆的參數(shù),記不住怎么辦?直接執(zhí)行腳本就行了。他會給你足夠的提示。

按照提示,將任務打成jar包后就可以往Spark集群中提交了。
?/app/spark/spark-3.1.1-bin-hadoop3.2/bin/spark-submit --class com.roy.personaeng.etl.DataToJsonJob --master yarn /app/hspersona/engine/HsPersonaEng.jar ID2112131310569166728092638 admin /data/admin/hspersona/DS ID2112131310569166728092638 /
其中 --class指定任務啟動類,也就是main方法所在的類。--master 指定為yarn,表示交由hadoop的yarn來進行資源管理。然后后面的幾個參數(shù)代表任務所在的jar包,以及任務執(zhí)行時的參數(shù),這些參數(shù)就是任務啟動類的main方法接收到的參數(shù)。
2、帶遠程監(jiān)聽的提交方式
如果正常提交任務,那任務在提交之后就會直接在spark集群中運行起來了,這樣是無法進行斷點調(diào)試的。這時,可以使用JVM提供的遠程調(diào)試功能,讓Spark程序乖乖的聽話。
?/app/spark/spark-3.1.1-bin-hadoop3.2/bin/spark-submit --class com.roy.personaeng.etl.DataToJsonJob --master yarn --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005" /app/hspersona/engine/HsPersonaEng.jar ID2112131310569166728092638 admin /data/admin/hspersona/DS ID2112131310569166728092638 /
其中主要是增加了幾個JVM的參數(shù),-Xdebug表示啟用調(diào)試特性。-Xrunjdwp表示啟用JDWP。配置時給JDWP增加了幾個子選項:
transport=dt_socket:這表示Java前端和后端之間的傳輸方法。dt_socket表示使用套接字傳輸。
address=8888:這表示JVM在8888端口上監(jiān)聽請求。
server=y:y表示啟動的JVM是被調(diào)試者。如果為n,則表示啟動的JVM是調(diào)試器。
suspend=y:y表示啟動的JVM會暫停等待,直到調(diào)試器連接上才繼續(xù)執(zhí)行。suspend=n,則JVM不會暫停等待。
帶上這幾個JVM參數(shù)后提交任務,任務將會在剛開始的時候阻塞住,等待開啟監(jiān)聽。

這時應用會等待IDEA發(fā)起監(jiān)聽。下面就需要在IDEA中開啟監(jiān)聽程序,任務才會繼續(xù)往下執(zhí)行。
三、IDEA中配置遠程監(jiān)聽端口
回到IDEA項目中,首選需要配置一個遠程執(zhí)行任務

在打開的窗口中配置一個Remote JVM Debug執(zhí)行任務,指向遠程服務端的監(jiān)聽端口。

接下來在本地任務中提前打好斷點,就可以用Debug指令啟動調(diào)試了。

接下來就可以像調(diào)試本地代碼一樣一行行進行調(diào)試了。
四、應用中如何監(jiān)控Spark任務執(zhí)行情況
解決了第一個開發(fā)的陌生感問題,接下來就是結(jié)果驗證了。我們?nèi)ピL問一些熟悉的遠程功能組件,比如數(shù)據(jù)庫,都可以立即拿到結(jié)果,所以我們在開發(fā)應用時,可以很方便的做出很多監(jiān)控程序,在應用中直接跟蹤指令執(zhí)行結(jié)果。
但是使用Spark后,情況就不太妙了。如何在應用中啟動Spark的應用程序并監(jiān)控Spark任務執(zhí)行情況呢?
Spark應用出問題的地方太多了,如果只是拿到計算任務的執(zhí)行結(jié)果是成功還是失敗,那幾乎沒有任務作用。程序拋出的異常對于問題排查也幾乎起不到任何幫助。只能從運行日志中觀察。但是運行日志是在命令行展開的,難道要我蹩腳的去查應用日志文件?
No,No,No.內(nèi)心強大的程序員自然有更妙的技術設計(偷懶工具)。于是我轉(zhuǎn)變了一下思路,整理出了另外一種在J2EE應用中管理Spark任務的方法。

由于觸發(fā)的是一個遠程Spark集群中的計算任務,所以后臺肯定不可能集成Spark的API去啟動任務。這時,可以調(diào)用遠程Shell指令的方式,實現(xiàn)曲線救國。
首先:獲取到遠程指令執(zhí)行的結(jié)果
?<dependency>
? ? ?<groupId>ch.ethz.ganymed</groupId>
? ? ?<artifactId>ganymed-ssh2</artifactId>
? ? ?<version>build210</version>
?</dependency>
使用這個功能組件,可以在Java應用中調(diào)用shell指令,并且獲得shell指令的執(zhí)行日志。并且還提供了SSH連接到遠程服務器的功能,非常全面貼心。這里提供一個簡單的執(zhí)行遠程shell指令的測試方法,大家趕緊拿到自己的環(huán)境中去試試把。
?//hostName:遠程服務器地址,userName:遠程服務器登錄用戶,password:遠程服務器登錄密碼,command:要執(zhí)行的shell指令
?private static int submitCommand(String hostName String userName,String password, String command) {
? ? ? ? ?try {
? ? ? ? ? ? ?//登錄遠程服務器 注意Connection,Session這些都是ganymed-ssh2工具包中的類
? ? ? ? ? ? ?Connection conn = new Connection(hostName);
? ? ? ? ? ? ?conn.connect();
? ? ? ? ? ? ?boolean isAuthenticated = conn.authenticateWithPassword(userName, password);
? ? ? ? ? ? ?if (isAuthenticated == false) {
? ? ? ? ? ? ? ? ?throw new IOException("Authentication failed.");
? ? ? ? ? ? ?}
?
? ? ? ? ? ? ?Session sess = conn.openSession();
? ? ? ? ? ? ?sess.execCommand(command);
?
? ? ? ? ? ? ?System.out.println("be executing command : " + command);
? ? ? ? ? ? ?System.out.println("===================== Start =====================");
? ? ? ? ? ? ?//解析指令執(zhí)行日志
? ? ? ? ? ? ?InputStream stdout = null;
? ? ? ? ? ? ?stdout = new StreamGobbler(sess.getStdout());
? ? ? ? ? ? ?BufferedReader br = new BufferedReader(new InputStreamReader(stdout, "UTF-8"));
? ? ? ? ? ? ?while (true) {
? ? ? ? ? ? ? ? ?String line = br.readLine();
? ? ? ? ? ? ? ? ?if (line == null) {
? ? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ?if ("".equals(line.trim())) {
? ? ? ? ? ? ? ? ? ? ?continue;
? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ?System.out.println(line);
? ? ? ? ? ? ?}
? ? ? ? ? ? ?br.close();
? ? ? ? ? ? ?System.out.println("===================== END =====================");
? ? ? ? ? ? ?sess.waitForCondition(ChannelCondition.EXIT_STATUS, 60*60*1000);//默認等待一個小時的數(shù)據(jù)
? ? ? ? ? ? ?System.out.println("ExitCode: " + sess.getExitStatus());
?
? ? ? ? ? ? ?sess.close();
? ? ? ? ? ? ?conn.close();
? ? ? ? ? ? ?if(sess.getExitStatus()==null){
? ? ? ? ? ? ? ? ?return 1;
? ? ? ? ? ? ?}
? ? ? ? ? ? ?return sess.getExitStatus();
? ? ? ? ?} catch (IOException e) {
? ? ? ? ? ? ?e.printStackTrace(System.err);
? ? ? ? ? ? ?return 1;
? ? ? ? ?}
? ? ?}
這樣就可以在Java應用中打印出在遠程服務器上執(zhí)行shell指令的執(zhí)行日志了。
然后:實現(xiàn)頁面監(jiān)控執(zhí)行日志
其實獲取到了執(zhí)行結(jié)果之后,想要在頁面展示執(zhí)行結(jié)果,相對就比較簡單了。我們可以引入SpringBoot當中的HttpSession來作為一個橋梁。將上面方法中通過readline讀取到的每一行內(nèi)容都放到Session當中。然后在前端啟動一個定時任務,固定從Session中讀取日志內(nèi)容展示到前端頁面。
這個實現(xiàn)方式并不難,設置并讀取Session信息,這應該是最基本的操作了。但是唯一需要注意的是,在讀取Session中的日志信息時,要記得讀取完后就把Session中的這個信息清空,這樣可以防止Session中的信息爆炸。通過這樣簡單的思路,就可以在頁面上監(jiān)控Spark任務執(zhí)行的每一行日志。再對日志關鍵部分稍微做一點點美化,最終就形成了這樣的執(zhí)行效果

當然,基于Spark的場景,計算結(jié)果到底是成功還是失敗?失敗時問題出在哪里? 這些問題還是只能從日志中自行進行判斷,在應用層還是很難完成這種復雜日志的解析的。