最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

一文詳解數(shù)棧FlinkX實時采集原理與使用

2021-04-19 16:59 作者:袋鼠云  | 我要投稿

數(shù)棧是云原生—站式數(shù)據(jù)中臺PaaS,我們在github和gitee上有一個有趣的開源項目:FlinkX,F(xiàn)linkX是一個基于Flink的批流統(tǒng)一的數(shù)據(jù)同步工具,既可以采集靜態(tài)的數(shù)據(jù),也可以采集實時變化的數(shù)據(jù),是全域、異構(gòu)、批流一體的數(shù)據(jù)同步引擎。大家喜歡的話請給我們點個star!star!star!
github開源項目:https://github.com/DTStack/flinkx
gitee開源項目:https://gitee.com/dtstack_dev_0/flinkx


一、FlinkX實時采集功能的基本介紹

首先為大家介紹下FlinkX實時模塊的分類,如下圖所示:

1、實時采集模塊(CDC)

1)MySQL Binlog插件

利用阿里開源的Canal組件實時從MySQL中捕獲變更數(shù)據(jù)。

2)PostgreSQL Wal插件

PostgreSQL 實時采集是基于 PostgreSQL的邏輯復(fù)制以及邏輯解碼功能來完成的。邏輯復(fù)制同步數(shù)據(jù)的原理是,在Wal日志產(chǎn)生的數(shù)據(jù)庫上,由邏輯解析模塊對Wal日志進(jìn)行初步的解析,它的解析結(jié)果為ReorderBufferChange(可以簡單理解為HeapTupleData),再由Pgoutput Plugin對中間結(jié)果進(jìn)行過濾和消息化拼接后,然后將其發(fā)送到訂閱端,訂閱端通過邏輯解碼功能進(jìn)行解析。

2、消息隊列

1)Kafka:Kafka插件存在四個版本,根據(jù)Kafka版本的不同,插件名稱也略有不同。具體對應(yīng)關(guān)系如下表所示:

2)EMQX:EMQX 是一款完全開源,高度可伸縮,高可用的分布式MQTT消息服務(wù)器,適用于IoT、M2M 和移動應(yīng)用程序,可處理千萬級別的并發(fā)客戶端。

3、間隔輪詢

RDB類型插件的使用限制:

  • 只有RDB類型的reader插件支持間隔輪詢

  • 輪詢字段只能為數(shù)值類型或者時間類型

  • 輪詢字段只能為連續(xù)遞增且不重復(fù)

4、其他

Hive插件: Hive插件只有寫入插件,功能基于HDFS的寫入插件實現(xiàn),也就是說從實時采集插件讀取,寫入Hive也支持失敗恢復(fù)的功能。

二、Binlog實時采集原理

1、什么是Binlog

MySQL 的二進(jìn)制日志 Binlog 可以說是 MySQL 最重要的日志,它記錄了所有的 DDL 和 DML 語句(除了數(shù)據(jù)查詢語句Select、Show等),以事件形式記錄,還包含語句所執(zhí)行的消耗的時間,MySQL的二進(jìn)制日志是事務(wù)安全型的,Binlog 的主要目的是復(fù)制和恢復(fù)。

2、Binlog插件基本原理

實時采集插件的核心是如何實時捕獲數(shù)據(jù)庫數(shù)據(jù)的變更,對于MySQL數(shù)據(jù)庫而言,阿里開源的Canal已經(jīng)很好的幫我們實現(xiàn)了基于MySQL數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費功能。因此這里我們直接用Canal捕獲MySQL數(shù)據(jù)庫數(shù)據(jù)的變更信息,基于FlinkX框架將任務(wù)簡化成腳本的配置,基于Flink的Checkpoint機(jī)制提供了任務(wù)的故障恢復(fù),提高了任務(wù)的容錯性。

基本步驟如下:

  • 任務(wù)啟動時啟動Canal線程

  • Canal模擬MySQL slave的交互協(xié)議,偽裝自己為MySQL slave,向MySQL master發(fā)送dump協(xié)議

  • MySQL master收到dump請求,開始推送Binary Log給slave(即Canal)

  • Canal解析Binary Log 對象(原始為Byte流)

  • FlinkX獲取Canal解析后的對象做二次解析,封裝后發(fā)送至下游數(shù)據(jù)源

三、Binlog到Hive實戰(zhàn)

1、環(huán)境準(zhǔn)備:確認(rèn)數(shù)據(jù)庫開啟了Binlog

show variables like '%log_bin%';

2、建表

CREATE TABLE `kudu` ( ? ?`id` bigint(11) NOT NULL AUTO_INCREMENT, ? ?`user_id` bigint(11) DEFAULT NULL, ? ?`name` varchar(255) DEFAULT NULL, ? ?PRIMARY KEY (`id`) ? ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4





?


一文詳解數(shù)棧FlinkX實時采集原理與使用的評論 (共 條)

分享到微博請遵守國家法律
黄浦区| 钦州市| 香格里拉县| 靖宇县| 泽普县| 马边| 锦州市| 德格县| 肃南| 江口县| 深圳市| 萨嘎县| 三穗县| 鄂托克前旗| 南投市| 庆云县| 维西| 新乐市| 正安县| 嵊泗县| 新源县| 青海省| 綦江县| 梨树县| 高阳县| 灯塔市| 玉林市| 韩城市| 泰兴市| 曲阜市| 大关县| 筠连县| 昆明市| 磐安县| 郴州市| 永丰县| 军事| 海淀区| 永州市| 花莲县| 和田市|