【優(yōu)分享】JMeter源碼解析之結(jié)果收集器

本文作者 優(yōu)測性能測試專家高源。
簡介:本文以最新的JMeter 5.5版本源代碼為例詳細(xì)介紹了單機(jī)模式和分布式模式下結(jié)果收集器的工作原理。通篇干貨,還不快來了解一下!?
一、JMeter結(jié)果收集器概述
JMeter是在壓力領(lǐng)域中最常見的性能測試工具,由于其開源的特點,受到廣大測試和開發(fā)同學(xué)的青睞。但是,在實際應(yīng)用過程中,JMeter存在的一些性能瓶頸也凸顯出來,經(jīng)常會遇到大并發(fā)下壓不上去的情況。筆者通過深入分析其源碼實現(xiàn),找到JMeter存在的瓶頸問題及根本原因,為以后更好地使用工具提供一些思路。
結(jié)果收集器:在JMeter中擔(dān)任報告數(shù)據(jù)收集的重任,無論是單機(jī)模式還是master-slave模式,每一個請求的結(jié)果都是通過相應(yīng)的結(jié)果收集器進(jìn)行數(shù)據(jù)采集的。在單機(jī)模式下用Result Collector這個監(jiān)聽器去采集,在分布式(master-slave)場景下通過配RemoteSampleListenerWrapper下的指定sender進(jìn)行收集,具體配置jmeter.property文件的mode屬性和隊列長度實現(xiàn)。下面我們以當(dāng)前最新的JMeter 5.5版本的源代碼為例詳細(xì)介紹下單機(jī)模式和分布式模式下結(jié)果收集器的工作原理。
二、單機(jī)模式
1、初始化
在命令行模式下,JMeter會根據(jù)用戶的logfile配置選擇是否添加Result Collector,一般在實際測試的時候,我們都是需要有詳細(xì)統(tǒng)計報告生成的,所以都會添加Result Collector,收集器放在了整個hashtree的第一個節(jié)點,代碼如下:
?void runNonGui(String testFile, String logFile, boolean remoteStart, String remoteHostsString, boolean generateReportDashboard){
?....
?ResultCollector resultCollector = null;
?? if (logFile != null) {
???? resultCollector = new ResultCollector(summariser);
???? resultCollector.setFilename(logFile);
???? clonedTree.add(clonedTree.getArray()[0], resultCollector);
???? }
?? else {
???? // only add Summariser if it can not be shared with the ResultCollector
?? if (summariser != null) {
????? clonedTree.add(clonedTree.getArray()[0], summariser);
????? }
????? }
?....
?
?}
2、加載流程
添加完結(jié)果收集器后,執(zhí)行腳本過程中,JMeter會根據(jù)jmx的編排,按照如下的執(zhí)行順序進(jìn)行調(diào)用:
?

?
每一個線程都是按照以上的順序循環(huán)反復(fù)執(zhí)行,直到壓測停止。具體代碼如下(相應(yīng)的關(guān)鍵點已增加注釋):
private void executeSamplePackage(Sampler current,
????? TransactionSampler transactionSampler,
????? SamplePackage transactionPack,
????? JMeterContext threadContext) {
? threadContext.setCurrentSampler(current);
? // Get the sampler ready to sample
? SamplePackage pack = compiler.configureSampler(current);
? runPreProcessors(pack.getPreProcessors());//運行前置處理器
? // Hack: save the package for any transaction controllers
? threadVars.putObject(PACKAGE_OBJECT, pack);
? delay(pack.getTimers());//定時器timer
? SampleResult result = null;
? if (running) {
?????? Sampler sampler = pack.getSampler();
?????? result = doSampling(threadContext, sampler);
?? }
?? // If we got any results, then perform processing on the result
?? if (result != null) {
?? if (!result.isIgnore()) {
????????? ...???????????????
?? runPostProcessors(pack.getPostProcessors());//運行后置處理器
?? checkAssertions(pack.getAssertions(), result, threadContext);//運行斷言處理器
??????????? // PostProcessors can call setIgnore, so reevaluate here
??????????? if (!result.isIgnore()) {
??????????? // Do not send subsamples to listeners which receive the transaction sample
??????????? List<SampleListener> sampleListeners = getSampleListeners(pack, transactionPack, transactionSampler);
??????????? notifyListeners(sampleListeners, result);//執(zhí)行監(jiān)聽器,此處為執(zhí)行報告收集器的sampleOccurred方法
??????????? }
??????????? compiler.done(pack);
????? ??????...
??? }
?
?
收集器Result Collector執(zhí)行的具體代碼:
@Override
public void sampleOccurred(SampleEvent event) {
??? SampleResult result = event.getResult();
??? if (isSampleWanted(result.isSuccessful())) {
??????? sendToVisualizer(result);
??????? if (out != null && !isResultMarked(result) && !this.isStats) {
??????? SampleSaveConfiguration config = getSaveConfig();
??????? result.setSaveConfig(config);
??????? try {
?????????????? if (config.saveAsXml()) {
?????????????????? SaveService.saveSampleResult(event, out);
?????????????? } else { // !saveAsXml
?????????????????? CSVSaveService.saveSampleResult(event, out);
?????????????? }
????????? } catch (Exception err) {
????????????? log.error("Error trying to record a sample", err); // should throw exception back to caller
?????????? }
????? }
? }
?? if(summariser != null) {
?????? summariser.sampleOccurred(event);
?? }
}
?
以上主要實現(xiàn)了將每個請求的結(jié)果數(shù)據(jù)存儲到日志文件中(CSV /XML),為后續(xù)的報告生成提供數(shù)據(jù)文件。
3、性能瓶頸分析
從以上的流程不難看出,由于每個線程的每個請求后都會頻繁調(diào)用Result Collector的sample Occurred方法,即會頻繁讀寫文件,有可能導(dǎo)致IO瓶頸。一旦存儲的速度下降,必然導(dǎo)致線程循環(huán)發(fā)包的速度下降,從而導(dǎo)致壓不上去的情況出現(xiàn)。所以單機(jī)模式下不建議設(shè)置超過200以上的并發(fā),若非必須,盡量關(guān)閉日志采集和html報告生成,以免報告置信度存在問題。
?
?
三、分布式模式
為了應(yīng)對單機(jī)的各種瓶頸問題,JMeter采用了分布式(master-slave)模式。加載執(zhí)行流程與單機(jī)基本一致,不再贅述,區(qū)別在于監(jiān)聽器換成了Remote Sample ListenerImpl收集器。
1、發(fā)送模式指定方法
下面我們重點看下Remote Sample ListenerImpl監(jiān)聽器的代碼:
@Override
public void processBatch(List<SampleEvent> samples) {
??? if (samples != null && sampleListener != null) {
??????? for (SampleEvent e : samples) {
??????????? sampleListener.sampleOccurred(e);
??????? }
??? }
}
@Override
public void sampleOccurred(SampleEvent e) {
??? if (sampleListener != null) {
??????? sampleListener.sampleOccurred(e);
??? }
}
?
從以上代碼可以看出,這個監(jiān)聽器里又調(diào)用了sample Listener的sample Occurred方法,而sample Listener是通過用戶在jmeter.property文件中指定的。
?

?
?
2、AsynchSampleSender源碼解析
下面我們以Asynch Sample Sender為例進(jìn)行源碼詳細(xì)介紹:
public class AsynchSampleSender extends AbstractSampleSender implements Serializable {
?????? protected Object readResolve() throws ObjectStreamException{
??????? int capacity = getCapacity();
??????? log.info("Using batch queue size (asynch.batch.queue.size): {}", capacity); // server log file
??????? queue = new ArrayBlockingQueue<>(capacity);
??????? Worker worker = new Worker(queue, listener);
??????? worker.setDaemon(true);
??????? worker.start();
??????? return this;
??? }
@Override
public void testEnded(String host)
??? log.debug("Test Ended on {}", host);
??? try {
??????? listener.testEnded(host);
??????? queue.put(FINAL_EVENT);
??? } catch (Exception ex) {
??????? log.warn("testEnded(host)", ex);
??? }
??? if (queueWaits > 0) {
??????? log.info("QueueWaits: {}; QueueWaitTime: {} (nanoseconds)", queueWaits, queueWaitTime);
??????? }
??? }
?@Override
public void sampleOccurred(SampleEvent e)
??? try {
??????? if (!queue.offer(e)){ // we failed to add the element first time
??????????? queueWaits++;
??????????? long t1 = System.nanoTime();
??????????? queue.put(e);
? ??????????long t2 = System.nanoTime();
??????????? queueWaitTime += t2-t1;
??????? }
??? } catch (Exception err) {
??????? log.error("sampleOccurred; failed to queue the sample", err);
??? }
}
private static class Worker extends Thread {
??? @Override
??? public void run()
??????? try {
??????????? boolean eof = false;
??????????? while (!eof) {
??????????????? List<SampleEvent> l = new ArrayList<>();
??????????????? SampleEvent e = queue.take();
??????????????? // try to process as many as possible
??????????????? // The == comparison is not an error
??????????????? while (!(eof = e == FINAL_EVENT) && e != null) {
???????????????????? l.add(e);
???????????????????? e = queue.poll(); // returns null if nothing on queue currently
???????????????? }
??????????????? int size = l.size();
??????????????? if (size > 0) {
??????????????????? try {
?????????????????????? listener.processBatch(l);
??????????????????? } catch (RemoteException err) {
????? ??????????????????if (err.getCause() instanceof java.net.ConnectException){
??????????????????????????? throw new JMeterError("Could not return sample",err);
??????????????????????? }
??????????????????????? log.error("Failed to return sample", err);
???? ???????????????}
???????????????? }
??????????? }
??????? } catch (InterruptedException e) {
??????????? Thread.currentThread().interrupt();
??????????? }
??????? log.debug("Worker ended");
??????? }
??? }
}
?
?
從以上代碼可以看出,Asynch SampleSender的sample Occurred方法里只進(jìn)行入列的操作,而采集上報工作是啟動了一個work線程實現(xiàn)的,相當(dāng)于異步處理所有請求數(shù)據(jù)。這樣設(shè)計不會阻塞發(fā)包的流程,性能上要優(yōu)于單機(jī)模式。但是,在一定情況下,也是會出現(xiàn)性能瓶頸的。
這個隊列采用的是Array Blocking Queue(阻塞隊列),這個隊列有如下特點:
·Array Blocking Queue是有界的初始化必須指定大小,隊列滿了后,無法入列。
·Array Blocking Queue實現(xiàn)的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個Reenter Lock鎖。
3、性能瓶頸分析
瓶頸點一:隊列大小問題
當(dāng)我們實際壓測過程中,如果隊列大?。╝synch.batch.queue.size)設(shè)置過小,入列速度大于出列速度,就會導(dǎo)致隊列滿而阻塞整個發(fā)壓流程,而如果隊列設(shè)置過大,一旦請求的包體比較大,很容易造成內(nèi)存溢出。
瓶頸點二:單一鎖問題
在壓測過程中,入列出列是非常頻繁的,而同一個Reenter Lock鎖也可能造成入列和出列過程中,因無法獲得鎖而入列或者出列延遲,繼而影響發(fā)壓效率。
四、總結(jié)
JMeter因其完善的社區(qū)和開源特點,在日常壓測中可廣泛使用。JMeter適合進(jìn)行小規(guī)模的壓測。但是在大規(guī)模的壓測過程中,受本地機(jī)器性能、帶寬等限制,不宜進(jìn)行單機(jī)壓測,可以使用JMeter的master-slave的方式進(jìn)行分布式壓測。但是需提前設(shè)置好結(jié)果收集器和隊列的大小,并進(jìn)行預(yù)先演練評估出上限qps,防止出現(xiàn)壓不上去的情況。此外,master-slave通信方式是遠(yuǎn)程RMI的雙向通信方式,連接數(shù)過多也會造成master的瓶頸出現(xiàn),需要做好量級的提前評估。
?
*版權(quán)聲明:本文作者 優(yōu)測性能測試專家高源。
?
?
優(yōu)測壓力測試是一款在線云原生全鏈路壓測平臺,百萬級并發(fā)即召即用。兼容JMeter腳本,一鍵上傳即可隨時發(fā)壓,免去壓測工具搭建成本。歡迎大家登錄優(yōu)測官網(wǎng)免費體驗!
?
想了解更多壓力測試知識、產(chǎn)品與服務(wù),可掃下方二維碼進(jìn)群交流。

?
聯(lián)系我們
熱線:
0755-86013388-843186
官網(wǎng):
https://utest.21kunpeng.com/home/perftest?from=baiduseo
企業(yè)微信:
?

?