使用Logstash同步Mysql到Easysearch
從 Mysql 同步數(shù)據(jù)到 ES 有多種方案,這次我們使用 ELK 技術(shù)棧中的 Logstash 來(lái)將數(shù)據(jù)從 Mysql 同步到 Easysearch 。
方案前提
Mysql 表記錄必須有主鍵,比如 id 字段。通過(guò)該字段,可將 Easysearch 索引數(shù)據(jù)與 Mysql 表數(shù)據(jù)形成一對(duì)一映射關(guān)系,支持修改。
Mysql 表記錄必須有時(shí)間字段,以支持增量同步。
如果上述條件具備,便可使用 logstash 定期同步新寫(xiě)入或修改后的數(shù)據(jù)到 Easysearch 中。
方案演示
版本信息
Mysql: 5.7
Logstash: 7.10.2
Easysearch: 1.5.0
MySQL 設(shè)置
創(chuàng)建演示用的表。
說(shuō)明
id 字段: 主鍵、唯一鍵,將作為 Easysearch 索引中的 doc id 字段。
modification_time 字段: 表記錄的插入和修改都會(huì)記錄在此。
client_name: 代表用戶數(shù)據(jù)。
insertion_time: 可省略,用來(lái)記錄數(shù)據(jù)插入到 Mysql 數(shù)據(jù)的時(shí)間。
插入數(shù)據(jù)
Logstash
配置文件
每 5 秒鐘同步一次 es_table 表的數(shù)據(jù)到 mysql_sync_idx 索引。
每 5 秒統(tǒng)計(jì)一次 es_table 表的記錄條數(shù)到 table_counts 索引,用于監(jiān)控。
啟動(dòng) logstash
./bin/logstash -f sync_es_table.conf
查看同步結(jié)果, 3 條數(shù)據(jù)都已同步到索引。

Mysql 數(shù)據(jù)庫(kù)新增記錄
INSERT INTO es_table (id, client_name) VALUES (4, 'test 4');
Easysearch 確認(rèn)新增

Mysql 數(shù)據(jù)庫(kù)修改記錄
UPDATE es_table SET client_name = 'test 0001' WHERE id=1;
Easysearch 確認(rèn)修改

刪除數(shù)據(jù)
Logstash 無(wú)法直接刪除操作到 ES ,有兩個(gè)方案:
在表中增加 is_deleted 字段,實(shí)現(xiàn)軟刪除,可達(dá)到同步的目的。查詢過(guò)濾掉 is_deleted : true 的記錄,后續(xù)通過(guò)腳本等方式定期清理 is_deleted : true 的數(shù)據(jù)。
執(zhí)行刪除操作的程序,刪除完 Mysql 中的記錄后,繼續(xù)刪除 Easysearch 中的記錄。
同步監(jiān)控
數(shù)據(jù)已經(jīng)在 ES 中了,我們可利用 INFINI Console 的數(shù)據(jù)看板來(lái)監(jiān)控?cái)?shù)據(jù)是否同步,展示表記錄數(shù)、索引記錄數(shù)及其變化。
