基于 RocketMQ Connect 構(gòu)建數(shù)據(jù)流轉(zhuǎn)處理平臺(tái)
本文作者:周波,阿里云智能高級(jí)開(kāi)發(fā)工程師, Apache RocketMQ Committer 。
01?從問(wèn)題中來(lái)的RocketMQ Connect
在電商系統(tǒng)、金融系統(tǒng)及物流系統(tǒng),我們經(jīng)常可以看到 RocketMQ 的身影。原因不難理解,隨著數(shù)字化轉(zhuǎn)型范圍的擴(kuò)大及進(jìn)程的加快,業(yè)務(wù)系統(tǒng)的數(shù)據(jù)也在每日暴增,此時(shí)為了保證系統(tǒng)的穩(wěn)定運(yùn)行,就需要把運(yùn)行壓力分擔(dān)出去。RocketMQ就擔(dān)任著這樣的角色,它的異步消息處理與高并發(fā)讀寫能力,決定了系統(tǒng)底層的重構(gòu)不會(huì)影響上層應(yīng)用的功能。而RocketMQ的另一個(gè)優(yōu)勢(shì)——可伸縮能力,使系統(tǒng)在面臨流量的不確定性時(shí),實(shí)現(xiàn)對(duì)流量的緩沖處理。此外,RocketMQ的順序設(shè)計(jì)特性使其成為一個(gè)天然的排隊(duì)引擎,例如,三個(gè)應(yīng)用同時(shí)對(duì)一個(gè)后臺(tái)引擎發(fā)起請(qǐng)求,確保不引起“撞車”事故。因此,RocketMQ被用在異步解耦、削峰填谷以及事務(wù)消息等場(chǎng)景中。
但是,數(shù)字化轉(zhuǎn)型浪潮也帶來(lái)了更多用戶對(duì)數(shù)據(jù)價(jià)值的關(guān)注——如何讓數(shù)據(jù)產(chǎn)生更大利用價(jià)值?RocketMQ自身不具備數(shù)據(jù)分析能力,但是有不少用戶希望從 RocketMQ Topic 中獲取數(shù)據(jù)并進(jìn)行在線或離線的數(shù)據(jù)分析。然而,使用市面上的數(shù)據(jù)集成或數(shù)據(jù)同步工具,將 RocketMQ Topic 數(shù)據(jù)同步到一些分析系統(tǒng)中雖然是一種可行方案,卻會(huì)引入新的組件,造成數(shù)據(jù)同步的鏈路較長(zhǎng),時(shí)延相對(duì)較高,用戶體驗(yàn)不佳。
舉個(gè)例子,假設(shè)業(yè)務(wù)場(chǎng)景中使用 OceanBase 作為數(shù)據(jù)存儲(chǔ),同時(shí)希望將這些數(shù)據(jù)同步到 Elasticsearch進(jìn)行全文搜索,有兩種可行的數(shù)據(jù)同步方案。
方案一:從 OceanBase 中獲取數(shù)據(jù),寫入 Elasticsearch組件并進(jìn)行數(shù)據(jù)同步,在數(shù)據(jù)源較少時(shí)此方案沒(méi)什么問(wèn)題,一旦數(shù)據(jù)增多,開(kāi)發(fā)和維護(hù)都非常復(fù)雜,此時(shí)就要用到第二種方案。

方案二:引入消息中間件對(duì)上下游進(jìn)行解藕,這能解決第一種方案的問(wèn)題,但是一些較為復(fù)雜的問(wèn)題還沒(méi)有完全解決。比如,如何將數(shù)據(jù)從源數(shù)據(jù)同步到目標(biāo)系統(tǒng)并保證高性能,如果保證同步任務(wù)的部分節(jié)點(diǎn)掛掉,數(shù)據(jù)同步依然正常進(jìn)行,節(jié)點(diǎn)恢復(fù)依然可以斷點(diǎn)續(xù)傳,同時(shí)隨著數(shù)據(jù)管道的增多,如何管理數(shù)據(jù)管道也變得十分困難。

總的來(lái)說(shuō),數(shù)據(jù)集成過(guò)程中的挑戰(zhàn)主要有五個(gè)。
挑戰(zhàn)一:?數(shù)據(jù)源多,市面上可能有上百個(gè)數(shù)據(jù)源,且各數(shù)據(jù)源的系統(tǒng)差異較大,實(shí)現(xiàn)任意數(shù)據(jù)源之間的數(shù)據(jù)同步工作量較大,研發(fā)周期很長(zhǎng)。
挑戰(zhàn)二:?高性能問(wèn)題,如何高效地從源數(shù)據(jù)系統(tǒng)同步到目的數(shù)據(jù)系統(tǒng),并保障其性能。
挑戰(zhàn)三:?高可用問(wèn)題,即Failover能力,當(dāng)一個(gè)節(jié)點(diǎn)掛掉是否這個(gè)節(jié)點(diǎn)的任務(wù)就停止了,任務(wù)重新啟動(dòng)是否還可以斷點(diǎn)續(xù)傳。
挑戰(zhàn)四:?彈性擴(kuò)縮容能力,根據(jù)系統(tǒng)流量動(dòng)態(tài)增加或減少節(jié)點(diǎn)數(shù)量,既能通過(guò)擴(kuò)容滿足高峰期業(yè)務(wù),也能在低峰期縮減節(jié)點(diǎn),節(jié)省成本。
挑戰(zhàn)五:?數(shù)據(jù)管道的管理運(yùn)維,隨著數(shù)據(jù)管道的增多,運(yùn)維監(jiān)控的數(shù)據(jù)管道也會(huì)變得越來(lái)越復(fù)雜,如何高效管理監(jiān)控眾多的同步任務(wù)。
面對(duì)上述挑戰(zhàn) RocketMQ 如何解決?
第一,標(biāo)準(zhǔn)化數(shù)據(jù)集成 API (Open Messaging Connect API)。在 RocketMQ 生態(tài)中增加 Connect 組件,一方面對(duì)數(shù)據(jù)集成過(guò)程抽象,抽象標(biāo)準(zhǔn)的數(shù)據(jù)格式以及描述數(shù)據(jù)的 Schema,另一方面對(duì)同步任務(wù)進(jìn)行抽象,任務(wù)的創(chuàng)建、分片都抽象成一套標(biāo)準(zhǔn)化的流程。
第二,基于標(biāo)準(zhǔn)的 API 實(shí)現(xiàn) Connect Runtime。Runtime提供了集群管理、配置管理、位點(diǎn)管理、負(fù)載均衡相關(guān)的能力,擁有了這些能力,開(kāi)發(fā)者或者用戶就只需要關(guān)注數(shù)據(jù)如何獲取或如何寫入,從而快速構(gòu)建數(shù)據(jù)生態(tài),如與OceanBase、MySQL、Elasticsearc等快速建立連接,搭建數(shù)據(jù)集成平臺(tái)。整個(gè)數(shù)據(jù)集成平臺(tái)的構(gòu)建也非常簡(jiǎn)單,通過(guò) Runtime 提供的 RESTFull API 進(jìn)行簡(jiǎn)單調(diào)用即可。
第三,提供完善的運(yùn)維工具,方便管理同步任務(wù),同時(shí)提供豐富的Metrics 信息,方便查看同步任務(wù)的TPS,流量等信息。
02?RocketMQ Connect 兩大使用場(chǎng)景
這里為大家整理了 RocketMQ Connect的兩大使用場(chǎng)景。
場(chǎng)景一,RocketMQ 作為中間媒介,可以將上下游數(shù)據(jù)打通,比如在新舊系統(tǒng)遷移的過(guò)程中,如果在業(yè)務(wù)量不大時(shí)使用 MySQL 就可以滿足業(yè)務(wù)需求,而隨著業(yè)務(wù)的增長(zhǎng),MySQL 性能無(wú)法滿足業(yè)務(wù)要求時(shí),需要對(duì)系統(tǒng)進(jìn)行升級(jí),選用分布式數(shù)據(jù)庫(kù)OceanBase 提升系統(tǒng)性能。
如何將舊系統(tǒng)數(shù)據(jù)無(wú)縫遷移到 OceanBase 中呢?在這個(gè)場(chǎng)景中RocketMQ Connect 就可以發(fā)揮作用,RocketMQ Connect可以構(gòu)建一個(gè)從 MySQL 到 OceanBase 的數(shù)據(jù)管道,實(shí)現(xiàn)數(shù)據(jù)的平滑遷移。RocketMQ Connect 還可以用在搭建數(shù)據(jù)湖、搜索引擎、ETL 平臺(tái)等場(chǎng)景。例如將各個(gè)數(shù)據(jù)源的數(shù)據(jù)集成到 RocketMQ Topic當(dāng)中,目標(biāo)存儲(chǔ)只需要對(duì)接 Elasticsearch 就可以構(gòu)建一個(gè)搜索平臺(tái),目標(biāo)存儲(chǔ)如果是數(shù)據(jù)湖就可以構(gòu)建一個(gè)數(shù)據(jù)湖平臺(tái)。
除此之外,RocketMQ 自身也可以作為一個(gè)數(shù)據(jù)源,將一個(gè) RocketMQ 集群的數(shù)據(jù)同步到另一個(gè)集群,可以構(gòu)建 RocketMQ 多活容災(zāi)能力,這是社區(qū)正在孵化的Replicator可以實(shí)現(xiàn)的能力。

場(chǎng)景二,RocketMQ 作為端點(diǎn)。?RocketMQ 的生態(tài)中提供了流計(jì)算能力組件-RocketMQ Streams,Connector將各個(gè)存儲(chǔ)系統(tǒng)的數(shù)據(jù)集成到RocketMQ Topic當(dāng)中,下游使用 RocketMQ Streams流計(jì)算的能力就可以構(gòu)建一個(gè)實(shí)時(shí)的流計(jì)算平臺(tái)。當(dāng)然也可以配合業(yè)務(wù)系統(tǒng)的Service 實(shí)現(xiàn)業(yè)務(wù)系統(tǒng)快速?gòu)钠渌鎯?chǔ)統(tǒng)一快速獲取數(shù)據(jù)的能力。

還可以將 RocketMQ 作為端點(diǎn)的上游,將業(yè)務(wù)消息發(fā)到 Topic 中,使用 Connector 對(duì)數(shù)據(jù)做持久化或轉(zhuǎn)存的操作。

如此一來(lái),RocketMQ 就具備數(shù)據(jù)集成能力,可以實(shí)現(xiàn)任意任意異構(gòu)數(shù)據(jù)源之間的數(shù)據(jù)同步,同時(shí)也具備統(tǒng)一的集群管理、監(jiān)控能力及配置化搭建數(shù)據(jù)管道搭建能力,開(kāi)發(fā)者或者用戶只需要專注于數(shù)據(jù)拷貝,簡(jiǎn)單配置就可以得到一個(gè)具備配置化、低代碼、低延時(shí)、高可用,支持故障處理和動(dòng)態(tài)擴(kuò)縮容數(shù)據(jù)集成平臺(tái)。
那么, RocketMQ Connect 是如何實(shí)現(xiàn)的呢?
03RocketMQ Connect 實(shí)現(xiàn)原理
在介紹實(shí)現(xiàn)原理前,先來(lái)了解兩個(gè)概念。
概念一,什么是 Connector(連接器)??它定義數(shù)據(jù)從哪復(fù)制到哪,是從源數(shù)據(jù)系統(tǒng)讀取數(shù)據(jù)寫入RocketMQ,這種是SourceConnector,或從RocketMQ讀數(shù)據(jù)寫入到目標(biāo)系統(tǒng),這種是SinkConnector。Connector決定需要?jiǎng)?chuàng)建任務(wù)的數(shù)量,從Worker接收配置傳遞給任務(wù)。

概念二,什么是 Task ??Task 是 Connector 任務(wù)分片的最小分配單位,是實(shí)際將源數(shù)據(jù)源數(shù)據(jù)復(fù)制到 RocketMQ(SourceTask),或者將數(shù)據(jù)從RocketMQ 讀出寫入到目標(biāo)系統(tǒng)(SinkTask)真正的執(zhí)行者,Task是無(wú)狀態(tài)的,可以動(dòng)態(tài)的啟停任務(wù),多個(gè)Task可以并行執(zhí)行,Connector 復(fù)制數(shù)據(jù)的并行度主要體現(xiàn)在 Task 上。一個(gè) Task 任務(wù)可以理解為一個(gè)線程,多個(gè) Task 則以多線程的方式運(yùn)行。

通過(guò)Connect的API也可以看到Connector和Task各自的職責(zé),Connector實(shí)現(xiàn)時(shí)就已經(jīng)確定數(shù)據(jù)復(fù)制的流向,Connector接收數(shù)據(jù)源相關(guān)的配置,taskClass獲取需要?jiǎng)?chuàng)建的任務(wù)類型,通過(guò)taskConfigs的數(shù)量確定任務(wù)數(shù)量,并且為Task分配好配置。Task拿到配置以后數(shù)據(jù)源建立連接并獲取數(shù)據(jù)寫入到目標(biāo)存儲(chǔ)。通過(guò)下面的兩張圖可以清楚的看到,Connector和Task處理基本流程。

一個(gè) RocketMQ Connect 集群中會(huì)有多個(gè) Connector ,每個(gè) Connector 會(huì)對(duì)應(yīng)一個(gè)或多個(gè) Task,這些任務(wù)運(yùn)行在 Worker(進(jìn)程)中。Worker進(jìn)程是Connector和Task運(yùn)行環(huán)境,它提供RESTFull能力,接收HTTP請(qǐng)求,將獲取到的配置傳遞給Connector和Task,它還負(fù)責(zé)啟動(dòng)Connector和Task,保存Connector配置信息,保存Task同步數(shù)據(jù)的位點(diǎn)信息,除此以外,Worker還提供負(fù)載均衡能力,Connect集群高可用、擴(kuò)縮容、故障處理主要依賴Worker的負(fù)責(zé)均衡能力實(shí)現(xiàn)的。Worker 提供服務(wù)的流程如下:

Worker 提供的服務(wù)發(fā)現(xiàn)及負(fù)載均衡的實(shí)現(xiàn)原理如下:
服務(wù)發(fā)現(xiàn):

用過(guò) RocketMQ 的開(kāi)發(fā)者應(yīng)該知道,它的使用很簡(jiǎn)單,就是發(fā)送和接收消息。消費(fèi)模式分為集群模式和廣播模式兩種,集群消費(fèi)模式下一個(gè) Topic 可以有多個(gè) Consumer 消費(fèi)消息,任意一個(gè) Consumer 的上線或下線 RocketMQ 服務(wù)端都有感知,并且還可以將客戶端上下線信息通知給其它節(jié)點(diǎn),利用RocketMQ這個(gè)特性就實(shí)現(xiàn)了 Worker的服務(wù)發(fā)現(xiàn)。
配置/Offset同步:

Connector 的配置/Offset信息同步通過(guò)每個(gè) Worker訂閱相同的 Topic,不同Worker 使用不同的 Consumer Group 實(shí)現(xiàn)的, Worker 節(jié)點(diǎn)可以通過(guò)這種方式消費(fèi)到相同Topic的所有數(shù)據(jù),即Connector配置/Offset信息,這類似于廣播消費(fèi)模式,這種數(shù)據(jù)同步模式可以保證任何一個(gè) Worker 掛掉,該Worker上的任務(wù)依舊可以在存活的 Worker 正常拉起運(yùn)行 ,并且可以獲取到任務(wù)對(duì)應(yīng)的 Offset 信息實(shí)現(xiàn)斷點(diǎn)續(xù)傳, 這是故障轉(zhuǎn)移以及高可用能力的基礎(chǔ)。
負(fù)載均衡
RocketMQ 消費(fèi)場(chǎng)景中,消費(fèi)客戶端 與Topic Queue 之間有負(fù)載均衡能力,Connector 在這一部分也是類似的,只不過(guò)它負(fù)載均衡的對(duì)象不一樣,Connector 是 Worker 節(jié)點(diǎn)和 Task之間的負(fù)載均衡,與RocketMQ客戶端負(fù)載均衡一樣,可以根據(jù)使用場(chǎng)景選擇不同負(fù)載均衡算法。

上文提到過(guò) RocketMQ Connect 提供 RESTFull API能力。通過(guò) RESTFull AP可以創(chuàng)建 Connector,管理Connector 以及查看 Connector 狀態(tài),簡(jiǎn)單列舉:
POST /connectors/
GET /connectors/{connector name}/config
GET /connectors/{connector name}/status
POST /connectors/{connector name}/stop
目前 Connector 支持單機(jī)、集群兩種部署模式。集群模式至少要有兩個(gè)節(jié)點(diǎn),才能保證它的高可用。并且集群可以動(dòng)態(tài)增加或者減少,做到了動(dòng)態(tài)控制提升集群性能和節(jié)省成本節(jié)省的能力。單機(jī)模式更多方便了開(kāi)發(fā)者開(kāi)發(fā)測(cè)試 Connector 。
如何如何實(shí)現(xiàn)一個(gè) Connector呢??還是結(jié)合一個(gè)具體的場(chǎng)景看一看,例如業(yè)務(wù)數(shù)據(jù)當(dāng)前是寫入MySQL 數(shù)據(jù)庫(kù)中的,希望將MySQL中數(shù)據(jù)實(shí)時(shí)同步到數(shù)據(jù)湖Hudi當(dāng)中。只要實(shí)現(xiàn) MySQL Source Connector 、Hudi Sink Connector這兩個(gè) Connector 即可。

下面就以 MySQLSource Connector 為例,來(lái)看一下具體的如何實(shí)現(xiàn)。
實(shí)現(xiàn) Connector最主要的就是實(shí)現(xiàn)兩個(gè) API 。第一個(gè)是 Connector API ,除了實(shí)現(xiàn)它生命周期相關(guān)的 API 外,還有任務(wù)如何分配,是通過(guò) Topic、Table 還是通過(guò)數(shù)據(jù)庫(kù)的維度去分。第二個(gè)API是需要?jiǎng)?chuàng)建的 Task,Connector 通過(guò)任務(wù)分配將相關(guān)的配置信息傳遞給 Task, Task 拿到這些信息,例如數(shù)據(jù)庫(kù)賬號(hào),密碼,IP,端口后就會(huì)創(chuàng)建數(shù)據(jù)庫(kù)連接,再通過(guò) MySQL 提供的 BINLOG 機(jī)智獲取到表的數(shù)據(jù),將這些數(shù)據(jù)寫到一個(gè)阻塞隊(duì)列中。Task有個(gè) Poll 方法,實(shí)現(xiàn)Connector時(shí)只要調(diào)用到Poll方法時(shí)可以獲取到數(shù)據(jù)即可,這樣 Connector 就基本寫完了。然后打包以Jar包的形式提供出來(lái),將它加載到 Worker 的節(jié)點(diǎn)中。

創(chuàng)建 Connector 任務(wù)后, Worker 中會(huì)創(chuàng)建一個(gè)或者多個(gè)線程,不停的輪詢 Poll方法,從而獲取到MySQL表中的數(shù)據(jù),再通過(guò) RocketMQ Producer 發(fā)送到 RocketMQ Broker中,這就是 Connector 從實(shí)現(xiàn)到運(yùn)行的整體過(guò)程(見(jiàn)下圖)。

04?RocketMQ Connect 現(xiàn)狀與未來(lái)
RocketMQ Connect 的發(fā)展歷程分為三個(gè)階段。
第一階段:Preview 階段
RocketMQ Connect發(fā)展的初期也即Preview階段,實(shí)現(xiàn)了Open Messaging Connect API 1.0 版本,基于該版本實(shí)現(xiàn)了 RocketMQ Connect Runtime ,同時(shí)提供了 10+ Connector 實(shí)現(xiàn)(MySQL,Redis,Kafka,Jms,MongoDB……)。在該階段,RocketMQ Connect 可以簡(jiǎn)單實(shí)現(xiàn)端到端的數(shù)據(jù)源同步,但功能還不夠完善,不支持?jǐn)?shù)據(jù)轉(zhuǎn)換,序列化等能力,生態(tài)相對(duì)還比較貧乏。
第二階段:1.0 階段
在 1.0 階段,Open Messaging Connect API 進(jìn)行了升級(jí),支持Schema、Transform,Converter等能力,在此基礎(chǔ)上對(duì) Connect Runtime 也進(jìn)行了重大升級(jí),對(duì)數(shù)據(jù)轉(zhuǎn)換,序列化做了支持,復(fù)雜Schema也做了完善的支持。該階段的 API、Runtime 能力已經(jīng)基本完善,在此基礎(chǔ)上,還有30+ Connecotor 實(shí)現(xiàn),覆蓋了 CDC、JDBC、SFTP、NoSQL、緩存Redis、HTTP、AMQP、JMS、數(shù)據(jù)湖、實(shí)時(shí)數(shù)倉(cāng)、Replicator、等Connector實(shí)現(xiàn),還做了Kafka Connector Adaptor可以運(yùn)行Kafka生態(tài)的Connector。
第三階段:2.0 階段
RocketMQ Connect當(dāng)前處于這個(gè)階段,重點(diǎn)發(fā)展Connector生態(tài),當(dāng) RocketMQ 的 Connector生態(tài)達(dá)到 100 + 時(shí),RocketMQ 基本上可以與任意的一個(gè)數(shù)據(jù)系統(tǒng)去做連接。
目前 RocketMQ 社區(qū)正在和 OceanBase 社區(qū)合作,進(jìn)行 OceanBase 到 RocketMQ Connect 的研發(fā)工作,提供 JDBC 和 CDC 兩種模式接入模式,后續(xù)會(huì)在社區(qū)中發(fā)布,歡迎感興趣的同學(xué)試用。
05?總結(jié)
RocketMQ是一個(gè)可靠的數(shù)據(jù)集成組件,具備分布式、伸縮性、故障容錯(cuò)等能力,可以實(shí)現(xiàn)RocketMQ與其他數(shù)據(jù)系統(tǒng)之間的數(shù)據(jù)流入與流出。通過(guò) RocketMQ Connect 可以實(shí)現(xiàn)CDC,構(gòu)建數(shù)據(jù)湖,結(jié)合流計(jì)算可實(shí)現(xiàn)數(shù)據(jù)價(jià)值。