基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的實時數(shù)據(jù)同步
作者:韓山杰Databend Cloud 研發(fā)工程師https://github.com/hantmac
轉(zhuǎn)自微信?基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的實時數(shù)據(jù)同步 (qq.com)?

這篇教程將展示如何基于 Flink CDC 快速構(gòu)建 MySQL 到 Databend 的實時數(shù)據(jù)同步。本教程的演示都將在 Flink SQL CLI 中進行,只涉及 SQL,無需一行 Java/Scala 代碼,也無需安裝 IDE。
假設(shè)我們有電子商務(wù)業(yè)務(wù),商品的數(shù)據(jù)存儲在 MySQL ,我們需要實時把它同步到 Databend 中。接下來的內(nèi)容將介紹如何使用 Flink Mysql/Databend CDC 來實現(xiàn)這個需求,系統(tǒng)的整體架構(gòu)如下圖所示:

準備階段
準備一臺已經(jīng)安裝了 Docker 和 docker-compose 的 Linux 或者 MacOS 。
???準備教程所需要的組件
接下來的教程將以?docker-compose
?的方式準備所需要的組件。
???debezium-MySQL
docker-compose.yaml
???Databend
docker-compose.yaml
在?docker-compose.yml
?所在目錄下執(zhí)行下面的命令來啟動本教程需要的組件:
該命令將以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。你可以通過 docker ps 來觀察上述的容器是否正常啟動。
???下載?Flink?和所需要的依賴包
下載?Flink 1.16.0[1]?并將其解壓至目錄?
flink-1.16.0
下載下面列出的依賴包,并將它們放到目錄?
flink-1.16.0/lib/
?下:下載鏈接只對已發(fā)布的版本有效, SNAPSHOT 版本需要本地編譯
flink-sql-connector-mysql-cdc-2.3.0.jar[2]
編譯 flink-connector-databend
將 target/flink-connector-databend-1.16.0-SNAPSHOT.jar 拷貝到目錄?flink-1.16.0/lib/
?下。
???準備數(shù)據(jù)
在?MySQL?數(shù)據(jù)庫中準備數(shù)據(jù)
進入 MySQL 容器
創(chuàng)建數(shù)據(jù)庫 mydb 和表?products
,并插入數(shù)據(jù):
???Databend 中建表
使用下面的命令啟動 Flink 集群
./bin/start-cluster.sh
啟動成功的話,可以在?http://localhost:8081/?訪問到 Flink Web UI,如下所示:

使用下面的命令啟動 Flink SQL CLI
./bin/sql-client.sh

在?Flink?SQL?CLI?中使用 Flink?DDL?創(chuàng)建表
首先,開啟 checkpoint,每隔3秒做一次 checkpoint
--?Flink?SQL??????????????
Flink?SQL>?SET?execution.checkpointing.interval?=?3s;
然后, 對于數(shù)據(jù)庫中的表?products
?使用 Flink SQL CLI 創(chuàng)建對應(yīng)的表,用于同步底層數(shù)據(jù)庫表的數(shù)據(jù)
--?Flink?SQL
Flink?SQL>?CREATE?TABLE?products?(id?INT,name?STRING,description?STRING,PRIMARY?KEY?(id)?NOT?ENFORCED)?
WITH?('connector'?=?'mysql-cdc',
'hostname'?=?'localhost',
'port'?=?'3306',
'username'?=?'root',
'password'?=?'123456',
'database-name'?=?'mydb',
'table-name'?=?'products',
'server-time-zone'?=?'UTC'
);
最后,創(chuàng)建 d_products 表, 用來訂單數(shù)據(jù)寫入 Databend 中
--?Flink?SQL
create?table?d_products?(id?INT,name?String,description?String,?PRIMARY?KEY?(`id`)?NOT?ENFORCED)?
with?('connector'?=?'databend',
'url'='databend://localhost:8000',
'username'='databend',
'password'='databend',
'database-name'='default',
'table-name'='bend_products',
'sink.batch-size'?=?'5',
'sink.flush-interval'?=?'1000',
'sink.max-retries'?=?'3');
使用 Flink SQL 將 products 表中的數(shù)據(jù)同步到 Databend 的 d_products 表中:
insert?into?d_products?select?*?from?products;

此時 flink job 就會提交成功,打開 flink UI 可以看到:

同時在 databend 中可以看到 MySQL 中的數(shù)據(jù)已經(jīng)同步過來了:

同步 Insert/Update 數(shù)據(jù)
此時我們在 MySQL 中再插入 10 條數(shù)據(jù):
INSERT?INTO?products?VALUES?
(default,"scooter","Small?2-wheel?scooter"),
(default,"car?battery","12V?car?battery"),
(default,"12-pack?drill?bits","12-pack?of?drill?bits?with?sizes?ranging?from?#40?to?#3"),????????
(default,"hammer","12oz?carpenter's?hammer"),????????
(default,"hammer","14oz?carpenter's?hammer"),????????
(default,"hammer","16oz?carpenter's?hammer"),????????
(default,"rocks","box?of?assorted?rocks"),????????
(default,"jacket","water?resistent?black?wind?breaker"),
(default,"cloud","test?for?databend"),????????
(default,"spare?tire","24?inch?spare?tire");

這些數(shù)據(jù)會立即同步到 Databend 當中。

假如此時 MySQL 中更新了一條數(shù)據(jù):

那么 id=10 的數(shù)據(jù)在 databend 中也會被立即更新:

環(huán)境清理操作結(jié)束后,在?docker-compose.yml
?文件所在的目錄下執(zhí)行如下命令停止所有容器:
docker-compose?down
在 Flink 所在目錄?flink-1.16.0
?下執(zhí)行如下命令停止 Flink 集群:
./bin/stop-cluster.sh
結(jié)論
以上就是基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步的全部過程,通過 Flink CDC connectors 可以替換 Debezium+Kafka 的數(shù)據(jù)采集模塊,實現(xiàn) Flink SQL 采集+計算+傳輸一體化,減少維護的組件,簡化實時鏈路,減輕部署成本的同時也能達到 Exactly Once 的語義效果。引用鏈接[1]?Flink 1.16.0:?https://archive.apache.org/dist/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz
[2]?flink-sql-connector-mysql-cdc-2.3.0.jar:?https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar
[3]?flink-connector-databnend:https://github.com/databendcloud/flink-connector-databend
關(guān)于?Databend
Databend 是一款開源、彈性、低成本,基于對象存儲也可以做實時分析的新式數(shù)倉。期待您的關(guān)注,一起探索云原生數(shù)倉解決方案,打造新一代開源 Data Cloud。
???????Databend Cloud:https://databend.cn
???Databend 文檔:https://databend.rs/
???Wechat:Databend
??GitHub:https://github.com/datafuselabs/databend

本文使用 文章同步助手 同步