一文詳解數(shù)棧FlinkX實時采集原理與使用
數(shù)棧是云原生—站式數(shù)據(jù)中臺PaaS,我們在github和gitee上有一個有趣的開源項目:FlinkX,F(xiàn)linkX是一個基于Flink的批流統(tǒng)一的數(shù)據(jù)同步工具,既可以采集靜態(tài)的數(shù)據(jù),也可以采集實時變化的數(shù)據(jù),是全域、異構(gòu)、批流一體的數(shù)據(jù)同步引擎。大家喜歡的話請給我們點個star!star!star!
github開源項目:https://github.com/DTStack/flinkx
gitee開源項目:https://gitee.com/dtstack_dev_0/flinkx
一、FlinkX實時采集功能的基本介紹
首先為大家介紹下FlinkX實時模塊的分類,如下圖所示:

1、實時采集模塊(CDC)
1)MySQL Binlog插件
利用阿里開源的Canal組件實時從MySQL中捕獲變更數(shù)據(jù)。
2)PostgreSQL Wal插件
PostgreSQL 實時采集是基于 PostgreSQL的邏輯復(fù)制以及邏輯解碼功能來完成的。邏輯復(fù)制同步數(shù)據(jù)的原理是,在Wal日志產(chǎn)生的數(shù)據(jù)庫上,由邏輯解析模塊對Wal日志進(jìn)行初步的解析,它的解析結(jié)果為ReorderBufferChange(可以簡單理解為HeapTupleData),再由Pgoutput Plugin對中間結(jié)果進(jìn)行過濾和消息化拼接后,然后將其發(fā)送到訂閱端,訂閱端通過邏輯解碼功能進(jìn)行解析。
2、消息隊列
1)Kafka:Kafka插件存在四個版本,根據(jù)Kafka版本的不同,插件名稱也略有不同。具體對應(yīng)關(guān)系如下表所示:

2)EMQX:EMQX 是一款完全開源,高度可伸縮,高可用的分布式MQTT消息服務(wù)器,適用于IoT、M2M 和移動應(yīng)用程序,可處理千萬級別的并發(fā)客戶端。
3、間隔輪詢
RDB類型插件的使用限制:
只有RDB類型的reader插件支持間隔輪詢
輪詢字段只能為數(shù)值類型或者時間類型
輪詢字段只能為連續(xù)遞增且不重復(fù)
4、其他
Hive插件: Hive插件只有寫入插件,功能基于HDFS的寫入插件實現(xiàn),也就是說從實時采集插件讀取,寫入Hive也支持失敗恢復(fù)的功能。

二、Binlog實時采集原理
1、什么是Binlog
MySQL 的二進(jìn)制日志 Binlog 可以說是 MySQL 最重要的日志,它記錄了所有的 DDL 和 DML 語句(除了數(shù)據(jù)查詢語句Select、Show等),以事件形式記錄,還包含語句所執(zhí)行的消耗的時間,MySQL的二進(jìn)制日志是事務(wù)安全型的,Binlog 的主要目的是復(fù)制和恢復(fù)。
2、Binlog插件基本原理
實時采集插件的核心是如何實時捕獲數(shù)據(jù)庫數(shù)據(jù)的變更,對于MySQL數(shù)據(jù)庫而言,阿里開源的Canal已經(jīng)很好的幫我們實現(xiàn)了基于MySQL數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費功能。因此這里我們直接用Canal捕獲MySQL數(shù)據(jù)庫數(shù)據(jù)的變更信息,基于FlinkX框架將任務(wù)簡化成腳本的配置,基于Flink的Checkpoint機(jī)制提供了任務(wù)的故障恢復(fù),提高了任務(wù)的容錯性。
基本步驟如下:
任務(wù)啟動時啟動Canal線程
Canal模擬MySQL slave的交互協(xié)議,偽裝自己為MySQL slave,向MySQL master發(fā)送dump協(xié)議
MySQL master收到dump請求,開始推送Binary Log給slave(即Canal)
Canal解析Binary Log 對象(原始為Byte流)
FlinkX獲取Canal解析后的對象做二次解析,封裝后發(fā)送至下游數(shù)據(jù)源
三、Binlog到Hive實戰(zhàn)
1、環(huán)境準(zhǔn)備:確認(rèn)數(shù)據(jù)庫開啟了Binlog
show variables like '%log_bin%';

2、建表
CREATE TABLE `kudu` ( ?
?`id` bigint(11) NOT NULL AUTO_INCREMENT, ?
?`user_id` bigint(11) DEFAULT NULL, ?
?`name` varchar(255) DEFAULT NULL, ?
?PRIMARY KEY (`id`) ?
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4
?