使用輕量級(jí) CDC debezium-server-databend 構(gòu)建實(shí)時(shí)數(shù)據(jù)同步
作者:韓山杰Databend Cloud 研發(fā)工程師https://github.com/hantmac

Debezium Server Databend[1]?是一個(gè)基于 Debezium Engine 自研的輕量級(jí) CDC 項(xiàng)目,用于實(shí)時(shí)捕獲數(shù)據(jù)庫(kù)更改并將其作為事件流傳遞最終將數(shù)據(jù)寫(xiě)入目標(biāo)數(shù)據(jù)庫(kù) Databend。它提供了一種簡(jiǎn)單的方式來(lái)監(jiān)視和捕獲關(guān)系型數(shù)據(jù)庫(kù)的變化,并支持將這些變化轉(zhuǎn)換為可消費(fèi)事件。
使用 Debezium server databend 實(shí)現(xiàn) CDC 無(wú)須依賴(lài)大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一個(gè)啟動(dòng)腳本即可開(kāi)啟實(shí)時(shí)數(shù)據(jù)同步。
這篇教程將展示如何基于 Debezium server databend 快速構(gòu)建 MySQL 到 Databend 的實(shí)時(shí)數(shù)據(jù)同步。
假設(shè)我們有電子商務(wù)業(yè)務(wù),商品的數(shù)據(jù)存儲(chǔ)在 MySQL ,我們需要實(shí)時(shí)把它同步到 Databend 中。
接下來(lái)的內(nèi)容將介紹如何使用 Debezium server databend CDC 來(lái)實(shí)現(xiàn)這個(gè)需求,系統(tǒng)的整體架構(gòu)如下圖所示:

準(zhǔn)備階段
準(zhǔn)備一臺(tái)已經(jīng)安裝了 Docker ,docker-compose 以及 Java 11 環(huán)境 的 Linux 或者 MacOS 。
???準(zhǔn)備教程所需要的組件
接下來(lái)的教程將以?docker-compose
?的方式準(zhǔn)備所需要的組件。
???debezium-MySQL
docker-compose.yaml
???Debezium Server Databend
Clone 項(xiàng)目:?
git clone ``https://github.com/databendcloud/debezium-server-databend.git
從項(xiàng)目根目錄開(kāi)始:
構(gòu)建和打包 debezium server:?
mvn -Passembly -Dmaven.test.skip package
構(gòu)建完成后,解壓服務(wù)器分發(fā)包:?
unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
進(jìn)入解壓后的文件夾:?
cd databendDist
創(chuàng)建?
application.properties
?文件并修改:?nano conf/application.properties
,將下面的 application.properties 拷貝進(jìn)去,根據(jù)用戶實(shí)際情況修改相應(yīng)的配置。使用提供的腳本運(yùn)行服務(wù):?
bash run.sh
Debezium Server with Databend 將會(huì)啟動(dòng)
同時(shí)我們也提供了相應(yīng)的 Docker image,可以在容器中一鍵啟動(dòng):
NOTE: 在容器中啟動(dòng)注意所連接數(shù)據(jù)庫(kù)的網(wǎng)絡(luò)。
???Debezium Server Databend Application Properties
本文章使用下面提供的配置,更多的參數(shù)說(shuō)明以及配置可以參考文檔[2]。
???準(zhǔn)備數(shù)據(jù)
1?? ?在?MySQL?數(shù)據(jù)庫(kù)中準(zhǔn)備數(shù)據(jù)
進(jìn)入 MySQL 容器
創(chuàng)建數(shù)據(jù)庫(kù) mydb 和表?products
,并插入數(shù)據(jù):
2?? 在 Databend 中創(chuàng)建 Database

NOTE: 用戶可以不必先在 Databend 中創(chuàng)建表,系統(tǒng)檢測(cè)到后會(huì)自動(dòng)為用戶建表。
???啟動(dòng) Debezium Server Databend

首次啟動(dòng)會(huì)進(jìn)入 init snapshot 模式,通過(guò)配置的 Batch Size 全量將 MySQL 中的數(shù)據(jù)同步到 Databend,所以在 Databend 中可以看到 MySQL 中的數(shù)據(jù)已經(jīng)同步過(guò)來(lái)了:

???同步 Insert 數(shù)據(jù)
我們繼續(xù)往 MySQL 中插入 5 條數(shù)據(jù):
Debezium server databend 日志:

同時(shí)在 Databend 中可以查到 5 條數(shù)據(jù)已經(jīng)同步過(guò)來(lái)了:

???同步?Update?數(shù)據(jù)
配置文件中?debezium.sink.databend.upsert=true
?,所以我們也可以處理 Update/Delete 的事件。
在 MySQL 中更新 id=10 的數(shù)據(jù):
在 Databend 中可以查到 id 為 10 的數(shù)據(jù)已經(jīng)被更新:

???同步?Delete?數(shù)據(jù)
在配置文件中,有以下的配置,既可開(kāi)啟處理 Delete 事件的能力:
Debezim Server 對(duì) Delete 的處理比較復(fù)雜,在 DELETE 操作下會(huì)生成兩條事件記錄:
一個(gè)包含 "op": "d",其他的行數(shù)據(jù)以及字段;
一個(gè)tombstones記錄,它具有與被刪除行相同的鍵,但值為null。
這兩條事件會(huì)同時(shí)發(fā)出,在 Debezium Server Databend 中我們選擇對(duì) Delete 數(shù)據(jù)實(shí)行軟刪除,這就要求我們?cè)?target table 中擁有?__deleted
?字段,當(dāng) Delete 事件過(guò)來(lái)的時(shí)候我們將該字段置為 TRUE 后插入到目標(biāo)表。
這樣設(shè)計(jì)的好處是,有些用戶想要保留這些數(shù)據(jù),但可能未來(lái)會(huì)想到將其刪除,這樣就為用戶提供了可選的方案,未來(lái)想要?jiǎng)h除這些數(shù)據(jù)的時(shí)候,只需要?delete from table where __deleted=true
?即可。
關(guān)于 Debezium 對(duì)刪除事件的說(shuō)明以及處理方式,詳情可參考文檔[3]?。
在 MySQL 中刪除 id=12 的數(shù)據(jù):
delete?from?products?where?id=12;
在 Databend 中可以觀察到 id=12 的值的?__deleted
?字段已經(jīng)被置為?true
。
?
環(huán)境清理
操作結(jié)束后,在?docker-compose.yml
?文件所在的目錄下執(zhí)行如下命令停止所有容器:
?結(jié)論
以上就是基于輕量級(jí) CDC debezium server databend 構(gòu)建 MySQL 到 Databend 的 實(shí)時(shí)數(shù)據(jù)同步的全部過(guò)程,這種方式不需要依賴(lài) Flink, Kafka 等大型組件,啟動(dòng)和管理非常方便。
引用鏈接
[1]
?Debezium Server Databend:?https://github.com/databendcloud/debezium-server-databend[2
]
?文檔:?https://github.com/databendcloud/debezium-server-databend/blob/main/docs/docs.md[3]
?文檔:?https://debezium.io/documentation/reference/stable/transformations/event-flattening.html