使用 INFINI Console 實(shí)現(xiàn) Elasticsearch 的增量數(shù)據(jù)遷移
功能介紹?#
在 INFINI Console 1.3.0 版本里,數(shù)據(jù)遷移功能增加了對(duì)增量遷移的支持。這篇文章將會(huì)介紹增量遷移的具體使用方法和實(shí)現(xiàn)原理。
場(chǎng)景介紹?#
以常見的日志場(chǎng)景為例,假設(shè) A 集群有一個(gè)用來記錄線上 HTTP 請(qǐng)求記錄的索引?request-logs
,數(shù)據(jù)結(jié)構(gòu)如下:
{
?"request_body": {...},
?"request_header": {...},
?"method": "POST",
?"request_time": "2023-06-09 12:30:09+800" // 客戶端記錄請(qǐng)求的時(shí)間
?"@timestamp": "2023-06-09 12:30:11+800" // 請(qǐng)求寫入ES的時(shí)間
}
我們希望完整導(dǎo)入這個(gè)索引的數(shù)據(jù)到另外一個(gè)集群 B 的 request-logs。要想確保導(dǎo)入的數(shù)據(jù)完整,我們首先需要考慮數(shù)據(jù)寫入的延遲問題:
A 集群的數(shù)據(jù)寫入可能會(huì)有延遲。日志往往是從不同的節(jié)點(diǎn)收集異步上傳的,考慮到網(wǎng)絡(luò)環(huán)境波動(dòng)等情況,最終日志寫入 Elasticserach 的時(shí)間會(huì)有差異。
寫入 Elasticsearch 的數(shù)據(jù)并不會(huì)立刻對(duì)查詢請(qǐng)求可見,Elasticsearch 會(huì)異步刷新寫入的數(shù)據(jù)。
也就是說,我們假設(shè)每一條請(qǐng)求日志從采集到寫入 ES 到最終可以被查詢的延遲為d
,每次進(jìn)行增量遷移的時(shí)候,我們可以完整遷移的數(shù)據(jù)范圍就是[當(dāng)前時(shí)間 - 上次遷移的時(shí)間 - d, 當(dāng)前時(shí)間 - d)
。只要數(shù)據(jù)寫入的延遲不超過d
,我們就可以從集群 A 查詢到完整的數(shù)據(jù)集寫入集群 B。
集群 A 的數(shù)據(jù)有更新操作??#
在上述的日志場(chǎng)景里,我們通常不會(huì)對(duì)寫入的日志文檔進(jìn)行后續(xù)的更新操作,每一條文檔寫入后都是不可變的,我們只需要篩選@timestamp
字段就可以找到需要遷移的數(shù)據(jù)了,而且每條數(shù)據(jù)只需要遷移一次就可以確保目標(biāo)集群的數(shù)據(jù)一致。
如果源數(shù)據(jù)有更新,那我們應(yīng)該如何進(jìn)行增量遷移呢?通常情況下,每次更新操作我們都會(huì)記錄文檔更新的時(shí)間到update_time
字段,這樣我們就可以使用update_time
字段來進(jìn)行增量數(shù)據(jù)的遷移。
假設(shè)在第一次遷移的時(shí)候,索引 A 存有以下數(shù)據(jù),我們?cè)谶M(jìn)行第一次遷移操作后,數(shù)據(jù)可以完整寫入目標(biāo)索引:

如果在第二次遷移之前,索引 A 中有一條舊的記錄被更新,這個(gè)時(shí)候,遷移流程可以通過update_time
字段檢測(cè)到這一條數(shù)據(jù),復(fù)制并覆蓋目標(biāo)集群的舊數(shù)據(jù)記錄:

可以看到,即使源數(shù)據(jù)有更新,只要我們記錄了每一條數(shù)據(jù)的更新時(shí)間,遷移過程最后寫入集群 B 的數(shù)據(jù)依然是完整且一致的。
集群 A 的數(shù)據(jù)有刪除操作??#
如果源集群的數(shù)據(jù)有刪除操作,基于上述的數(shù)據(jù)遷移邏輯,第二次遷移過程是無法判斷已經(jīng)遷移的數(shù)據(jù)是否被刪除的:

如果我們希望遷移的數(shù)據(jù)完整,需要避免對(duì)源集群的數(shù)據(jù)進(jìn)行刪除操作。我們可以標(biāo)記文檔為deleted
,但是不做實(shí)際刪除操作,這樣我們就可以通過文檔刪除(更新)操作的時(shí)間來進(jìn)行完整的數(shù)據(jù)遷移:

使用 INFINI Console 來進(jìn)行增量數(shù)據(jù)的遷移?#
在 INFINI Console 里,我們可以使用 數(shù)據(jù)工具 - 數(shù)據(jù)遷移 功能來對(duì)數(shù)據(jù)進(jìn)行增量遷移。作為示例,我們新建一個(gè)數(shù)據(jù)遷移任務(wù),把.infini_requests_logging-000002
索引的數(shù)據(jù)遷移到目標(biāo)集群的request
索引。

我們需要遷移到一個(gè)新創(chuàng)建的索引,在Initialize Configuration
步驟,根據(jù)提示配置目標(biāo)索引的mapping
和setting
信息。如果不需要特殊配置,可以使用Auto Optimize
功能自動(dòng)填充。

接下來我們需要配置數(shù)據(jù)寫入的更新字段和寫入的延遲。請(qǐng)求日志通過timestamp
字段記錄請(qǐng)求時(shí)間,通常延遲不會(huì)超過 1 分鐘,我們?cè)?code>Migrate Setting步驟里配置相應(yīng)的Incremental
信息。如果歷史數(shù)據(jù)量比較大或者增量任務(wù)的運(yùn)行間隔較長,我們也可以配置Partition
分區(qū)規(guī)則來拆分任務(wù)為更細(xì)的粒度,避免因單個(gè)任務(wù)長時(shí)間導(dǎo)出數(shù)據(jù)對(duì) ES 產(chǎn)生過高的負(fù)載。

最后,在創(chuàng)建任務(wù)時(shí),我們勾選Detect Incremental Data
,然后設(shè)置任務(wù)每 15 分鐘檢測(cè)一次增量數(shù)據(jù)。

點(diǎn)擊開始后,增量任務(wù)就會(huì)開始運(yùn)行。第一次運(yùn)行時(shí)會(huì)對(duì)歷史數(shù)據(jù)進(jìn)行全量遷移,后續(xù)每 15 分鐘會(huì)自動(dòng)檢測(cè)新數(shù)據(jù)并遷移到目標(biāo)索引。

總結(jié)?#
除了數(shù)據(jù)一致性,遷移功能的設(shè)計(jì)也需要兼顧性能、穩(wěn)定性、ES 版本兼容性等,INFINI Console 提供了一套操作簡便的數(shù)據(jù)遷移解決方案,可以用于各種場(chǎng)景的數(shù)據(jù)遷移需求。