如何基于 Apache Doris 與 Apache Flink 快速構(gòu)建極速易用的實(shí)時(shí)數(shù)倉(cāng)
隨著大數(shù)據(jù)應(yīng)用的不斷深入,企業(yè)不再滿足離線數(shù)據(jù)加工計(jì)算的時(shí)效,實(shí)時(shí)數(shù)據(jù)需求已成為數(shù)據(jù)應(yīng)用新常態(tài)。伴隨著實(shí)時(shí)分析需求的不斷膨脹,傳統(tǒng)的數(shù)據(jù)架構(gòu)面臨的成本高、實(shí)時(shí)性無(wú)法保證、組件繁冗、運(yùn)維難度高等問(wèn)題日益凸顯。為了適應(yīng)業(yè)務(wù)快速迭代的特點(diǎn),幫助企業(yè)提升數(shù)據(jù)生產(chǎn)和應(yīng)用的時(shí)效性、進(jìn)一步挖掘?qū)崟r(shí)數(shù)據(jù)價(jià)值,實(shí)時(shí)數(shù)倉(cāng)的構(gòu)建至關(guān)重要。
本文將分享如何基于 Apache Doris 和 Apache Flink 快速構(gòu)建一個(gè)極速易用的實(shí)時(shí)數(shù)倉(cāng),包括數(shù)據(jù)同步、數(shù)據(jù)集成、數(shù)倉(cāng)分層、數(shù)據(jù)更新、性能提升等方面的具體應(yīng)用方案,在這之前,我們先可以先了解一下傳統(tǒng)的數(shù)據(jù)架構(gòu)如何設(shè)計(jì)的、又存在哪些痛點(diǎn)問(wèn)題。
# ?實(shí)時(shí)數(shù)倉(cāng)的需求與挑戰(zhàn)

上圖所示為傳統(tǒng)的數(shù)據(jù)架構(gòu),如果我們從數(shù)據(jù)流的?度分析傳統(tǒng)的數(shù)據(jù)處理架構(gòu),會(huì)發(fā)現(xiàn)從源端采集到的業(yè)務(wù)數(shù)據(jù)和日志數(shù)據(jù)主要會(huì)分為實(shí)時(shí)和離線兩條鏈路:
在實(shí)時(shí)數(shù)據(jù)部分,通過(guò) Binlog 的?式,將業(yè)務(wù)數(shù)據(jù)庫(kù)中的數(shù)據(jù)變更 (CDC,Change Data Capture)采集到實(shí)時(shí)數(shù)倉(cāng)。同時(shí),通過(guò) Flume-Kafka-Sink 對(duì)日志數(shù)據(jù)進(jìn)?實(shí)時(shí)采集。當(dāng)不同來(lái)源的數(shù)據(jù)都采集到實(shí)時(shí)存儲(chǔ)系統(tǒng)后,便可以基于實(shí)時(shí)存儲(chǔ)系統(tǒng)來(lái)構(gòu)建實(shí)時(shí)數(shù)倉(cāng)。在實(shí)時(shí)數(shù)倉(cāng)的內(nèi)部,我們?nèi)匀粫?huì)遵守傳統(tǒng)數(shù)倉(cāng)分層理論,將數(shù)據(jù)分為 ODS 層、DWD 層、DWS 層、 ADS 層以實(shí)現(xiàn)最大程度的模型復(fù)用。
在離線數(shù)據(jù)部分,通過(guò) DataX 定時(shí)同步的?式,批量同步業(yè)務(wù)庫(kù) RDS 中的數(shù)據(jù)。當(dāng)不同來(lái)源的數(shù)據(jù)進(jìn)?到離線數(shù)倉(cāng)后,便可以在離線數(shù)倉(cāng)內(nèi)部,依賴 Spark SQL 或 Hive SQL 對(duì)數(shù)據(jù)進(jìn)?定時(shí)處理,分離出不同層級(jí) (ODS 、DWD 、ADS 等)的數(shù)據(jù),并將這些數(shù)據(jù)存在?個(gè)存儲(chǔ)介質(zhì)上,?般會(huì)采用如 HDFS 的分布式文件系統(tǒng)或者 S3 對(duì)象存儲(chǔ)上。通過(guò)這樣的?式,離線數(shù)倉(cāng)便構(gòu)建起來(lái)了。與此同時(shí),為了保障數(shù)據(jù)的?致性,通常需要數(shù)據(jù)清洗任務(wù)使?離線數(shù)據(jù)對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)?清洗或定期覆蓋,保障數(shù)據(jù)最終的?致性。
從技術(shù)架構(gòu)的?度對(duì)傳統(tǒng)數(shù)據(jù)技術(shù)棧進(jìn)行分析,我們同樣會(huì)發(fā)現(xiàn),為了迎合不同場(chǎng)景的需求,往往會(huì)采用多種技術(shù)棧,例如在湖倉(cāng)部分通常使用的是 Hive 、Iceberg 、Hudi 等數(shù)據(jù)湖;面向湖上數(shù)據(jù)的 Ad-hoc 查詢一般選擇 Impala 或 Presto;對(duì)于 OLAP 場(chǎng)景的多維分析,一般使? Doris 或 Kylin、?Druid。除此之外,為應(yīng)對(duì)半結(jié)構(gòu)化數(shù)據(jù)的分析需求,例如日志分析與檢索場(chǎng)景,通常會(huì)使? ES 進(jìn)行分析;面對(duì)高并發(fā)點(diǎn)查詢的 Data Serving 場(chǎng)景會(huì)使? HBase;在某些場(chǎng)景下可能還需要對(duì)外提供統(tǒng)?的數(shù)據(jù)服務(wù),這時(shí)可能會(huì)使?基于 Presto/Trino 的查詢?關(guān),對(duì)?戶提供統(tǒng)一查詢服務(wù)。其中涉及到的數(shù)據(jù)組件有數(shù)十種,高昂的使用成本和組件間兼容、維護(hù)及擴(kuò)展帶來(lái)的繁重壓力成為企業(yè)必須要面臨的問(wèn)題。

從上述介紹即可知道,傳統(tǒng)的數(shù)據(jù)架構(gòu)存在幾個(gè)核心的痛點(diǎn)問(wèn)題:
傳統(tǒng)數(shù)據(jù)架構(gòu)組件繁多,維護(hù)復(fù)雜,運(yùn)維難度非常高。
計(jì)算、存儲(chǔ)和研發(fā)成本都較高,與行業(yè)降本提效的趨勢(shì)背道而馳。
同時(shí)維護(hù)兩套數(shù)據(jù)倉(cāng)庫(kù)(實(shí)時(shí)數(shù)倉(cāng)和離線數(shù)倉(cāng))和兩套計(jì)算(實(shí)時(shí)數(shù)據(jù)量和實(shí)時(shí)計(jì)算任務(wù)),數(shù)據(jù)時(shí)效性與一致性無(wú)法保證。
在此背景下,我們亟需?個(gè)“極速、易用、統(tǒng)一、實(shí)時(shí)”的數(shù)據(jù)架構(gòu)來(lái)解決這些問(wèn)題:
極速:更快的查詢速度,最大化提升業(yè)務(wù)分析人員的效率;
易用:對(duì)于用戶側(cè)的使用和運(yùn)維側(cè)的管控都提供了極簡(jiǎn)的使用體驗(yàn);
統(tǒng)?:異構(gòu)數(shù)據(jù)與分析場(chǎng)景的統(tǒng)一,半結(jié)構(gòu)化和結(jié)構(gòu)化數(shù)據(jù)可以統(tǒng)?存儲(chǔ),多分析場(chǎng)景可以統(tǒng)一技術(shù)棧;
實(shí)時(shí):端到端的高時(shí)效性保證,發(fā)揮實(shí)時(shí)數(shù)據(jù)的價(jià)值。
# ?如何構(gòu)建極速易用的實(shí)時(shí)數(shù)倉(cāng)架構(gòu)
基于以上的需求,我們采取 Apache Doris 和 Apache Flink 來(lái)構(gòu)建極速易用的實(shí)時(shí)數(shù)倉(cāng),具體架構(gòu)如下圖所示。多種數(shù)據(jù)源的數(shù)據(jù)經(jīng)過(guò) Flink CDC 集成或 Flink Job 加?處理后,?庫(kù)到 Doris 或者 Hive/Iceberg 等湖倉(cāng)中,最終基于 Doris 提供統(tǒng)?的查詢服務(wù)。

在數(shù)據(jù)同步上,通過(guò) Flink CDC 將 RDS 的數(shù)據(jù)實(shí)時(shí)同步到 Doris;通過(guò) Routine Load 將 Kafka 等消息系統(tǒng)中的數(shù)據(jù)實(shí)時(shí)同步到 Doris 。在數(shù)倉(cāng)分層上,ODS 層通常選擇使用明細(xì)模型構(gòu)建,DWD 層可以通過(guò) SQL 調(diào)度任務(wù)對(duì) ODS 數(shù)據(jù)抽取并獲取,DWS 和 ADS 層則可以通過(guò) Rollup 和物化視圖進(jìn)行構(gòu)建。在數(shù)據(jù)湖上, Doris ?持為 Hive、Iceberg 、Hudi 以及Delta Lake(todo)提供聯(lián)邦分析和湖倉(cāng)加速的能?。在數(shù)據(jù)應(yīng)用上,Apache Doris 既可以承載批量數(shù)據(jù)加工處理的需求,也可以承載高吞吐的 Adhoc 和高并發(fā)點(diǎn)查詢等多種應(yīng)?場(chǎng)景。
# ?解決方案
如何實(shí)現(xiàn)數(shù)據(jù)的增量與全量同步
1. 增量及全量數(shù)據(jù)同步
在全量數(shù)據(jù)和增量的同步上,我們采取了 Flink CDC 來(lái)實(shí)現(xiàn)。其原理非常簡(jiǎn)單,F(xiàn)link CDC 實(shí)現(xiàn)了基于 Snapshot 的全量數(shù)據(jù)同步、基于 BinLog 的實(shí)時(shí)增量數(shù)據(jù)同步,全量數(shù)據(jù)同步和增量數(shù)據(jù)同步可以?動(dòng)切換,因此我們?cè)跀?shù)據(jù)遷移的過(guò)程中,只需要配置好同步的表即可。當(dāng) Flink 任務(wù)啟動(dòng)時(shí),優(yōu)先進(jìn)?歷史表的數(shù)據(jù)同步,同步完后?動(dòng)切換成實(shí)時(shí)同步。

2. 數(shù)據(jù)一致性保證
如何保證數(shù)據(jù)一致性是大家重點(diǎn)關(guān)注的問(wèn)題之一,那么在新架構(gòu)是如何實(shí)現(xiàn)的呢?
數(shù)據(jù)?致性?般分為“最多?次” 、“?少?次”和“精確?次”三種模型。
最多?次(At-Most-Once):發(fā)送?僅發(fā)送消息,不期待任何回復(fù)。在這種模型中,數(shù)據(jù)的?產(chǎn)和消費(fèi)過(guò)程中可能出現(xiàn)數(shù)據(jù)丟失的問(wèn)題。
?少?次(At-Least-Once):發(fā)送?不斷重試,直到對(duì)?收到為?。在這個(gè)模型中,?產(chǎn)和消費(fèi)過(guò)程都可能出現(xiàn)數(shù)據(jù)重復(fù)。
精確?次(Exactly-Once):能夠保證消息只被嚴(yán)格發(fā)送?次,并且只被嚴(yán)格處理?次。這種數(shù)據(jù)模型能夠嚴(yán)格保證數(shù)據(jù)?產(chǎn)和消費(fèi)過(guò)程中的準(zhǔn)確?致性。
Flink CDC 通過(guò) Flink Checkpoint 機(jī)制結(jié)合 Doris 兩階段提交可以實(shí)現(xiàn)端到端的 Exactly Once 語(yǔ)義。具體過(guò)程分為四步:
事務(wù)開(kāi)啟(Flink Job 啟動(dòng)及 Doris 事務(wù)開(kāi)啟):當(dāng) Flink 任務(wù)啟動(dòng)后, Doris 的 Sink 會(huì)發(fā)起 Precommit 請(qǐng)求,隨后開(kāi)啟寫(xiě)?事務(wù)。
數(shù)據(jù)傳輸(Flink Job 的運(yùn)?和數(shù)據(jù)傳輸):在 Flink Job 運(yùn)?過(guò)程中, Doris Sink 不斷從上游算?獲取數(shù)據(jù),并通過(guò) HTTP Chunked 的?式持續(xù)將數(shù)據(jù)傳輸?shù)?Doris。
事務(wù)預(yù)提交:當(dāng) Flink 開(kāi)始進(jìn)? Checkpoint 時(shí),F(xiàn)link 會(huì)發(fā)起 Checkpoint 請(qǐng)求,此時(shí) Flink 各個(gè)算?會(huì)進(jìn)? Barrier 對(duì)?和快照保存,Doris Sink 發(fā)出停? Stream Load 寫(xiě)?的請(qǐng)求,并發(fā)起?個(gè)事務(wù)提交請(qǐng)求到 Doris。這步完成后,這批數(shù)據(jù)已經(jīng)完全寫(xiě)? Doris BE 中,但在 BE 沒(méi)有進(jìn)?數(shù)據(jù)發(fā)布前對(duì)?戶是不可?的。
事務(wù)提交:當(dāng) Flink 的 Checkpoint 完成之后,將通知各個(gè)算?,Doris 發(fā)起?次事務(wù)提交到 Doris BE ,BE 對(duì)此次寫(xiě)?的數(shù)據(jù)進(jìn)?發(fā)布,最終完成數(shù)據(jù)流的寫(xiě)?。

綜上可知,我們利用 Flink CDC 結(jié)合 Doris 兩階段事務(wù)提交保證了數(shù)據(jù)寫(xiě)入一致性。需要注意的是,在該過(guò)程中可能遇到一個(gè)問(wèn)題:如果事務(wù)預(yù)提交成功、但 Flink Checkpoint 失敗了該怎么辦?針對(duì)該問(wèn)題,Doris 內(nèi)部支持對(duì)寫(xiě)?數(shù)據(jù)進(jìn)?回滾(Rollback),從?保證數(shù)據(jù)最終的?致性。
3. DDL 和 DML 同步
隨著業(yè)務(wù)的發(fā)展,部分?戶可能存在 RDS Schema 的變更需求。當(dāng) RDS 表結(jié)構(gòu)變更時(shí),?戶期望 Flink CDC 不但能夠?qū)?shù)據(jù)變化同步到 Doris,也希望將 RDS 表結(jié)構(gòu)的變更同步到 Doris,?戶則無(wú)需擔(dān)? RDS 表結(jié)構(gòu)和 Doris 表結(jié)構(gòu)不?致的問(wèn)題。
Light Schema Change
目前,Apache Doris 1.2.0 已經(jīng)實(shí)現(xiàn)了 ?Light Schema Change 功能,可滿? DDL 同步需求,快速?持 Schema 的變更。

Light Schema Change 的實(shí)現(xiàn)原理也比較簡(jiǎn)單,對(duì)數(shù)據(jù)表的加減列操作,不再需要同步更改數(shù)據(jù)文件,僅需在 FE 中更新元數(shù)據(jù)即可,從而實(shí)現(xiàn)毫秒級(jí)的 Schema Change 操作,且存在導(dǎo)入任務(wù)時(shí)效率的提升更為顯著。在這個(gè)過(guò)程中,由于 Light Schema Change 只修改了 FE 的元數(shù)據(jù),并沒(méi)有同步給 BE。因此會(huì)產(chǎn)? BE 和 FE Schema 不?致的問(wèn)題。為了解決這種問(wèn)題,我們對(duì) BE 的寫(xiě)出流程進(jìn)?了修改,具體包含三個(gè)??。
數(shù)據(jù)寫(xiě)?:FE 會(huì)將 Schema 持久化到元數(shù)據(jù)中,當(dāng) FE 發(fā)起導(dǎo)?任務(wù)時(shí),會(huì)把最新的 Schema 一起發(fā)給 Doris BE,BE 根據(jù)最新的 Schema 對(duì)數(shù)據(jù)進(jìn)?寫(xiě)?,并與 RowSet 進(jìn)?綁定。將該 Schema 持久化到 RowSet 的元數(shù)據(jù)中,實(shí)現(xiàn)了數(shù)據(jù)的各?解析,解決了寫(xiě)?過(guò)程中 Schema 不?致的問(wèn)題。
數(shù)據(jù)讀?。篎E ?成查詢計(jì)劃時(shí),會(huì)把最新的 Schema 附在其中?起發(fā)送給 BE,BE 拿到最新的 Schema 后對(duì)數(shù)據(jù)進(jìn)?讀取,解決讀取過(guò)程中 Schema 發(fā)?不?致的問(wèn)題。
數(shù)據(jù) Compaction:當(dāng)數(shù)據(jù)進(jìn)? Compaction 時(shí),我們選取需要進(jìn)? Compaction 的 RowSet 中最新的 Schema 作為之后 RowSet 對(duì)應(yīng)的 Schema,以此解決不同 Schema 上 RowSet 的合并問(wèn)題。
經(jīng)過(guò)對(duì) Light Schema Change 寫(xiě)出流程的優(yōu)化后, 單個(gè) Schema Chang 從?310 毫秒降低到了 7 毫秒,整體性能有近百倍的提升,徹底的解決了海量數(shù)據(jù)的 Schema Change 變化難的問(wèn)題。
Flink CDC DML 和 DDL 同步
有了 Light Schema Change 的保證, ?Flink CDC 能夠同時(shí)?持 DML 和 DDL 的數(shù)據(jù)同步。那么是如何實(shí)現(xiàn)的呢?

開(kāi)啟 DDL 變更配置:在 Flink CDC 的 MySQL Source 側(cè)開(kāi)啟同步 MySQL DDL 的變更配置,在 Doris 側(cè)識(shí)別 DDL 的數(shù)據(jù)變更,并對(duì)其進(jìn)?解析。
識(shí)別及校驗(yàn):當(dāng) Doris Sink 發(fā)現(xiàn) DDL 語(yǔ)句后,Doris Sink 會(huì)對(duì)表結(jié)構(gòu)進(jìn)?驗(yàn)證,驗(yàn)證其是否?持 Light Schema Change。
發(fā)起 Schema Change :當(dāng)表結(jié)構(gòu)驗(yàn)證通過(guò)后,Doris Sink 發(fā)起 Schema Change 請(qǐng)求到 Doris,從?完成此次 Schema Change 的變化。
解決了數(shù)據(jù)同步過(guò)程中源數(shù)據(jù)?致性的保證、全量數(shù)據(jù)和增量數(shù)據(jù)的同步以及 DDL 數(shù)據(jù)的變更后,一個(gè)完整的數(shù)據(jù)同步?案就基本形成了。
如何基于 Flink 實(shí)現(xiàn)多種數(shù)據(jù)集成

除了上文中所提及的基于 Flink CDC 進(jìn)行數(shù)據(jù)增量/全量同步外,我們還可以基于 Flink Job 和 Doris 來(lái)構(gòu)建多種不同的數(shù)據(jù)集成方式:
將 MySQL 中兩個(gè)表的數(shù)據(jù)同步到 Flink 后,在 Flink 內(nèi)部進(jìn)?多流 Join 完成數(shù)據(jù)打?qū)?,后?寬表同步到 Doris 中。
對(duì)上游的 Kafka 數(shù)據(jù)進(jìn)?清洗,在 Flink Job 完成清洗后通過(guò) Doris-Sink 寫(xiě)? Doris 中。
將 MySQL 數(shù)據(jù)和 Kafka 數(shù)據(jù)在 Flink 內(nèi)部進(jìn)?多流 Join,將 Join 后的寬表結(jié)果寫(xiě)? Doris中。
在 Doris 側(cè)預(yù)先創(chuàng)建寬表,將上游 RDS 中的數(shù)據(jù)根據(jù) Key 寫(xiě)入, 使? Doris 的部分列更新將多列數(shù)據(jù)分別寫(xiě)?到 Doris 的?寬表中。
如何選擇數(shù)據(jù)模型
Apache Doris 針對(duì)不同場(chǎng)景,提供了不同的數(shù)據(jù)模型,分別為聚合模型、主鍵模型、明細(xì)模型。

AGGREGATE 聚合模型
在企業(yè)實(shí)際業(yè)務(wù)中有很多需要對(duì)數(shù)據(jù)進(jìn)行統(tǒng)計(jì)和匯總操作的場(chǎng)景,如需要分析網(wǎng)站和 APP 訪問(wèn)流量、統(tǒng)計(jì)用戶的訪問(wèn)總時(shí)長(zhǎng)、訪問(wèn)總次數(shù),或者像廠商需要為廣告主提供廣告點(diǎn)擊的總流量、展示總量、消費(fèi)統(tǒng)計(jì)等指標(biāo)。在這些不需要召回明細(xì)數(shù)據(jù)的場(chǎng)景,通??梢允褂镁酆夏P?,比如上圖中需要根據(jù)門店 ID 和時(shí)間對(duì)每個(gè)門店的銷售額實(shí)時(shí)進(jìn)行統(tǒng)計(jì)。
UNIQUE KEY 主鍵模型
在某些場(chǎng)景下用戶對(duì)數(shù)據(jù)更新和數(shù)據(jù)全局唯一性有去重的需求,通常使用 UNIQUE KEY 模型。在 UNIQUE 模型中,會(huì)根據(jù)表中的主鍵進(jìn)? Upsert 操作:對(duì)于已有的主鍵做 Update 操作,更新 value 列,沒(méi)有的主鍵做 Insert 操作,比如圖中我們以訂單id為唯一主鍵,對(duì)訂單上的其他數(shù)據(jù)(時(shí)間和狀態(tài))進(jìn)行更新。
DUPLICATE 明細(xì)模型
在某些多維分析場(chǎng)景下,數(shù)據(jù)既沒(méi)有主鍵,也沒(méi)有聚合需求,Duplicate 數(shù)據(jù)模型可以滿足這類需求。明細(xì)模型主要用于需要保留原始數(shù)據(jù)的場(chǎng)景,如日志分析,用戶行為分析等場(chǎng)景。明細(xì)模型適合任意維度的 Ad-hoc 查詢。雖然同樣無(wú)法利用預(yù)聚合的特性,但是不受聚合模型的約束,可以發(fā)揮列存模型的優(yōu)勢(shì)(只讀取相關(guān)列,而不需要讀取所有 Key 列)。
如何構(gòu)建數(shù)倉(cāng)分層
由于數(shù)據(jù)量級(jí)普遍較大,如果直接查詢數(shù)倉(cāng)中的原始數(shù)據(jù),需要訪問(wèn)的表數(shù)量和底層文件的數(shù)量都較多,體現(xiàn)在日常工作中就是 SQL 異常復(fù)雜、計(jì)算耗時(shí)增高。而分層要做的就是對(duì)原始數(shù)據(jù)重新做歸納整理,在不同層級(jí)對(duì)數(shù)據(jù)或者指標(biāo)做不同粒度的抽象,通過(guò)復(fù)用數(shù)據(jù)模型來(lái)簡(jiǎn)化數(shù)據(jù)管理壓力,利用血緣關(guān)系來(lái)定位數(shù)據(jù)鏈路的異常,同時(shí)進(jìn)一步提升數(shù)據(jù)分析的效率。在 Apache Doris 可以通過(guò)以下多種思路來(lái)構(gòu)建數(shù)據(jù)倉(cāng)庫(kù)分層:
微批調(diào)度
通過(guò) INSERT INTO SELECT 可以將原始表的數(shù)據(jù)進(jìn)行處理和過(guò)濾并寫(xiě)入到目標(biāo)表中,這種 SQL 抽取數(shù)據(jù)的行為一般是以微批形式進(jìn)行(例如 15 分鐘一次的 ETL 計(jì)算任務(wù)),通常發(fā)生在從 ODS 到 DWD 層數(shù)據(jù)的抽取過(guò)程中,因此需要借助外部的調(diào)度工具例如 DolphinScheduler 或 Airflow 等來(lái)對(duì) ETL SQL 進(jìn)行調(diào)度。
Rollup 與物化視圖
物化視圖本質(zhì)是一個(gè)預(yù)先計(jì)算的過(guò)程。我們可以在 Base 表上,創(chuàng)建不同的 Rollup 或者物化視圖來(lái)對(duì) Base 表進(jìn)行聚合計(jì)算。通常在明細(xì)層到匯總層(例如 DWD 層到 DWS 層或從 DWS 層到 ADS 層)的匯聚過(guò)程中可以使用物化視圖,以此實(shí)現(xiàn)指標(biāo)的高度聚合。同時(shí)物化視圖的計(jì)算是實(shí)時(shí)進(jìn)行的,因此站在計(jì)算的角度也可以將物化視圖理解為一個(gè)單表上的實(shí)時(shí)計(jì)算過(guò)程。
多表物化視圖
Apache Doris 2.0 將實(shí)現(xiàn)多表物化視圖這一功能,可以將帶有 Join 的查詢結(jié)果固化以供用戶直接查詢,支持定時(shí)自動(dòng)或手動(dòng)觸發(fā)的方式進(jìn)行全量更新查詢結(jié)果,未來(lái)還將進(jìn)一步支持更加完善的自動(dòng)增量刷新。基于多表物化視圖這一功能的實(shí)現(xiàn),我們可以做更復(fù)雜的數(shù)據(jù)流處理,比如數(shù)據(jù)源側(cè)有 TableA、TableB、TableC,在多表物化視圖的情況下,用戶就可以將 TableA 和 TableB 的數(shù)據(jù)進(jìn)行實(shí)時(shí)Join 計(jì)算后物化到 MV1 中。在這個(gè)角度上來(lái)看,多表物化視圖更像一個(gè)多流數(shù)據(jù)實(shí)時(shí) Join 的過(guò)程。

如何應(yīng)對(duì)數(shù)據(jù)更新
在實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)構(gòu)建的過(guò)程中,還需要面臨高并發(fā)寫(xiě)入和實(shí)時(shí)更新的挑戰(zhàn)。如何在億級(jí)數(shù)據(jù)中快速找到需要更新的數(shù)據(jù),并對(duì)其進(jìn)?更新,?直都是?數(shù)據(jù)領(lǐng)域不斷追尋的答案。
1. 高并發(fā)數(shù)據(jù)更新
在 Apache Doris 中通過(guò) Unique Key 模型來(lái)滿足數(shù)據(jù)更新的需求,同時(shí)通過(guò) MVCC 多版本并發(fā)機(jī)制來(lái)實(shí)現(xiàn)數(shù)據(jù)的讀寫(xiě)隔離。當(dāng)新數(shù)據(jù)寫(xiě)入時(shí),如果不存在相同 Key 的數(shù)據(jù)則會(huì)直接寫(xiě)?;如果有相同 Key 的數(shù)據(jù)則增加版本,此時(shí)數(shù)據(jù)將以多個(gè)版本的形式存在。后臺(tái)會(huì)啟動(dòng)異步的 Compaction 進(jìn)程對(duì)歷史版本數(shù)據(jù)進(jìn)?清理,當(dāng)?戶在查詢時(shí) Doris 會(huì)將最新版本對(duì)應(yīng)的數(shù)據(jù)返回給?戶,這種設(shè)計(jì)解決了海量數(shù)據(jù)的更新問(wèn)題。

在 Doris 中提供了 Merge-on-Read 和 Merge-on-Write 兩種數(shù)據(jù)更新模式。

在此我們以訂單數(shù)據(jù)的寫(xiě)入為例介紹 Merge-on-Read 的數(shù)據(jù)寫(xiě)入與查詢流程,三條訂單數(shù)據(jù)均以 Append 的形式寫(xiě)? Doris 表中:
數(shù)據(jù) Insert:首先我們寫(xiě)入 ID 為 1,2,3 的三條數(shù)據(jù);
數(shù)據(jù) Update:當(dāng)我們將訂單 1 的 Cost 更新為 30 時(shí),其實(shí)是寫(xiě)??條 ID 為 1,Cost 為 30 的新版本數(shù)據(jù),數(shù)據(jù)通過(guò) Append 的形式寫(xiě)? Doris;
數(shù)據(jù) Delete:當(dāng)我們對(duì)訂單 2 的數(shù)據(jù)進(jìn)?刪除時(shí),仍然通過(guò) Append ?式,將數(shù)據(jù)多版本寫(xiě)? Doris ,并將 _DORIS_DELETE_SIGN 字段變?yōu)?1 ,則表示這條數(shù)據(jù)被刪除了。當(dāng) Doris 讀取數(shù)據(jù)時(shí),發(fā)現(xiàn)最新版本的數(shù)據(jù)被標(biāo)記刪除,就會(huì)將該數(shù)據(jù)從查詢結(jié)果中進(jìn)?過(guò)濾。
Merge-on-Read 的特點(diǎn)是寫(xiě)?速度比較快,但是在數(shù)據(jù)讀取過(guò)程中由于需要進(jìn)?多路歸并排序,存在著大量非必要的 CPU 計(jì)算資源消耗和 IO 開(kāi)銷。
因此在 1.2.0 版本中,Apache Doris 在原有的 Unique Key 數(shù)據(jù)模型上增加了 Merge-on-Write 的數(shù)據(jù)更新模式。Merge-on-Write 兼顧了寫(xiě)入和查詢性能。在寫(xiě)?的過(guò)程中引?了 Delete Bitmap 數(shù)據(jù)結(jié)構(gòu),使? Delete Bitmap 標(biāo)記 RowSet 中某??是否被刪除,為了保持 Unique Key 原有的語(yǔ)義, Delete Bitmap 也?持多版本。另外使?了兼顧性能和存儲(chǔ)空間的 Row Bitmap,將 Bitmap 中的 MemTable ?起存儲(chǔ)在 BE 中,每個(gè) Segment 會(huì)對(duì)應(yīng)?個(gè) Bitmap。

寫(xiě)入流程:
DeltaWriter 先將數(shù)據(jù) Flush 到磁盤(pán)
批量檢查所有 Key,在點(diǎn)查過(guò)程中經(jīng)過(guò)區(qū)間樹(shù),查找到對(duì)應(yīng)的 RowSet。
在 RowSet 內(nèi)部通過(guò) BloomFilter 和 index 進(jìn)行?效查詢。
當(dāng)查詢到 Key 對(duì)應(yīng)的 RowSet 后,便會(huì)覆蓋 RowSet Key 對(duì)應(yīng)的 Bitmap,接著在 Publish 階段更新 Bitmap,從?保證批量點(diǎn)查 Key 和更新 Bitmap 期間不會(huì)有新的可? RowSet,以保證 Bitmap 在更新過(guò)程中數(shù)據(jù)的正確性。除此之外,如果某個(gè) Segment 沒(méi)有被修改,則不會(huì)有對(duì)應(yīng)版本的 Bitmap 記錄。
查詢流程:
當(dāng)我們查詢某?版本數(shù)據(jù)時(shí), Doris 會(huì)從 LRU Cache Delete Bitmap 中查找該版本對(duì)應(yīng)的緩存。
如果緩存不存在,再去 RowSet 中讀取對(duì)應(yīng)的 Bitmap。
使? Delete Bitmap 對(duì) RowSet 中的數(shù)據(jù)進(jìn)?過(guò)濾,將結(jié)果返回。
該模式不需要在讀取的時(shí)候通過(guò)歸并排序來(lái)對(duì)主鍵進(jìn)行去重,這對(duì)于高頻寫(xiě)入的場(chǎng)景來(lái)說(shuō),大大減少了查詢執(zhí)行時(shí)的額外消耗。此外還能夠支持謂詞下推,并能夠很好利用 Doris 豐富的索引,在數(shù)據(jù) IO 層面就能夠進(jìn)行充分的數(shù)據(jù)裁剪,大大減少數(shù)據(jù)的讀取量和計(jì)算量,因此在很多場(chǎng)景的查詢中都有非常明顯的性能提升。在真實(shí)場(chǎng)景的測(cè)試中,通過(guò) Merge-on-Write 可以在保證數(shù)萬(wàn) QPS 的高頻 Upset 操作的同時(shí)實(shí)現(xiàn)性能 3-10 倍的提升。
2. 部分列更新
部分列更新是一個(gè)比較普遍的需求,例如廣告業(yè)務(wù)中需要在不同的時(shí)間點(diǎn)對(duì)同一個(gè)廣告行為(展示、點(diǎn)擊、轉(zhuǎn)換等)數(shù)據(jù)的更新。這時(shí)可以通過(guò) Aggregate Key 模型的replace_if_not_null
實(shí)現(xiàn)。具體建表語(yǔ)句如下:
CREATE?TABLE?IF?NOT?EXISTS?request_log
(
????`session_id`?LARGEINT?NOT?NULL?COMMENT?"id",
????`imp_time`?DATE?REPLACE_IF_NOT_NULL?COMMENT?"展示",??#展示數(shù)據(jù)更新????`imp_data`?VARCHAR(20)??REPLACE_IF_NOT_NULL?COMMENT?"",
????`click_time`?DATE?REPLACE_IF_NOT_NULL?COMMENT?"點(diǎn)擊",#點(diǎn)擊數(shù)據(jù)更新????`click_data`?VARCHAR(20)??REPLACE_IF_NOT_NULL?COMMENT?"",
????`conv_time`?DATE?REPLACE_IF_NOT_NULL?COMMENT?"轉(zhuǎn)化",#轉(zhuǎn)換數(shù)據(jù)更新????`conv_data`?VARCHAR(20)??REPLACE_IF_NOT_NULL?COMMENT?"")AGGREGATE?KEY(`session_id`)DISTRIBUTED?BY?HASH(`session_id`)?BUCKETS?1PROPERTIES?("replication_allocation"?=?"tag.location.default:?1");
具體更新過(guò)程如下:
(1)更新展示數(shù)據(jù)
mysql>?insert?into?request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp');
Query?OK,?1?row?affected?(0.05?sec)
{'label':'insert_31a037849e2748f6_9b00b852d106eaaa',?'status':'VISIBLE',?'txnId':'385642'}
mysql>?select?*?from?request_log;
+------------+------------+----------+------------+------------+-----------+-----------+|?session_id?|?imp_time???|?imp_data?|?click_time?|?click_data?|?conv_time?|?conv_data?|+------------+------------+----------+------------+------------+-----------+-----------+|?1??????????|?2022-12-20?|?imp??????|?NULL???????|?NULL???????|?NULL??????|?NULL??????|+------------+------------+----------+------------+------------+-----------+-----------+1?row?in?set?(0.01?sec)
(2)更新點(diǎn)擊數(shù)據(jù)
mysql>?insert?into?request_log(session_id,imp_time,imp_data)VALUES(1,'2022-12-20','imp');
Query?OK,?1?row?affected?(0.05?sec)
{'label':'insert_31a037849e2748f6_9b00b852d106eaaa',?'status':'VISIBLE',?'txnId':'385642'}
mysql>?select?*?from?request_log;
+------------+------------+----------+------------+------------+-----------+-----------+|?session_id?|?imp_time???|?imp_data?|?click_time?|?click_data?|?conv_time?|?conv_data?|+------------+------------+----------+------------+------------+-----------+-----------+|?1??????????|?2022-12-20?|?imp??????|?NULL???????|?NULL???????|?NULL??????|?NULL??????|+------------+------------+----------+------------+------------+-----------+-----------+1?row?in?set?(0.01?sec)
(3)更新轉(zhuǎn)化數(shù)據(jù)
ysql>?insert?into?request_log(session_id,click_time,click_data)VALUES(1,'2022-12-21','click');
Query?OK,?1?row?affected?(0.03?sec)
{'label':'insert_2649087d8dc046bd_a39d367af1f93ab0',?'status':'VISIBLE',?'txnId':'385667'}
mysql>?select?*?from?request_log;
+------------+------------+----------+------------+------------+-----------+-----------+|?session_id?|?imp_time???|?imp_data?|?click_time?|?click_data?|?conv_time?|?conv_data?|+------------+------------+----------+------------+------------+-----------+-----------+|?1??????????|?2022-12-20?|?imp??????|?2022-12-21?|?click??????|?NULL??????|?NULL??????|+------------+------------+----------+------------+------------+-----------+-----------+1?row?in?set?(0.01?sec)
mysql>
同時(shí)部分列更新還可用于支持畫(huà)像場(chǎng)景的寬表列實(shí)時(shí)更新。

另外值得期待的是 Apache ?Doris 的 Unique Key 模型也即將實(shí)現(xiàn)部分列更新的功能,可以通過(guò) Apache Doris GitHub 代碼倉(cāng)庫(kù)及官網(wǎng),關(guān)注新版本或新功能的發(fā)布(相關(guān)地址可下滑至文章底部獲?。?。
如何進(jìn)一步提升查詢性能
1. 智能物化視圖
物化視圖除了可以作為高度聚合的匯總層外,其更廣泛的定位是加速相對(duì)固定的聚合分析場(chǎng)景。物化視圖是指根據(jù)預(yù)定義的 SQL 分析語(yǔ)句執(zhí)?預(yù)計(jì)算,并將計(jì)算結(jié)果持久化到另一張對(duì)用戶透明但有實(shí)際存儲(chǔ)的表中,在需要同時(shí)查詢聚合數(shù)據(jù)和明細(xì)數(shù)據(jù)以及匹配不同前綴索引的場(chǎng)景,命中物化視圖時(shí)可以獲得更快的查詢性能。在使用物化視圖時(shí)需要建? Base 表并基于此建?物化視圖,同?張 Base 表可以構(gòu)建多個(gè)不同的物化視圖,從不同的維度進(jìn)?統(tǒng)計(jì)。物化視圖在查詢過(guò)程中提供了智能路由選擇的能力,如果數(shù)據(jù)在物化視圖中存在會(huì)直接查詢物化視圖,如果在物化視圖中不存在才會(huì)查詢 Base 表。對(duì)于數(shù)據(jù)寫(xiě)入或更新時(shí),數(shù)據(jù)會(huì)在寫(xiě)入 Base 表的同時(shí)寫(xiě)入物化視圖,從?讓 Doris 保證物化視圖和 Base 表數(shù)據(jù)的完全?致性。

智能路由選擇遵循最?匹配原則,只有查詢的數(shù)據(jù)集?物化視圖集合?時(shí),才可能?物化視圖。如上圖所示智能選擇過(guò)程包括選擇最優(yōu)和查詢改寫(xiě)兩個(gè)部分:
選擇最優(yōu)
在過(guò)濾候選集過(guò)程中,被執(zhí)行的 SQL 語(yǔ)句通過(guò) Where 條件進(jìn)?判斷,Where 條件為advertiser=1。由此可?,物化視圖和 Base 表都有該字段,這時(shí)的選集是物化視圖和 Base 表。
Group By 計(jì)算,Group By 字段是 advertiser 和 channel,這兩個(gè)字段同時(shí)在物化視圖和 Base 表中。這時(shí)過(guò)濾的候選集仍然是物化視圖和 Base 表。
過(guò)濾計(jì)算函數(shù),?如執(zhí)? count(distinctuser_id),然后對(duì)數(shù)據(jù)進(jìn)?計(jì)算,由于 Count Distinct 的字段 user_id 在物化視圖和 Base 表中都存在,因此過(guò)濾結(jié)果仍是物化視圖和 Base 表。
選擇最優(yōu),通過(guò)?系列計(jì)算,我們發(fā)現(xiàn)查詢條件?論是 Where 、Group By 還是 Agg Function 關(guān)聯(lián)的字段,結(jié)果都有 Base 表和物化視圖,因此需要進(jìn)?最優(yōu)選擇。Doris 經(jīng)過(guò)計(jì)算發(fā)現(xiàn) Base 表的數(shù)據(jù)遠(yuǎn)?于物化視圖,即物化視圖的數(shù)據(jù)更?。
由此過(guò)程可?,如果通過(guò)物化視圖進(jìn)行查詢,查詢效率更?。當(dāng)我們找到最優(yōu)查詢計(jì)劃,就可以進(jìn)??查詢改寫(xiě),將 Count Distinct 改寫(xiě)成 Bitmap ,從?完成物化視圖的智能路由。完成智能路由之后,我們會(huì)將 Doris ?成的查詢 SQL 發(fā)送到 BE 進(jìn)?分布式查詢計(jì)算。
2. 分區(qū)分桶裁剪
Doris 數(shù)據(jù)分為兩級(jí)分區(qū)存儲(chǔ), 第一層為分區(qū)(Partition),目前支持 RANGE 分區(qū)和 LIST 分區(qū)兩種類型, 第二層為 HASH 分桶(Bucket)。我們可以按照時(shí)間對(duì)數(shù)據(jù)進(jìn)?分區(qū),再按照分桶列將?個(gè)分區(qū)的數(shù)據(jù)進(jìn)行 Hash 分到不同的桶?。在查詢時(shí)則可以通過(guò)分區(qū)分桶裁剪來(lái)快速定位數(shù)據(jù),加速查詢性能的同時(shí)實(shí)現(xiàn)高并發(fā)。
3. 索引查詢加速
除了分區(qū)分桶裁剪, 還可以通過(guò)存儲(chǔ)層索引來(lái)裁剪需要讀取的數(shù)據(jù)量,僅以加速查詢:
前綴索引:在排序的基礎(chǔ)上快速定位數(shù)據(jù)
Zone Map 索引:維護(hù)列中 min/max/null 信息
Bitmap 索引:通過(guò) Bitmap 加速去重、交并查詢
Bloom Filter 索引:快速判斷元素是否屬于集合;
Invert 倒排索引:支持字符串類型的全文檢索;
4. 執(zhí)行層查詢加速
同時(shí) Apache Doris 的 MPP 查詢框架、向量化執(zhí)行引擎以及查詢優(yōu)化器也提供了許多性能優(yōu)化方式,在此僅列出部分、不做詳細(xì)展開(kāi):
算子下推:Limit、謂詞過(guò)濾等算子下推到存儲(chǔ)層;
向量化引擎:基于 SIMD 指令集優(yōu)化,充分釋放 CPU 計(jì)算能力;
Join 優(yōu)化:Bucket Shuffle Join、Colocate Join 以及 Runtime Filter 等;
行業(yè)最佳實(shí)踐
截止目前,Apache Doris 在全球范圍內(nèi)企業(yè)用戶規(guī)模已超過(guò)?1500 家,廣泛應(yīng)用于數(shù)十個(gè)行業(yè)中。在用戶行為分析、AB 實(shí)驗(yàn)平臺(tái)、日志檢索分析、用戶畫(huà)像分析、訂單分析等方向均有著豐富的應(yīng)用。在此我們列出了幾個(gè)基于 Doris 構(gòu)建實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的真實(shí)案例作為參考:

第 1 個(gè)案例是較為典型的基于 Doris 構(gòu)建實(shí)時(shí)數(shù)倉(cāng),下層數(shù)據(jù)源來(lái)自 RDS 業(yè)務(wù)庫(kù)、?件系統(tǒng)數(shù)據(jù)以及埋點(diǎn)日志數(shù)據(jù)。在數(shù)據(jù)接?過(guò)程中通過(guò) DataX 進(jìn)?離線數(shù)據(jù)同步以及通過(guò) Flink CDC 進(jìn)?實(shí)時(shí)數(shù)據(jù)同步,在 Doris 內(nèi)部構(gòu)建不同的數(shù)據(jù)分層;最后在上層構(gòu)建不同的數(shù)據(jù)應(yīng)?,?如?助報(bào)表、?助數(shù)據(jù)抽取、數(shù)據(jù)?屏。除此之外,它還結(jié)合了??的應(yīng)?平臺(tái)構(gòu)建了數(shù)據(jù)開(kāi)發(fā)與治理平臺(tái),完成了源數(shù)據(jù)管理、數(shù)據(jù)分析等操作。
使用收益:
業(yè)務(wù)計(jì)算耗時(shí)從之前的兩?時(shí)降低到三分鐘。
全鏈路的更新報(bào)表的時(shí)間從周級(jí)別更新到?分鐘級(jí)別。
Doris ?度兼容 MySQL,報(bào)表遷移無(wú)壓力,開(kāi)發(fā)周期從周級(jí)別降至?天級(jí)別。

第 2 個(gè)案例是在某運(yùn)營(yíng)服務(wù)商的應(yīng)用,其架構(gòu)是通過(guò) Flink CDC 將 RDS 的數(shù)據(jù)同步到 Doris 中,同時(shí)通過(guò) Routine Load 直接訂閱 Kafka 中接入的日志數(shù)據(jù),然后在 Doris 內(nèi)部構(gòu)建實(shí)時(shí)數(shù)倉(cāng)。在數(shù)據(jù)調(diào)度時(shí), 通過(guò)開(kāi)源 DolphinScheduler 完成數(shù)據(jù)調(diào)度;使? Prometheus+Grafana 進(jìn)?數(shù)據(jù)監(jiān)控。
使用收益:?采? Flink+Doris 架構(gòu)體系后,架構(gòu)簡(jiǎn)潔、組件減少,解決了多架構(gòu)下的數(shù)據(jù)的冗余存儲(chǔ),服務(wù)器資源節(jié)省了 30%,數(shù)據(jù)存儲(chǔ)磁盤(pán)占?節(jié)省了 60%,運(yùn)營(yíng)成本?幅降低。該案例每天在?戶的業(yè)務(wù)場(chǎng)景上,?持?jǐn)?shù)萬(wàn)次的?戶的在線查詢和分析。

第 3 個(gè)應(yīng)用是在供應(yīng)鏈企業(yè),在過(guò)去該企業(yè)采取了 Hadoop 體系,使用組件?較繁多,有 RDS、HBase、Hive、HDFS、Yarn、Kafka 等多個(gè)技術(shù)棧,在該架構(gòu)下,查詢性能無(wú)法得到有效快速的提升,維護(hù)和開(kāi)發(fā)成本一直居高不下。
使用收益:?引入 Doris 之后,將 RDS 的數(shù)據(jù)通過(guò) Flink CDC 實(shí)時(shí)同步到 Doris ?,服務(wù)器資源成本得到了很?的降低。數(shù)據(jù)的查詢時(shí)間從 Spark 的 2~5 ?時(shí),縮短到?分鐘,查詢效率也??提升。在數(shù)據(jù)的同步過(guò)程中,使?了 Flink CDC+MySQL 全量加增量的數(shù)據(jù)同步?式,同時(shí)還利? Doris 的 Light Schema Change 特性實(shí)時(shí)同步 Binlog ?的 DDL 表結(jié)構(gòu)變更,實(shí)現(xiàn)數(shù)據(jù)接?數(shù)倉(cāng)零開(kāi)發(fā)成本。
# ?總結(jié)
憑借 Apache Doris 豐富的分析功能和 Apache Flink 強(qiáng)大的實(shí)時(shí)計(jì)算能力,已經(jīng)有越來(lái)越多的企業(yè)選擇基于 Apache Doris 和 Flink 構(gòu)建極速易用的實(shí)時(shí)數(shù)倉(cāng)架構(gòu),更多案例歡迎關(guān)注 SelectDB 公眾號(hào)以及相關(guān)技術(shù)博客。后續(xù)我們?nèi)詴?huì)持續(xù)提升 Apache Doris 在實(shí)時(shí)數(shù)據(jù)處理場(chǎng)景的能力和性能,包括 Unique 模型上的部分列更新、單表物化視圖上的計(jì)算增強(qiáng)、自動(dòng)增量刷新的多表物化視圖等,后續(xù)研發(fā)進(jìn)展也將在社區(qū)及時(shí)同步。在構(gòu)建實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)中遇到任何問(wèn)題,歡迎聯(lián)系社區(qū)進(jìn)行支持。同時(shí)也歡迎加入 Apache Doris 社區(qū),一起將 Apache Doris 建設(shè)地更加強(qiáng)大!
更多學(xué)習(xí)資料盡在有軟官網(wǎng)
https://www.jiaruvip.com
https://www.jiaruvip.com/software
https://www.jiaruvip.com/hacker
https://www.jiaruvip.com/phone
https://www.jiaruvip.com/game
https://www.jiaruvip.com/technical
https://www.jiaruvip.com/news
https://www.jiaruvip.com/author