Facebook Velox 運行機制全面解析

概述
Facebook Velox 是一個針對 SQL 運行時的 C++ 庫,旨在統(tǒng)一 Facebook 各種計算流,包括 Spark 和 Presto,使用推的模式、支持向量計算。最近由于工作原因,需要學習一下其運行機制和代碼,遂對比文檔和代碼梳理了以下一些細節(jié)。
Velox 接受一棵優(yōu)化過的 PlanNode Tree,然后將其切成一個個的線性的 Pipeline,Task 負責這個轉(zhuǎn)變過程,每個 Task 針對一個 PlanTree Segment。大多數(shù)算子是一對一翻譯的,但是有一些特殊的算子,通常出現(xiàn)在多個 Pipeline 的切口處,通常來說,這些切口對應(yīng)計劃樹的分叉處,如 HashJoinNode,CrossJoinNode, MergeJoinNode ,通常會翻譯成 XXProbe 和 XXBuild。但也有一些例外,比如 LocalPartitionNode 和 LocalMergeNode 。

為了提高執(zhí)行的并行度,Velox 引入了 LocalPartitionNode 節(jié)點,可以將一個 Pipeline 進行多線程(每個線程一個實例)并行運行,并且互斥的消費數(shù)據(jù)。其中每個實例稱為 Driver。該算子在輸入計劃樹里并沒有分叉(即沒有多個 source),但在翻譯成物理算子時,會在此節(jié)點處進行切開,并在切口前后改變執(zhí)行的并行度,對應(yīng)的物理算子是LocalPartition ?和 ?LocalExchange。

還有一個特殊節(jié)點,稱為 LocalMergeNode,該對輸入有要求:必須有序,然后會進行單線程的歸并排序,從而使輸出全局有序。也因此,由其而切開的消費 Pipeline 一定是單 Driver 的。翻譯成算子,對應(yīng)兩個 CallbackSink 和 LocalMerge。

總結(jié)一下,上述五個 PlanNode,HashJoinNode,CrossJoinNode, MergeJoinNode ,LocalPartitionNode ,LocalMergeNode 在翻譯時會造成切口,即將邏輯 PlanTree 切成多個物理 Pipeline,因此在切口處會將一個邏輯算子翻譯成多個物理算子,分到不同 Pipeline 上。每個 Pipeline 會有一個從 0 開始的編號:Pipeline ID,是全局粒度的。
并且,可以由 LocalPartitionNode 來按需改變每個 Pipeline 并行度,其中 Pipeline 的每個線程由一個 Driver 來執(zhí)行。每個 Driver 也有一個從 0 開始的編號:Driver ID,是 Pipeline 粒度的。
其他 PlanNode 到算子的翻譯基本都是一對一的,感興趣的可以看官方文檔的這個頁面:Plan Nodes and Operators。
下面展開一些細節(jié)。
Splits
Velox 允許應(yīng)用層(即 Velox 的使用方)以 Splits (每個算子的輸入片段稱為 Split)的方式給 Pipeline 喂數(shù)據(jù),可以流式的喂,因此有兩個 API:
Task::addSplit(planNodeId, split) :喂一份數(shù)據(jù)給 Velox
Task::noMoreSplits() :通知 Velox 我喂完了。
Velox 會使用一個隊列在緩存這些 Splits 數(shù)據(jù)。在數(shù)據(jù)喂完之前的任意一個時刻,Pipeline 的葉子算子(對的,外部喂數(shù)據(jù)只能發(fā)生在葉子節(jié)點,如 TableScan,Exchange 和 MergeExchange)都可以從隊列中取數(shù)據(jù),對應(yīng) API 是 Task::getSplitOrFuture(planNodeId) ,返回值有兩種:
如果隊列中有數(shù)據(jù),則返回一個 Split
如果隊列中無數(shù)據(jù),但還沒有收到喂完的信號,則返回一個 Future (類似于一個欠條,之后有數(shù)據(jù)之后,會憑該欠條兌付)。

編輯切換為居中
Join Bridges and Barriers
Join (HashJoinNode 和 CrossJoinNode)會翻譯成 XXProbe 和 XXBuild 兩個算子,并且通過一個共享的 Bridge 來溝通數(shù)據(jù),兩側(cè) Pipeline 都可以通過 Task::getHashJoinBridge() 函數(shù)來根據(jù) PlanNodeId 獲取該共享的 Bridge。
為了提高 build 速度,build 側(cè) Pipeline 通常使用多個 Driver 并發(fā)執(zhí)行。但由于只有一個 Bridge,每個 Driver 在結(jié)束時可以調(diào)用 Task::allPeersFinished() (內(nèi)部是使用一個 BarrierState 的結(jié)構(gòu)來實現(xiàn)的)來判斷自己是否為最后一個 Driver,如果是,則將所有 Driver 的輸出進行合并后送到 Bridge。
當然,在 RIGHT and FULL OUTER join 情況下,Probe 側(cè)也需要將沒有 match 上的數(shù)據(jù)喂給 Bridge,此時也需要由最后一個 Driver 來負責這件事,于是同樣需要調(diào)用 ?Task::allPeersFinished() 函數(shù)。

下面來詳細看下 Join 類算子的切分細節(jié)。以 HashJoin 為例,Task 在切分 PlanTree 時,會將邏輯上的一個 HashJoin 算子,轉(zhuǎn)化成物理上的一對算子:HashProbe 和 HashJoin,并且使用異步機制進行通知:在 HashJoin 完成后,通知 HashProbe 所在 Pipeline 繼續(xù)執(zhí)行,在此之前,后者是阻塞等待的。

如上圖,每個 Pipeline 在實例化(邏輯 PlanNode 轉(zhuǎn)物理 Operator)的時候,可以生成多份,進行并行執(zhí)行,互斥的消費數(shù)據(jù)。并且,每個 Pipeline 的并行粒度可以不一樣,如上圖 Probe Pipeline 實例化了兩份,而 Build Pipeline 實例化了三份。并且,Build Pipeline 組中最后一個運行完的 Pipeline 負責將數(shù)據(jù)通過 Bridge 發(fā)送給 Probe Pipeline。
Exchange Clients
Velox 使用 Exchange Clients 來獲取遠程 worker 的數(shù)據(jù)。分兩個步驟:
第一步,Pipeline 中第一個 Driver (driverId == 0) 的 Exchange 算子從 Task 中獲取一個 Split,并且初始化一個共享 Exchange Client。
第二步,Exchange Client 會為上游每個 Task 構(gòu)造一個 Exchange Source,并行的拉取每個上游 Task 同一個 Partition (圖中是 Partition-15)數(shù)據(jù),然后將其放在 Client 的隊列 Queue 中。Exchange 的每個 Driver 都會去隊列中拉取這些數(shù)據(jù)。
如何從上游 Task 拉取數(shù)據(jù)的邏輯,需要由用戶自定義實現(xiàn) ?ExchangeSource 和 ExchangeSource::Factory ?。每個 ExchangeSource 接受一個上游 Task 的字符串 ID、Partition 編號和一個隊列作為參數(shù)。然后會從上游 Task 中拉取該 Partition 的數(shù)據(jù),并且放到隊列中。

編輯切換為居中
文章剩下還有一部分,歡迎訂閱我的專欄看全文:
木鳥雜記:系統(tǒng)日知錄xiaobot.net/post/ca020297-0901-45d1-8acc-1d884bf1cc84
這是在小報童上面的一個付費訂閱專欄,這是專欄地址:https://xiaobot.net/p/system-thinking小報童是一個借鑒國外 newsletter 專注寫作的付費訂閱平臺,只不過將郵箱換成了微信。 現(xiàn)在初步打算圍繞“系統(tǒng)”開幾個系列:?
圖數(shù)據(jù)庫101系列?
每天學點數(shù)據(jù)庫系列?
系統(tǒng)好文推薦系列?
讀書筆記系列?
數(shù)據(jù)密集型論文導(dǎo)讀系列?
系統(tǒng),既是數(shù)據(jù)庫系統(tǒng)中的系統(tǒng),也是分布式系統(tǒng)中的系統(tǒng),也是人類組織系統(tǒng)中的系統(tǒng),也可是一切有跡可循、有規(guī)律可考的系統(tǒng)。學習系統(tǒng)的架構(gòu),借鑒系統(tǒng)的組織,使我們的認知也系統(tǒng)起來。 會保證每周不低于兩篇更新,現(xiàn)價一季度 32 塊錢,作為給先訂閱同學的福利,如果有同學通過你的分享訂閱本專欄,你可以拿到該同學訂閱費用的30% 的抽成~ 分享方式見專欄介紹。 如果有任何建議以及想看的系統(tǒng)文章,歡迎留言~
參考
https://facebookincubator.github.io/velox/develop/task.html