技術(shù)干貨|如何利用 ChunJun 實現(xiàn)數(shù)據(jù)實時同步?
實時同步是 ChunJun 的?個重要特性,指在數(shù)據(jù)同步過程中,數(shù)據(jù)源與?標(biāo)系統(tǒng)之間的數(shù)據(jù)傳輸和更新?乎在同?時間進(jìn)?。
在實時同步場景中我們更加關(guān)注源端,當(dāng)源系統(tǒng)中的數(shù)據(jù)發(fā)?變化時,這些變化會?即傳輸并應(yīng)?到?標(biāo)系統(tǒng),以保證兩個系統(tǒng)中的數(shù)據(jù)保持?致。這個特性需要作業(yè)運?過程中 source 插件不間斷地頻繁訪問源端。在?產(chǎn)場景下,對于這類?時間運?、資源可預(yù)估、需要穩(wěn)定性的作業(yè),我們推薦使? perjob 模式部署。
插件?持 JSON 腳本和 SQL 腳本兩種配置?式,具體的參數(shù)配置請參考「ChunJun 連接器文檔」:https://sourl.cn/vxq6Zp
本文將為大家介紹如何使用 ChunJun 實時同步,以及 ChunJun ?持的 RDB 實時采集插件的特性、采集邏輯及其原理,幫助大家更好地理解 ChunJun 與實時同步。
如何使用 ChunJun 實時同步
為了讓?家能更深?了解如何使? ChunJun 做實時同步,我們假設(shè)有這樣?個場景:?個電商?站希望將其訂單數(shù)據(jù)從 MySQL 數(shù)據(jù)庫實時同步到 HBase 數(shù)據(jù)庫,以便于后續(xù)的數(shù)據(jù)分析和處理。
在這個場景中,我們將使? Kafka 作為中間消息隊列,以實現(xiàn) MySQL 和 HBase 之間的數(shù)據(jù)同步。這樣做的好處是 MySQL 表中變更可以實時同步到 HBase 結(jié)果表中,?不?擔(dān)?歷史數(shù)據(jù)被修改后 HBase 表未被同步。
如果在?家的實際應(yīng)用場景中,不關(guān)?歷史數(shù)據(jù)是否變更(或者歷史數(shù)據(jù)根本不會變更),且業(yè)務(wù)表有?個遞增的主鍵,那么可以參考本?之后的 JDBC-Polling 模式?節(jié)的內(nèi)容。
?數(shù)據(jù)源組件的部署以及 ChunJun 的部署這?不做詳細(xì)描述
?案例中的腳本均以 SQL 腳本為例,JSON 腳本也能實現(xiàn)相同功能,但在參數(shù)名上可能存在出?,使? JSON 的同學(xué)可以參考上文 「ChunJun 連接器」?檔中的參數(shù)介紹
采集 MySQL 數(shù)據(jù)到 Kafka
● 數(shù)據(jù)準(zhǔn)備
?先,我們在 Kafka 中創(chuàng)建?個名為 order_dml 的 topic,然后在 MySQL 中創(chuàng)建?個訂單表,并插??些測試數(shù)據(jù)。創(chuàng)建表的 SQL 語句如下:
● 使用 Binlog 插件采集數(shù)據(jù)到 Kafka
為了表示數(shù)據(jù)的變化類型和更好地處理數(shù)據(jù)變化,實時采集插件一般會用 RowData(Flink 內(nèi)部數(shù)據(jù)結(jié)構(gòu))中的 RowKind 記錄?志中的數(shù)據(jù)事件(insert、delete 等)類型,binlog 插件也?樣。而當(dāng)數(shù)據(jù)被打到 Kafka 中時,RowKind 信息應(yīng)該怎么處理呢?
這?我們就需要?到 upsert-kafka-x,upsert-kafka-x 會識別 RowKind。對各類時間的處理邏輯如下:
?insert 數(shù)據(jù):序列化后直接打?
?delete 數(shù)據(jù):只寫 key,value 置為 null
?update 數(shù)據(jù):分為?條 delete 數(shù)據(jù)和 insert 數(shù)據(jù)處理,即先根據(jù)主鍵刪除原本的數(shù)據(jù),再寫? update 后的數(shù)據(jù)
在下?步中我們再解釋如何將 Kafka 中的數(shù)據(jù)還原到 HBase 或者其他?持 upsert 語義的數(shù)據(jù)庫中,接下來我們來編寫 SQL 腳本,實現(xiàn) MySQL 數(shù)據(jù)實時采集到 Kafka 中的功能,示例如下:
還原 Kafka 中的數(shù)據(jù)到 HBase
上述步驟中,我們通過 binlog-x 和 upsert-kafka-x,將 MySQL 中的數(shù)據(jù)實時采集到了 Kafka 中。解鈴還須系鈴?,我們可以通過 upsert-kafka-x 再去將 Kafka 中的數(shù)據(jù)解析成帶有 upsert 語義的數(shù)據(jù)。
upsert-kafka-x 作為 source 插件時,會判斷 Kafka 中數(shù)據(jù)的 value 是否為 null,如果 value 為 null 則標(biāo)記這條數(shù)據(jù)的 RowKind 為 DELETE,否則將數(shù)據(jù)的 ROWKIND 標(biāo)記為 INSERT。
ChunJun 的 hbase-x 插件?前已經(jīng)具備了 upsert 語句的能?,使? hbase-x 即可將 Kafka 中的數(shù)據(jù)還原到 hbase 中。接下來是 SQL 腳本示例,為了?便在 HBase 中查看數(shù)據(jù)結(jié)果,我們將 int 數(shù)據(jù) cast 為 string 類型:
Tips:如果我們不需要 Kafka 中間件,也可以使? binlog-x 插件直接對接 hbase-x 插件。
ChunJun 支持的 RDB 實時采集插件
本節(jié)主要介紹 ChunJun 的 RDB 實時采集插件的特性、采集邏輯及其原理。
ChunJun 的 RDB 實時采集可以實時監(jiān)視數(shù)據(jù)庫中的更改,并在發(fā)?更改時讀取數(shù)據(jù)變化,例如插?、更新和刪除操作。使? ChunJun 實時采集,我們可以實時獲取有關(guān)數(shù)據(jù)庫中更改的信息,從?能夠及時響應(yīng)這些更改,如此便可以幫助我們更好地管理和利? RDB 數(shù)據(jù)庫中的數(shù)據(jù)。
并且 ChunJun 提供了故障恢復(fù)和斷點續(xù)傳功能來確保數(shù)據(jù)的完整性。ChunJun 實時采集類插件的?致實現(xiàn)步驟如下:
?連接數(shù)據(jù)庫,確認(rèn)讀取點位,讀取點位可以理解為?個 offset,如 Binlog 中,指?志的?件名和?件的 position 信息
?根據(jù)讀取點位開始讀取 redolog,獲取其中關(guān)于數(shù)據(jù)變更相關(guān)的操作記錄
?根據(jù) tableName、操作事件(如 insert、delete、update)等過濾信息過濾出需要的 log ?志
?解析 log ?志,解析后的事件信息包括表名、數(shù)據(jù)庫名、操作類型(插?、更新或刪除)和變更的數(shù)據(jù)?等
?將解析出來的數(shù)據(jù)會加?為 ChunJun 內(nèi)部統(tǒng)?的 DdlRowData 供下游使?
ChunJun ?前已?持的實時采集 Connector 有:binlog (mysql)、oceanbasecdc、oraclelogminer、sqlservercdc。
Binlog 簡介
ChunJun binlog 插件的主要功能是讀取 MySQL 的?進(jìn)制?志(binlog)?件。這些?件記錄了所有對數(shù)據(jù)的更改操作,如插?、更新和刪除等。?前,該插件依賴 Canal 組件來讀取 MySQL 的 binlog ?件。
核?操作步驟如下:
?確認(rèn)讀取點位:在 binlog 插件中,我們可以在腳本的 start 字段中直接指定 journal-name(binlog ?件名)和 position(?件的特定位置)
?讀取 binlog:binlog 插件將?身偽裝成 MySQL 的 Slave 節(jié)點,向 MySQL Master 發(fā)送請求,要求將 binlog ?件的數(shù)據(jù)流發(fā)送給它
?故障恢復(fù)和斷點續(xù)傳:故障時,插件會記錄當(dāng)前的 binlog 位置信息,從 checkpoint/savepoint 恢復(fù)后,我們可以從上次記錄的位置繼續(xù)讀取 binlog ?件,確保數(shù)據(jù)變化的完整性
使? binlog 所需的權(quán)限在「binlog 插件使??檔」中有詳細(xì)說明,鏈接如下:
https://sourl.cn/mvae9m
OracleLogminer 簡介
Logminer 插件借助 Oracle 提供的 Logminer ?具通過讀取視圖的?式獲取 Oracle redolog 中的信息。
核?操作步驟如下:
01 定位需讀取起始點位(start_scn)
?前 logminer ?持四種策略指定 StartScn:
?all:從 Oracle 數(shù)據(jù)庫中最早的歸檔?志組開始采集 (不建議使?)
?current:任務(wù)運?時的 SCN 號
?time:指定時間點對應(yīng)的 SCN 號
?scn:直接指定 SCN 號
02 定位需要讀取的結(jié)束點位 (end_scn)
插件根據(jù) start_scn 和 maxLogFileSize(默認(rèn) 5G)獲取可加載的 redolog ?件列表,end_scn 取這個?件列表中最?的 scn 值。
03 加載 redo ?志到 Logminer
通過?個存儲過程,將 scn 區(qū)間范圍內(nèi)的 redolog 加載到 Logminer ?。
04 從視圖中讀取數(shù)據(jù)
以 scn > ? 作為 where 條件直接查詢 v$logmnr_contents 視圖內(nèi)的信息即可獲取 redolog 中的數(shù)據(jù)。
05 重復(fù) 1-4 步驟,實現(xiàn)不斷的讀取
如標(biāo)題。
06 故障恢復(fù)和斷點續(xù)傳
在發(fā)?故障時,插件會保存當(dāng)前消費的 scn 號,重啟時從上次的 scn 號開始讀取,確保數(shù)據(jù)完整性。
?關(guān)于該插件原理的詳細(xì)介紹請參?「Oracle Logminer 實現(xiàn)原理說明?檔」:
https://sourl.cn/6vqz4b
?使? lominer 插件的前提條件詳?「Oracle 配置 LogMiner」:
https://sourl.cn/eteyZY
SqlServerCDC 簡介
SqlServerCDC 插件依賴 SQL Server 的 CDC Agent 服務(wù)提供的視圖獲取 redolog 中的信息。
核?操作步驟如下:
01 定位需讀取起始點位(from_lsn)
?前 SqlserverCDC 僅?持直接配置 lsn 號,如果 lsn 號未配置,則取數(shù)據(jù)庫中當(dāng)前最?的 lsn 號為 from_lsn。
02 定位需要讀取的結(jié)束點位 (to_lsn)
SqlserverCDC 插件定期地(可通過 pollInterval 參數(shù)指定)獲取數(shù)據(jù)庫中的最? lsn 為 end_lsn。
03 從視圖中讀取數(shù)據(jù)
查詢 Agent 服務(wù)提供的視圖中 lsn 區(qū)間范圍內(nèi)的數(shù)據(jù),過濾出需要監(jiān)聽的表及事件類型。
04 重復(fù) 1-3 步驟,實現(xiàn)不斷的讀取
如標(biāo)題。
05 故障恢復(fù)和斷點續(xù)傳
在發(fā)?故障時,插件會保存當(dāng)前消費的 lsn 號。重啟時從上次的 lsn 號開始讀取,確保數(shù)據(jù)完整性。
?關(guān)于該插件原理的詳細(xì)介紹請參?「Sqlserver CDC 實現(xiàn)原理說明?檔」:
https://sourl.cn/5pQvEM
?配置 SqlServer CDC Agent 服務(wù)詳?「Sqlserver 配置 CDC ?檔」:
https://sourl.cn/h5nd8j
OceanBaseCDC 簡介
OceanBase 是螞蟻集團(tuán)開源的?款分布式關(guān)系型數(shù)據(jù)庫,它使??進(jìn)制?志(binlog)記錄數(shù)據(jù)變更。OceanBaseCDC 的實現(xiàn)依賴于 OceanBase 提供的 LogProxy 服務(wù),LogProxy 提供了基于發(fā)布 - 訂閱模型的服務(wù),允許使? OceanBase 的 logclient 訂閱特定的 binlog 數(shù)據(jù)流。
OceanBaseCDC 啟動?個 Listener 線程。當(dāng) logclient 連接到 LogProxy 后,Listener 會訂閱經(jīng)過數(shù)據(jù)過濾的 binlog,然后將其添加到內(nèi)部維護(hù)的列表中。當(dāng)收到 COMMIT 信息后,Listener 會將?志變更信息傳遞給?個阻塞隊列,由主線程消費并將其轉(zhuǎn)換為 ChunJun 內(nèi)部的 DdlRowData,最終發(fā)送到下游。
JDBC-Polling 模式讀
JDBC 插件的 polling 讀取模式是基于 SQL 語句做數(shù)據(jù)讀取的,相對于基于重做?志的實時采集成本更低,但 jdbc 插件做實時同步對業(yè)務(wù)場景有更?的要求:
?有?個數(shù)值類型或者時間類型的遞增主鍵
?不更新歷史數(shù)據(jù)或者不關(guān)?歷史數(shù)據(jù)是否更新,僅關(guān)?新數(shù)據(jù)的獲取
實現(xiàn)原理簡介
?設(shè)置遞增的業(yè)務(wù)主鍵作為 polling 模式依賴的增量鍵
?在增量讀取的過程中,實時記錄 increColumn 對應(yīng)的值(state),作為下?次數(shù)據(jù)讀取的起始點位
?當(dāng)?批數(shù)據(jù)讀取完后,間隔?段時間之后依據(jù) state 讀取下?批數(shù)據(jù)
polling 依賴部分增量同步的邏輯,關(guān)于增量同步的更多介紹可以點擊:
https://sourl.cn/UC8n6K
如何配置?個 jdbc-polling 作業(yè)
先介紹?下開啟 polling 模式需要關(guān)注的配置項:

以 MySQL 為例,假設(shè)我們有?個存儲訂單信息的歷史表,且訂單的 order_id 是遞增的,我們希望定期地獲取這張表的新增數(shù)據(jù)。
我們可以這樣配置 json 腳本的 reader 信息。
《數(shù)據(jù)治理行業(yè)實踐白皮書》下載地址:https://fs80.cn/l134d5?
《數(shù)棧產(chǎn)品白皮書》免費獲?。篽ttps://fs80.cn/cw0iw1
想了解或咨詢更多有關(guān)袋鼠云大數(shù)據(jù)產(chǎn)品、行業(yè)解決方案、客戶案例的朋友,瀏覽袋鼠云官網(wǎng):https://www.dtstack.com/?src=szbzhan
同時,歡迎對大數(shù)據(jù)開源項目有興趣的同學(xué)加入「袋鼠云開源框架釘釘技術(shù) qun」,交流最新開源技術(shù)信息,qun 號碼:30537511,項目地址:https://github.com/DTStack