Apache Flink與RisingWave:流處理性能報(bào)告公開預(yù)覽版

Apache Flink是一個(gè)在大數(shù)據(jù)時(shí)代廣受歡迎的分布式流處理框架,RisingWave是一個(gè)云時(shí)代誕生的流處理數(shù)據(jù)庫(kù)。
無(wú)論時(shí)代如何更迭,性能總是一個(gè)廣受關(guān)注的話題。作為一個(gè)使用Rust開發(fā)、完全面向云環(huán)境設(shè)計(jì)的流處理系統(tǒng),RisingWave宣稱已經(jīng)取得了相比于Flink十倍的性能提升。
為了得到極致性能,RisingWave已經(jīng)進(jìn)行了長(zhǎng)達(dá)半年的性能測(cè)試。在上周,RisingWave團(tuán)隊(duì)向50余位社區(qū)伙伴分享了私有預(yù)覽版性能報(bào)告。這些社區(qū)伙伴中超過(guò)一半對(duì)Flink擁有3年以上使用經(jīng)驗(yàn),并且有不少也是Flink項(xiàng)目的代碼貢獻(xiàn)者與PMC。我們從這些伙伴中學(xué)習(xí)到了新的Flink調(diào)優(yōu)經(jīng)驗(yàn),同時(shí)也收獲了不少肯定與鼓勵(lì)。
在今天(2023年7月4日),我們將公開發(fā)布性能報(bào)告預(yù)覽版,供社區(qū)伙伴了解RisingWave與Flink項(xiàng)目的性能對(duì)比。
注:由于系統(tǒng)版本不斷迭代,本性能報(bào)告僅代表2023年7月4日之前測(cè)得的數(shù)據(jù)。我們會(huì)定期更新報(bào)告,讓大家更加全面的理解Flink與RisingWave的性能。
開始之前
盡管Apache Flink與RisingWave都是開源流處理系統(tǒng),使用場(chǎng)景有大量重合,但兩者的設(shè)計(jì)理念有較大差別,直接將兩者進(jìn)行對(duì)比其實(shí)有失公允。在開始比較之前,我們先概括兩個(gè)項(xiàng)目的異同,供大家參考。
Flink(https://flink.apache.org/)作為大數(shù)據(jù)時(shí)代便誕生發(fā)展的項(xiàng)目,其設(shè)計(jì)理念是成為通用流批計(jì)算平臺(tái)。盡管因其高效的流處理而著名,F(xiàn)link目前正在向大一統(tǒng)系統(tǒng)進(jìn)化:其不斷向批處理、機(jī)器學(xué)習(xí)(Flink?ML)、數(shù)據(jù)湖(Paimon,或稱Flink Table Store)、有狀態(tài)函數(shù)(StateFun)等方向發(fā)展。與此同時(shí),F(xiàn)link主要提供類似MapReduce方式的Java API,并在Java API的基礎(chǔ)上提供了更高層的Python與SQL接口。
RisingWave(https://www.risingwave.dev/)作為流處理數(shù)據(jù)庫(kù),其設(shè)計(jì)理念是大幅提升云上流計(jì)算性能效益。RisingWave與PostgreSQL協(xié)議兼容,用戶可通過(guò)psql、JDBC等連接PostgreSQL的方式來(lái)連接RisingWave。RisingWave不提供類似MapReduce的Java?API,但提供Python UDF以提升SQL語(yǔ)言的表達(dá)性能。
為了求同存異,我們主要對(duì)比Flink SQL與RisingWave。
基準(zhǔn)測(cè)試與環(huán)境
我們使用Flink 1.16版本與RisingWave 0.19版本進(jìn)行對(duì)比。
在流處理領(lǐng)域,最常用的基準(zhǔn)測(cè)試是Nexmark。Flink官方所使用的Nexmark源代碼在此:https://github.com/nexmark/nexmark。RisingWave團(tuán)隊(duì)實(shí)現(xiàn)了基于Kafka為消息源的Nexmark:https://github.com/risingwavelabs/nexmark-bench。由于Nexmark基準(zhǔn)測(cè)試設(shè)計(jì)的年代較早,并沒(méi)有覆蓋一些常用的SQL算子,因此我們又額外添加了5條查詢語(yǔ)句,參考:https://github.com/risingwavelabs/nexmark-bench/tree/main/risingwave。
為模擬真實(shí)場(chǎng)景,我們使用Kafka不斷產(chǎn)生數(shù)據(jù),并在下游分別接上Flink與RisingWave進(jìn)行對(duì)比。我們專注于吞吐量,而非延遲。對(duì)于Flink與RisingWave,我們均要求保證exactly once語(yǔ)義。
由于具體測(cè)試環(huán)境,包括Flink的調(diào)優(yōu),相對(duì)復(fù)雜,我們并不在此詳細(xì)列出。感興趣的朋友可以微信添加risingwave_assistant進(jìn)入RisingWave中文社區(qū)進(jìn)行交流。
硬件配置
測(cè)試中使用到的機(jī)型均為AWS EC2 c5.2xLarge (8vCPUs 16GB memory)。
Risingwave一共有四個(gè)組件:1. Frontend Node; 2. Meta Node; 3. Compute Node; 4. Compactor Node. Compute Node和Compactor Node共享同一臺(tái)機(jī)器。Frontend Node和Meta Node對(duì)性能無(wú)影響,他們共享另一臺(tái)機(jī)器。
Flink則有兩個(gè)組件:1. Job Manager; 2. Task Manager. 基于對(duì)應(yīng)關(guān)系,Task Manager獨(dú)占一臺(tái)機(jī)器(RocksDB中的compaction threads對(duì)應(yīng)了Compactor Node),而Job Manager則獨(dú)占另一臺(tái)機(jī)器。
性能測(cè)試結(jié)果
我們直接放出測(cè)試結(jié)果,如下圖。

注意,在本圖中我們對(duì)比了兩個(gè)版本的Flink:Flink與Flink (better storage)。這原因是我們使用了EBS存儲(chǔ)RocksDB狀態(tài)。默認(rèn)的EBS gp3性能參數(shù)為3000 IOPS與125 MB/s。由于在流處理中,內(nèi)部狀態(tài)管理很可能是性能的瓶頸,我們?cè)谑褂媚J(rèn)EBS gp3的基礎(chǔ)上,又將性能參數(shù)提升到12000 IOPS與500 MB/s,以此來(lái)提升Flink性能。
可以看到,在列出的27條query中,RisingWave在22條query中獲得了性能優(yōu)勢(shì)。其中,在12條query中都跑出了相比于Flink至少高出50%的性能(即圖中大于150%),10條query超過(guò)Flink性能兩倍(即圖中大于200%)。其中,q102獲得了大于520倍的性能提升,q104獲得了660倍的性能提升。
詳細(xì)解釋
為什么缺少了部分查詢結(jié)果?
如果大家仔細(xì)觀察,可以發(fā)現(xiàn),我們并沒(méi)有放出q6、q11、q13、q14這四條查詢語(yǔ)句的結(jié)果。原因如下:
q6:該查詢使用了窗口函數(shù),當(dāng)前版本的RisingWave暫時(shí)不支持該查詢,但很快就會(huì)支持(本月底);
q11:該查詢使用了session window,而我們認(rèn)為該功能并非高優(yōu)先級(jí),因此目前沒(méi)有實(shí)現(xiàn);
q13:需要生成side_input,忽略;
q14:該查詢需要UDF支持。盡管RisingWave支持UDF,但這要求部署單獨(dú)的UDF服務(wù),而我們主要是測(cè)試RisingWave與Flink的性能,因此忽略。
為什么無(wú)狀態(tài)計(jì)算的性能看起來(lái)差不多?
在q0-q3以及q21、q22中,RisingWave相比于Flink的性能提升并不多。這點(diǎn)是反直覺(jué)的:RisingWave作為一個(gè)由Rust開發(fā)的項(xiàng)目,理論上應(yīng)該輕易達(dá)到由Java開發(fā)的Flink的數(shù)倍性能。但為什么在測(cè)試中,并沒(méi)有觀察到這樣的性能提升?這主要是因?yàn)樵谶@些查詢中,F(xiàn)link/RisingWave與Kafka之間進(jìn)行的信息傳輸所引入的網(wǎng)絡(luò)IO成為了主要瓶頸。也就是說(shuō),大量時(shí)間被消耗在了網(wǎng)絡(luò)傳輸上,而非CPU計(jì)算上。實(shí)際上,我們的確可以構(gòu)造出復(fù)雜無(wú)狀態(tài)計(jì)算(如對(duì)nested json進(jìn)行解析等)來(lái)體現(xiàn)出Rust項(xiàng)目的性能優(yōu)勢(shì),但Nexmark并不涵蓋此類操作,為公平起見(jiàn),我們忽略了該類測(cè)試。
如何解釋q5與q16這兩條RisingWave明顯慢于Flink的查詢?
在Flink的測(cè)試中,我們保留了Flink官方所使用的Nexmark源代碼對(duì)于source的定義。與Risingwave的唯一區(qū)別在于:Flink在nexmark數(shù)據(jù)源上定義了watermark,即水位線。水位線的存在,截至目前,為Flink帶來(lái)了額外的優(yōu)化機(jī)會(huì)。比如一個(gè)窗口聚合函數(shù)可以在水位線到來(lái)時(shí),判定某一個(gè)時(shí)間窗口可以關(guān)閉、并輸出一次且僅有一次最終結(jié)果,而不是在某一行的更新之后實(shí)時(shí)輸出最新的中間結(jié)果。 我們將前者的語(yǔ)義稱為: Emit On Window Close (EOWC)。我們注意到,F(xiàn)link不支持非EOWC語(yǔ)義的窗口聚合函數(shù),于是我們也無(wú)法通過(guò)去除watermark的定義來(lái)強(qiáng)制Flink使用非EOWC語(yǔ)義。在Risingwave用戶的生產(chǎn)環(huán)境使用中,他們向我們反饋對(duì)兩種語(yǔ)義都有需求。
Risingwave馬上會(huì)支持EOWC,但在上述的測(cè)試中還尚未支持。Flink的q5采用了這樣的語(yǔ)義,而Risingwave則仍舊輸出大量的實(shí)時(shí)中間結(jié)果。這個(gè)區(qū)別導(dǎo)致了Risingwave的下游join算子會(huì)被大量觸發(fā),導(dǎo)致性能低于Flink。我們將在下一次的測(cè)試中,為大家呈現(xiàn)支持EOWC后的結(jié)果。
對(duì)于q16,在優(yōu)化執(zhí)行計(jì)劃時(shí),Risingwave目前尚未提供如https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/tuning/#split-distinct-aggregation中所描述的Split Distinct Aggregation的優(yōu)化。我們將在支持之后,在下一版的性能報(bào)告中,提供相同執(zhí)行計(jì)劃的性能數(shù)據(jù)。
RisingWave在哪類查詢上顯著優(yōu)于Flink?
對(duì)內(nèi)部狀態(tài)復(fù)雜占用空間大的查詢,RisingWave幾乎肯定能夠?qū)崿F(xiàn)性能翻倍,甚至百倍的提升。一般來(lái)說(shuō),一個(gè)查詢?cè)綇?fù)雜,所需維護(hù)的內(nèi)部狀態(tài)也就更復(fù)雜,狀態(tài)占用空間也更大。例如在測(cè)試中,q4, q7, q9, q18, q20這些查詢的內(nèi)部狀態(tài)都接近或者超過(guò)20GB,而q102與q105內(nèi)部狀態(tài)接近10GB。此時(shí)對(duì)于內(nèi)部狀態(tài)的存取往往成為計(jì)算過(guò)程的瓶頸,需要流計(jì)算引擎對(duì)狀態(tài)進(jìn)行緩存、索引等優(yōu)化加速。從測(cè)試結(jié)果來(lái)看,RisingWave相比于Flink很顯然能夠更好的支持復(fù)雜的流處理任務(wù)。
RisingWave為什么能夠達(dá)到如此高的性能?
RisingWave的性能與其設(shè)計(jì)實(shí)現(xiàn)密不可分??偨Y(jié)下來(lái),我認(rèn)為RisingWave的性能優(yōu)勢(shì)得益于三個(gè)關(guān)鍵點(diǎn):
RisingWave是由Rust從頭開發(fā),且?guī)缀醪灰蕾囉谌魏蔚谌絁VM組件。相比于由Java開發(fā)的Flink來(lái)說(shuō),從語(yǔ)言上就有巨大優(yōu)勢(shì)。當(dāng)然,該優(yōu)勢(shì)主要存在于計(jì)算層,因?yàn)镕link所使用的RocksDB存儲(chǔ)使用的是C++開發(fā);
作為大數(shù)據(jù)項(xiàng)目,F(xiàn)link擁有類似于MapReduce樣式的Java API層,而Flink SQL實(shí)質(zhì)上是對(duì)Flink Java API進(jìn)行的封裝。計(jì)算機(jī)基礎(chǔ)理論告訴我們,封裝越多,性能越差。相比之下,RisingWave并無(wú)中間表達(dá)層,因此可以直接針對(duì)SQL進(jìn)行高度定制化優(yōu)化,從而實(shí)現(xiàn)性能大幅提升;
Flink直接使用RocksDB來(lái)存儲(chǔ)計(jì)算中間狀態(tài),RocksDB對(duì)計(jì)算并無(wú)感知。相反,RisingWave內(nèi)部自己實(shí)現(xiàn)了存儲(chǔ),存儲(chǔ)能夠感知計(jì)算,并且使用遠(yuǎn)端存儲(chǔ)(例如S3、HDFS等)來(lái)大幅降低存儲(chǔ)成本,從而實(shí)現(xiàn)計(jì)算性價(jià)比提升。
事實(shí)上,除了這三點(diǎn)之外,還有很多因素可能導(dǎo)致RisingWave相比于Flink的性能優(yōu)勢(shì)。例如,F(xiàn)link目前的發(fā)展方向是成為大一統(tǒng)平臺(tái),其批處理、機(jī)器學(xué)習(xí)、數(shù)據(jù)湖等功能的引入會(huì)輕易提升系統(tǒng)復(fù)雜性,從而帶來(lái)不必要的性能開銷??偟膩?lái)說(shuō),關(guān)于性能這一塊,我們可以單獨(dú)寫新文章介紹,感興趣的朋友也可以微信添加risingwave_assistant進(jìn)入RisingWave中文社區(qū)進(jìn)行交流。
沒(méi)有被測(cè)量的部分
復(fù)雜無(wú)狀態(tài)計(jì)算
正如上面提到的,當(dāng)處理復(fù)雜無(wú)狀態(tài)計(jì)算時(shí),理論來(lái)說(shuō)由Rust開發(fā)的系統(tǒng)性能應(yīng)該遠(yuǎn)高于基于JVM語(yǔ)言編寫的系統(tǒng)??紤]到如今不少應(yīng)用都需要在流處理系統(tǒng)內(nèi)部進(jìn)行json解析、字符串處理等復(fù)雜操作,我們的確會(huì)考慮在今后添加測(cè)試該類查詢。
多流join
多流join是流處理中的一個(gè)非常典型的場(chǎng)景。當(dāng)用戶有多個(gè)數(shù)據(jù)源時(shí)(例如多個(gè)MySQL實(shí)例或者多個(gè)Kafka topic),便會(huì)考慮使用流處理系統(tǒng)進(jìn)行join來(lái)進(jìn)行分析。在處理多流join的過(guò)程城中,往往都會(huì)需要面臨內(nèi)部狀態(tài)過(guò)大的問(wèn)題。根據(jù)Nexmark基準(zhǔn)測(cè)試的結(jié)果,我們已經(jīng)初步驗(yàn)證了RisingWave在內(nèi)部狀態(tài)較大情況下的出色性能,而多流join很顯然是RisingWave所擅長(zhǎng)的。事實(shí)上,我們已經(jīng)開始了對(duì)多流join的實(shí)驗(yàn),根據(jù)初步實(shí)驗(yàn)結(jié)果,RisingWave可以輕松處理十個(gè)以上數(shù)據(jù)流的join,而相比之下,F(xiàn)link往往會(huì)直接因?yàn)闋顟B(tài)管理問(wèn)題而崩潰。我們會(huì)逐步將多流join的實(shí)驗(yàn)結(jié)果公布給大家。
UDF、水位線等高級(jí)功能
流處理系統(tǒng)的精湛之處遠(yuǎn)遠(yuǎn)不至于處理聚合、join、窗口等算子。事實(shí)上,用戶經(jīng)常需要UDF或者水位線等進(jìn)階功能來(lái)擴(kuò)展表達(dá)能力,以及保證系統(tǒng)正確性。然而,限于Nexmark基準(zhǔn)測(cè)試的局限性,我們并沒(méi)有對(duì)這些功能進(jìn)行測(cè)試。我們會(huì)考慮在今后的測(cè)試中涵蓋這些功能。
如何對(duì)Flink進(jìn)行性能調(diào)優(yōu)?
Flink性能調(diào)優(yōu)不僅是個(gè)技術(shù)活,更是個(gè)經(jīng)驗(yàn)活。RisingWave團(tuán)隊(duì)在Flink性能調(diào)優(yōu)方面積累了將近一年的經(jīng)驗(yàn)。總結(jié)下來(lái),我們可以通過(guò)三個(gè)方面對(duì)Flink進(jìn)行性能優(yōu)化:
部署調(diào)優(yōu)。Flink支持Kubernetes部署。然而,由于對(duì)于JVM生態(tài)的重度依賴,通過(guò)Kubernetes部署Flink并不能直接得到最佳性能。我們往往需要考慮Flink的具體部署方式(比如如何部署Zookeeper節(jié)點(diǎn)等等)。
SQL調(diào)優(yōu)。Flink使用Calcite進(jìn)行SQL解析與規(guī)劃,但由于對(duì)數(shù)據(jù)缺少感知,F(xiàn)link并不能很好的進(jìn)行SQL優(yōu)化。因此,在一些查詢中,我們需要手動(dòng)改寫SQL實(shí)現(xiàn)更高性能。
RocksDB調(diào)優(yōu)。由于Flink使用RocksDB進(jìn)行內(nèi)部狀態(tài)管理,而RocksDB作為存儲(chǔ)層并不感知計(jì)算,因此用戶往往需要手工調(diào)節(jié)RocksDB參數(shù)來(lái)實(shí)現(xiàn)最佳性能。值得注意的是,RocksDB本身就擁有數(shù)百個(gè)可調(diào)參數(shù),希望調(diào)好RocksDB還得對(duì)RocksDB的內(nèi)部結(jié)構(gòu)進(jìn)行非常深入的研究。
如何對(duì)RisingWave進(jìn)行性能調(diào)差?
最有可能降低RisingWave性能的方式在于調(diào)小計(jì)算節(jié)點(diǎn)的內(nèi)存,并通過(guò)大量不規(guī)則訪問(wèn)來(lái)使RisingWave出現(xiàn)大量緩存未命中。其原理是:RisingWave使用遠(yuǎn)端存儲(chǔ)來(lái)維護(hù)內(nèi)部狀態(tài),并使用計(jì)算節(jié)點(diǎn)緩存最常訪問(wèn)的遠(yuǎn)端存儲(chǔ)上的狀態(tài)。當(dāng)計(jì)算節(jié)點(diǎn)的容量較小,并訪問(wèn)不規(guī)則的情況下,就可能欺騙到緩存策略(RisingWave使用的是基于LRU的算法),從而讓RisingWave頻繁訪問(wèn)遠(yuǎn)端存儲(chǔ)。遠(yuǎn)端存儲(chǔ)訪問(wèn)往往代價(jià)較大,頻繁訪問(wèn)注定會(huì)造成整體性能下降。
總結(jié)
在本文,我們描述了基于Nexmark基準(zhǔn)測(cè)試進(jìn)行的Flink和RisingWave性能對(duì)比。誠(chéng)然,單一的基準(zhǔn)測(cè)試并不能夠覆蓋流處理系統(tǒng)的方方面面,但是我們?nèi)匀豢梢源笾铝私獾紽link與RisingWave在常用場(chǎng)景下的性能區(qū)別以及原因。除流處理能力以外,F(xiàn)link有著諸多特性值得用戶探索(包括批處理、機(jī)器學(xué)習(xí)、StateFun等等),而RisingWave則會(huì)更加專注于優(yōu)化流處理方面的性能效率。
性能并不是評(píng)價(jià)系統(tǒng)優(yōu)劣的唯一標(biāo)準(zhǔn),但的確我們一直都在為實(shí)現(xiàn)極致性能而努力。
關(guān)于 RisingWave
RisingWave是一款分布式SQL流處理數(shù)據(jù)庫(kù),旨在幫助用戶降低實(shí)時(shí)應(yīng)用的的開發(fā)成本。作為專為云上分布式流處理而設(shè)計(jì)的系統(tǒng),RisingWave為用戶提供了與PostgreSQL類似的使用體驗(yàn),并且具備比Flink高出10倍的性能以及更低的成本。