最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

云原生丨一文教你基于Debezium與Kafka構(gòu)建數(shù)據(jù)同步遷移(建議收藏)

2023-02-24 20:00 作者:神州數(shù)碼云基地  | 我要投稿

前言

在項目中,我們遇到已有數(shù)據(jù)庫現(xiàn)存有大量數(shù)據(jù),但需要將全部現(xiàn)存數(shù)據(jù)同步遷移到新的數(shù)據(jù)庫中,我們應(yīng)該如何處理呢?

本期我們就基于Debezium與Kafka構(gòu)建數(shù)據(jù)同步。


一、安裝部署

1.1 Debezium架構(gòu)

Debezium 是一個基于不同數(shù)據(jù)庫中提供的變更數(shù)據(jù)捕獲功能(例如,PostgreSQL中的邏輯解碼)構(gòu)建的分布式平臺。 Debezium是通過Apache Kafka連接部署的。

Kafka Connect是一個用于實現(xiàn)和操作的框架運行時。

源連接器,如Debezium,它將數(shù)據(jù)攝取到Kafka中(在我們的接下來實際的例子中,Debezium將Mysql數(shù)據(jù)攝取到Kafka中);

接收連接器,它將數(shù)據(jù)從Kafka主題寫入到其他到系統(tǒng),這個系統(tǒng)可以有多種,在我們例子中,會將Kafka主題寫入到PostgreSQL數(shù)據(jù)庫中。


1.2 部署示意圖

  • Zookeeper:Zookeeper容器,用于構(gòu)建Kafka環(huán)境;

  • Kafka:Kafka容器,數(shù)據(jù)庫的變更信息以topic的形式保存在kafka中;

  • Kafka-ui:kafka的UI頁面容器,可以直觀的查看kafka中的Brokers,Topics,Consumers等信息;

  • Connect:Debezium的Connect容器,對接Kafka的Connect,通過Source Connector將數(shù)據(jù)同步到Kafka中,通過Sink Connect消費Kafka的topic消息;

  • Debezium Connector:Source Connector插件,以Jar包的形式部署在Connect中,Debezium自帶有MongoDB,MySQL,PostgreSQL,SQL Server,Oracle,Db2連接器;

  • DBC connector:Sink Connector插件,以Jar包的形式部署在Connect中,本次部署安裝的是JDBC連接器,將Kafka上的數(shù)據(jù)同步到數(shù)據(jù)庫中;

  • Debezium-ui:Debezium connect的ui頁面容器。用于創(chuàng)建和顯示Source Connector

  • Source Database:數(shù)據(jù)遷移來源方數(shù)據(jù)庫。本次部署中使用的是MySQL和Postgres(10+版本);

  • Target Database:數(shù)據(jù)庫遷移目標(biāo)數(shù)據(jù)庫。本次部署中使用的是Postgres。


1.3 安裝部署

本次部署需要先安裝Docker。

Debezium使用Docker安裝部署,如下?

docker-compose.yaml


部署命令:


部署完成后,Docker容器列表,如下:

  • Kafka-ui訪問地址:http://localhost:9093

  • Debezium-ui訪問地址:http://localhost:8080

Source Connector和Sink Connector都是以JAR包的方式,存在于Connect容器的/kafka/connect目錄下。

Connect容器自帶有Debezium的官方Source Connector:

  • debezium-connector-db2

  • debezium-connector-mysql

  • debezium-connector-postgres

  • debezium-connector-vitess

  • debezium-connector-mongodb

  • debezium-connector-oracle

  • debezium-connector-sqlserver

需要自行注冊Sink Connector:Kafka-Connect-JDBC(新建Kafka-Connect-JDBC目錄,下載JAR包放入此目錄,重啟Conenct)。

注冊Sink Connector


二、數(shù)據(jù)遷移

數(shù)據(jù)遷移經(jīng)歷以下幾個步驟:

1)啟動源數(shù)據(jù)庫;

2)注冊Source Connector,Source Connector監(jiān)聽Source Database的數(shù)據(jù)變動,發(fā)布數(shù)據(jù)到Kafka的Topic中,一個表對應(yīng)一個Topic,Topic中包含對表中某條記錄的某個操作(新增,修改,刪除等);

3)啟動目標(biāo)數(shù)據(jù)庫;

4)注冊Sink Connector,Sink Connector消費Kafka中的Topic,通過JDBC連接到Target Database,根據(jù)Topic中的信息,對表記錄執(zhí)行對應(yīng)操作。


2.1 Postgres遷移到Postgres


1. 啟動源數(shù)據(jù)庫-Postgres

本次部署通過容器的方式啟動:


2.注冊Source Connecto

通過Debezium UI頁面進行注冊。

需要注意的有以下幾點:

Debezium Postgres類型的Source Connector支持的Postgres需要將wal_level修改為logical;修改Postgres中的Postgresql.conf文件中的配置(wal_level = logical)并重啟Postgres;

Postgres需要支持解碼插件,Debezium官方一共提供了兩個解碼插件:

  • Decoderbufs:Debezium默認配置,由Debezium維護;

  • Pgoutput:Postgres 10+版本自帶;使用此插件時,需要配置plugin.name=pgoutput


3.啟動目標(biāo)數(shù)據(jù)庫-Postgre


4.注冊Sink Connector

通過Connect提供的API進行注冊

新增Connector


5.驗證數(shù)據(jù)遷移過程

源數(shù)據(jù)庫中的表數(shù)據(jù)遷移到Kafka

新建表test_source和test_source1


Kafka新建數(shù)據(jù)前 ?

Kafka新建數(shù)據(jù)后 ?

源數(shù)據(jù)庫中新建表test_source和表test_source1后,Kafka中出現(xiàn)了兩個Topic:

postgres.public.test_source和postgres.public.test_source1,與這兩個表一一對應(yīng),topic中的message對應(yīng)著對表中記錄的操作(新增1條記錄)。

監(jiān)聽的表可通過連接器配置進行過濾,比如配置"table.include.list": “public.test_source”,就只會出現(xiàn)一個Topic:postgres.public.test_source


Kafka中的數(shù)據(jù)遷移到目標(biāo)數(shù)據(jù)庫

注冊Sink Connector后,Kafka中會新增一個Customer,對postgres.public.test_source進行消費(sink connector配置中的"topics": "postgres.public.test_source"指定);

對應(yīng)的源數(shù)據(jù)庫(sink connector配置中的"connection.url": "jdbc:postgresql://10.3.73.160:25432/postgres?user=debe&password=123456"指定)會新增一個表public.test_source,該表中的數(shù)據(jù)和源數(shù)據(jù)庫中的public.test_source始終保持同步。


2.2 MySQL遷移到PostgresSQL

1.啟動源數(shù)據(jù)庫-mysql

本次部署通過docker啟動:


2.注冊Source Connector

啟動MySQL數(shù)據(jù)源連接注冊


注冊MySQL數(shù)據(jù)源有兩種方式:

1、在Debezium UI中直接添加

2、調(diào)用Kafka API 注冊


在Debezium UI中直接添加


選擇MySQL數(shù)據(jù)源

調(diào)用Kafka API注冊

新增Connector


驗證Source Connector注冊結(jié)果

注冊連接前:

注冊連接后:


多出來的Topics信息是MySQL source表信息,連接MySQL數(shù)據(jù)庫可見表:

UI for Apache Kafka中可以看到Messages同步信息。

訪問Debezium UI(http://localhost:8080/ )可以看到MySQL的連接。



3.啟動目標(biāo)數(shù)據(jù)庫-Postgres

本次部署采用Docker方式啟動:


4.注冊Sink Connector (通過API接口)

新增Connector

注冊PostgreSQL connector后,不會在Debezium中顯示Connector client 信息,但可以在UI for Apache Kafka中看到:


5.驗證數(shù)據(jù)遷移過程

完成安裝步驟后,以Customers表為例,做CUD操作語句,實現(xiàn)MySQL數(shù)據(jù)庫同步數(shù)據(jù)到PostgreSQL 。


Mysql 數(shù)據(jù)庫現(xiàn)有數(shù)據(jù):

手動在MySQL數(shù)據(jù)庫Customers表中添加一條數(shù)據(jù) ?

customers.sql

在PostgreSQL數(shù)據(jù)庫中Customers多出一條數(shù)據(jù):

Kafka中Messages新增一條數(shù)據(jù),完成數(shù)據(jù)同步:

可以看到消費如下信息:

topics-customers.json


重要的部分是 “payload” json 中信息:

  • source 中會展示“版本”,“數(shù)據(jù)源”等信息;

  • after 代表變動信息;

  • “op” 操作信息,例如“c” 代表創(chuàng)建;

需要注意的是,結(jié)果的json格式是Debezium定義好的格式。

Debezium json格式通常前面定義Schema信息,最后才是實際的載荷(payload)信息。

詳細格式定義可以查看:https://debezium.io/documentation/reference/1.6/connectors/mysql.html


通過以上步驟,我們在Docker環(huán)境上使用Debezium實現(xiàn)了數(shù)據(jù)同步到kafaka。本期關(guān)于數(shù)據(jù)同步遷移的內(nèi)容就到這里了,建議大家收藏學(xué)習(xí)!~


我們致力于用數(shù)字技術(shù)重構(gòu)企業(yè)價值,助力企業(yè)實現(xiàn)數(shù)字化轉(zhuǎn)型升級!

公眾號 搜索【神州數(shù)碼云基地】,后臺回復(fù)Odoo,加入Odoo技術(shù)交流群!

知乎 搜索【神州數(shù)碼云基地】,收看更多Odoo相關(guān)回答與文章!??

云原生丨一文教你基于Debezium與Kafka構(gòu)建數(shù)據(jù)同步遷移(建議收藏)的評論 (共 條)

分享到微博請遵守國家法律
博罗县| 彩票| 盈江县| 博兴县| 区。| 阿尔山市| 龙江县| 济宁市| 霍州市| 广饶县| 东辽县| 齐齐哈尔市| 融水| 昂仁县| 江陵县| 沐川县| 星子县| 乡城县| 沂水县| 武穴市| 温宿县| 宜兰市| 神木县| 岳西县| 涡阳县| 南京市| 固安县| 麟游县| 绥棱县| 西青区| 元谋县| 乌拉特中旗| 松原市| 历史| 惠水县| 柳林县| 留坝县| 巴彦淖尔市| 扎兰屯市| 精河县| 内黄县|