通過 DVT 和 dbt 測試監(jiān)控Airbyte數(shù)據(jù)管道
為數(shù)據(jù)復制或數(shù)據(jù)遷移構建 ELT 數(shù)據(jù)管道的一個重要部分是能夠在出現(xiàn)錯誤時進行監(jiān)視并獲得通知。如果您不知道錯誤,您的數(shù)據(jù)將包含不一致之處,并且您的報告將不準確。由于使用的工具數(shù)量眾多,大多數(shù)管道的復雜性使得設置監(jiān)視和警報系統(tǒng)更具挑戰(zhàn)性。
在本文中,我將分享為什么為 ELT 數(shù)據(jù)管道設置監(jiān)視和警報系統(tǒng)很重要。我解釋了要監(jiān)視的關鍵指標,以及設置監(jiān)視和警報系統(tǒng)時將遇到的常見挑戰(zhàn)。我進一步強調了不同的監(jiān)控和警報工具,并展示了如何使用Google的數(shù)據(jù)驗證工具(DVT)和數(shù)據(jù)構建工具(dbt)實現(xiàn)典型的監(jiān)視/警報系統(tǒng)。
為什么要監(jiān)視數(shù)據(jù)管道?
好吧,問題應該是,“為什么不監(jiān)視數(shù)據(jù)管道”?。這是因為讓您的管道作為黑匣子運行對業(yè)務來說可能非常昂貴。讓我分享一個個人故事來解釋這一點。我公司用于數(shù)據(jù)復制的初始管道是使用 AWS 數(shù)據(jù)遷移服務 (DMS) 設計的,將數(shù)據(jù)從 RDS (PostgreSQL) 副本復制到 S3 存儲桶。然后,我們讓 Snowpipe(Snowflake 的 ELT 工具)從 S3 存儲桶獲取新數(shù)據(jù),并將這些數(shù)據(jù)通過管道傳輸?shù)?Snowflake(我們的數(shù)據(jù)倉庫)。
這種架構有效,但它是一個完整的黑匣子,因為我們很少或根本看不到引擎蓋下發(fā)生的事情。沒有適當?shù)木瘓蠡蛲ㄖ到y(tǒng)來通知我們管道故障。只有當我們在一天或更長時間后看到數(shù)據(jù)不一致或下游分析不準確時,我們才會知道管道故障。但這對業(yè)務有何影響?這對我們的影響之一是客戶流失率的提高。由于我們的數(shù)據(jù)到達較晚且不一致,我們無法及時檢測到客戶何時遇到 KYC 驗證問題。
以下是應監(jiān)視管道的主要原因:
全面鳥瞰數(shù)據(jù)運行狀況。
防止數(shù)據(jù)傳輸不一致。
獲取持續(xù)的數(shù)據(jù)測試方法。
及早發(fā)現(xiàn)數(shù)據(jù)質量和數(shù)據(jù)完整性問題。
跟蹤數(shù)據(jù)處理成本、元數(shù)據(jù)和整體系統(tǒng)性能。
提供反饋以優(yōu)化管道性能。
應監(jiān)視哪些指標?
雖然設置這些監(jiān)視/警報系統(tǒng)以了解管道非常重要,但確定要衡量的關鍵指標以及要使用哪些工具并不總是一項簡單的任務。這是因為要衡量的指標在很大程度上取決于數(shù)據(jù)管道的用例和其他幾個因素。
例如,如果其中一個管道實時提供用于跟蹤應用程序服務器停機時間的關鍵數(shù)據(jù),那么您的首要任務是根據(jù)組織或團隊定義的 SLA 監(jiān)控數(shù)據(jù)到達的延遲。
以下是要監(jiān)控的不同類別的指標,適用于您或您的組織可能擁有的任何用例:
數(shù)據(jù)質量監(jiān)控
通過數(shù)據(jù)質量監(jiān)控,建立一個監(jiān)控系統(tǒng),以持續(xù)驗證管道不同階段的數(shù)據(jù)質量。首先,在提取加載 (EL) 步驟中,在加載作業(yè)完成后,根據(jù)目標中的數(shù)據(jù)驗證源中的數(shù)據(jù)質量。此處監(jiān)視的關鍵指標包括源-目標記錄計數(shù)匹配、源-目標列計數(shù)匹配、數(shù)據(jù)格式錯誤、數(shù)據(jù)卷錯誤、列名更改、引用完整性等。其次,在轉換作業(yè)運行后的轉換步驟中監(jiān)視數(shù)據(jù)的質量。此步驟監(jiān)控的關鍵指標包括:數(shù)據(jù)類型錯誤、空值等。
管道可靠性監(jiān)測
在這里,監(jiān)控側重于管道的端到端可靠性。在管道的不同步驟中監(jiān)視錯誤:
提取加載 (EL) 步驟:此步驟由 Airbyte 等 ELT 工具處理。此處會出現(xiàn)錯誤,例如身份驗證問題導致的同步失敗、規(guī)范化錯誤、同步期間加載新列 (SCD) 時出錯、JSON 架構驗證程序錯誤等。受到監(jiān)控。
轉換步驟:此步驟由 dbt 等轉換工具處理。此處會出現(xiàn)轉換作業(yè)運行失敗、數(shù)據(jù)傳輸延遲(運行持續(xù)時間長于預期)、數(shù)據(jù)沿襲或數(shù)據(jù)丟失問題等錯誤。受到監(jiān)控。
業(yè)務指標監(jiān)控
這種類型的監(jiān)視發(fā)生在管道的轉換階段之后。在這里,監(jiān)視轉換后的數(shù)據(jù),以根據(jù)特定的業(yè)務需求識別異常。例如,貨幣價格貶值或升值、基于市場價值的交易損失等。當這些指標達到特定閾值時,將觸發(fā)警報。
監(jiān)視數(shù)據(jù)管道有哪些挑戰(zhàn)?
ELT 數(shù)據(jù)管道是使用多種工具組合構建的,包括 dbt、Airflow、Airbyte、SQL、云服務、數(shù)據(jù)庫、數(shù)據(jù)倉庫和數(shù)據(jù)湖。這種工具的多樣性有利于可擴展性,以及在數(shù)據(jù)堆棧的每一層使用最有效的工具。但是,這會導致管道中有許多移動部件。這可能會使監(jiān)視或全面了解數(shù)據(jù)管道成為一場噩夢。
在設置管道監(jiān)視和警報系統(tǒng)之前,我強烈建議先簡化管道中要監(jiān)視的進程數(shù)。具有多個可能故障點的復雜管道將需要在這些不同層設置監(jiān)視/警報系統(tǒng)。這將使事情變得非常難以跟蹤和管理。
為了簡化我上面給出的示例中公司管道的復雜性,我們首先引入了?Airbyte——一個開源數(shù)據(jù)攝取工具來處理我們的數(shù)據(jù)復制。Airbyte 幫助我們減少了管道中可能的故障點數(shù)量。我們沒有首先使用數(shù)據(jù)遷移服務 (DMS) 將數(shù)據(jù)復制到 S3 存儲桶,而是使用 Airbyte 將數(shù)據(jù)直接復制到我們的倉庫(Snowflake)。借助此架構,我們無需在數(shù)據(jù)流的三個不同級別進行監(jiān)控:RDS-DMS 級別、DMS-S3 存儲桶級別和 S3-Snowpipe 級別?,F(xiàn)在,我們只在倉庫級別監(jiān)控我們的管道。
簡化監(jiān)視進程的數(shù)量后,讓我們討論數(shù)據(jù)管道監(jiān)視工具以及如何為數(shù)據(jù)管道設置典型的監(jiān)視/警報系統(tǒng)。
監(jiān)控管道中的數(shù)據(jù)質量指標
市場上有很多工具可用于監(jiān)視和觸發(fā)數(shù)據(jù)管道中的警報。但是,這些工具的功能不同。雖然一些工具專注于監(jiān)控云基礎設施、日志和應用程序安全性,但其他工具則專注于監(jiān)控數(shù)據(jù)質量、數(shù)據(jù)驗證和數(shù)據(jù)沿襲。此外,一些工具是專有的基于云的解決方案,而另一些則是開源的。
一些專有的基于云的解決方案包括Monte Carlo,Databand,Datadog,Datafold,Accel Data。開源替代方案包括普羅米修斯、洛基、遠大期望、數(shù)據(jù)驗證工具 (DVT)、dbt 測試、Datafold 的數(shù)據(jù)差異等......需要注意的重要一點是,您可能需要組合其中兩個或多個工具來實現(xiàn)您的目標。
在下一節(jié)中,我將介紹如何使用兩個開源工具設置這些監(jiān)視/警報系統(tǒng):數(shù)據(jù)驗證工具 (DVT) 和數(shù)據(jù)構建工具 (dbt)。
使用數(shù)據(jù)驗證工具 (DVT) 進行數(shù)據(jù)驗證監(jiān)控
數(shù)據(jù)驗證是在將數(shù)據(jù)用于業(yè)務運營之前檢查數(shù)據(jù)的完整性、準確性和結構的做法。
數(shù)據(jù)驗證是構建數(shù)據(jù)管道的關鍵步驟,因為它提供了在將數(shù)據(jù)用于下游分析之前檢查數(shù)據(jù)有效性的層。
數(shù)據(jù)驗證工具 (DVT)?是一種開源 Python CLI 工具,可將異構數(shù)據(jù)源表與多級驗證函數(shù)進行比較。在數(shù)據(jù)加載過程完成后,您可以運行 DVT 進程來驗證源表和目標表是否匹配且正確。DVT 支持列、行、自定義查詢、架構、列數(shù)據(jù)類型驗證以及許多數(shù)據(jù)倉庫和數(shù)據(jù)庫的連接。
Datafold 還提供了一個開源數(shù)據(jù)差異項目,用于有效地比較數(shù)據(jù)庫和數(shù)據(jù)倉庫之間的表。要了解有關使用 data-diff 的更多信息,請閱讀有關驗證從 Postgres 到 Snowflake 的數(shù)據(jù)復制管道的教程。
將 DVT 與 BigQuery 結合使用

您可以在任何云平臺中的虛擬機上設置和運行 DVT。您還可以選擇在 Docker 容器中運行 DVT。按照此處的說明在本地計算機或云環(huán)境中安裝和設置 DVT。本節(jié)中的代碼演練是在 Google Cloud 上的云外殼會話上運行的。DVT 提供了一個命令行界面 (CLI),用于在安裝后執(zhí)行 dvt 命令。
若要根據(jù)目標表驗證源表,請先創(chuàng)建源連接和目標連接。我們通過 CLI 運行以下代碼來做到這一點。
# Create MYSQL connection as Source connectiondata-validation connections add--connection-nameMYSQL_CONN MySQL--hostHOST_IP--portPORT--user-nameUSER-NAME--passwordPASSWORD # Create BigQuery connection as target connectiondata-validation connections add--connection-name$BigQuery_CONNBigQuery--project-id$MY_GCP_PROJECT
上面的代碼片段將創(chuàng)建一個 MySQL 連接作為源連接,創(chuàng)建一個 BigQuery 連接作為目標連接。
列驗證
列驗證對源和目標都運行計數(shù) (*)。這將計算源表中的列數(shù),并驗證它是否與目標表上的計數(shù)匹配。要運行列驗證,請通過 CLI 執(zhí)行數(shù)據(jù)驗證運行命令。以下是 MySQL 源表和 BigQuery 目標表之間的列驗證的外觀:
data-validation validate column\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
上面的代碼片段將執(zhí)行源表的列計數(shù),并根據(jù)目標表的列計數(shù)進行驗證。--bq-result-handler?標志將有助于將驗證結果輸出到中間 BigQuery 表。默認情況下,如果沒有 --bq-result-handler?標志,驗證結果將輸出到控制臺。
對于啟用了表規(guī)范化的 Airbyte 同步,您需要指定要在列驗證中驗證的列的名稱。這是為了排除 Airbyte 在同步期間添加的其他元數(shù)據(jù)列。下面的代碼演示如何在驗證中指定列:
data-validation validate column\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--count column1, column2, column3, column4, column5 \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
行驗證
行驗證在源和目標上運行計數(shù) (*)。這將計算源表中的行數(shù),并驗證它與目標表上的計數(shù)匹配。以下是 MySQL 源表和 BigQuery 目標表之間的行驗證的外觀:
data-validation validate row \--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result \--hash'*'\--primary-keysid\--use-random-row --random-row-batch-size 50
--use-random-row?和 --random-row-batch-size?標志指定您只想隨機驗證行的子集。當您有大型表時,這會派上用場,因為行驗證需要更多的內(nèi)存和計算。
架構驗證
架構驗證將獲取源表中每一列的列數(shù)據(jù)類型,并驗證它是否與目標表的列數(shù)據(jù)類型匹配。源表和目標表中的類型不匹配會導致驗證狀態(tài)失敗。
以下是 MySQL 源表和 BigQuery 目標表之間的架構驗證的外觀:
data-validation validate schema\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
行比較驗證
這種類型的驗證對源表和目標表中指定列的值執(zhí)行逐行比較。這些值不匹配會導致驗證狀態(tài)失敗。
以下是 MySQL 源表和 BigQuery 目標表之間的架構驗證的外觀:
data-validation validate row \--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result \--hash'*'\--primary-keysid\--use-random-row --random-row-batch-size 50
下面是 BigQuery 表中驗證結果的示例輸出:

名為“difference”的列表示源表中的列/行計數(shù)與目標表中的列/行計數(shù)之間的差異。validation_status列顯示驗證的狀態(tài)。
然后可以查詢此表,并將錯誤通知/警報發(fā)送到電子郵件或 Slack 頻道。
我將在上一節(jié)中介紹向?Slack 頻道發(fā)送通知/警報。
從 YAML 文件運行驗證
運行驗證的另一種方法是將驗證配置保存到 YAML 文件。這樣,您可以存儲以前的驗證并輕松修改驗證配置。此方法還有助于自動執(zhí)行驗證過程,因為驗證可以按計劃運行。
若要生成用于驗證的 YAML 配置文件,請指定 --config-file?標志。請參閱下面的代碼:
data-validation validatecolumn\--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result \--config-file validation_config.yaml
下面是從上述代碼生成的 YAML 配置的外觀。
result_handler:project_id:your-project-idtable_id:data_validation.validation_resultstype:BigQuerysource:MYSQL_CONNtarget:BigQuery_CONNvalidations:-aggregates:-field_alias:countsource_column:nulltarget_column:nulltype:countcalculated_fields:[]filter_status:nullfilters:[]format:tablelabels:[]random_row_batch_size:nullschema_name:transportation_datatable_name:citibike_stationstarget_schema_name:onesphere-analytics.master_datatarget_table_name:citibike_stationsthreshold:0.0type:Columnuse_random_rows:false
生成的 YAML 配置文件可以在執(zhí)行生成驗證命令的目錄中找到。
現(xiàn)在,可以使用以下代碼從 YAML 配置文件運行驗證:
data-validation run-config -c validation_config.yaml
使用數(shù)據(jù)構建工具 (dbt) 進行數(shù)據(jù)質量監(jiān)控
dbt 是一種數(shù)據(jù)轉換工具,使數(shù)據(jù)和分析工程師能夠通過簡單地編寫 SQL 語句來轉換其倉庫中的數(shù)據(jù)。DBT 處理將這些 SELECT 語句轉換為倉庫中的表和視圖。要設置 dbt 項目,請按照此設置 dbt 項目指南進行操作。
DBT 提供了用于執(zhí)行數(shù)據(jù)質量檢查的測試功能,包括數(shù)據(jù)類型檢查、空值檢查、重復檢查、參照完整性檢查等。讓我們看看如何使用 dbt 測試執(zhí)行數(shù)據(jù)質量檢查。dbt 測試定義為單一測試、SQL 文件中的一般測試或具有返回失敗記錄邏輯的 YAML 配置文件。
下面是使用 YAML 配置文件為源表(訂單表)定義測試的示例。
version: 2source:? - name: orderscolumns:? ? ? -name: order_idtests:? ? ? ? ? - unique? ? ? ? ? - not_null? ? ? -name: statustests:? ? ? ? ? -accepted_values:values: ['placed','shipped','completed','returned']? ? ? -name: customer_idtests:? ? ? ? ? -relationships:to: ref('customers')field: id
上面示例中的測試配置首先檢查orders_table order_id列中的重復值和非空值,然后在狀態(tài)列中檢查接受的值(“已放置”、“已發(fā)貨”、“已完成”、“已退回”)。最后,它檢查customer_id列中的引用完整性,以確保訂單表上的每個customer_id在客戶表上都有一個關聯(lián)的 ID。
若要運行測試,請運行命令:dbt test --store-failures。
--store-failure?標志將測試結果存儲在中間表中。然后,可以查詢此表以發(fā)送錯誤/失敗通知/警報。未通過測試的記錄保存在數(shù)據(jù)倉庫中后綴為“dbt_test__audit”的架構中的結果表中。
需要注意的是,如上所述,使用 dbt 監(jiān)控數(shù)據(jù)質量的方法也適用于業(yè)務指標監(jiān)控。
使用 dbt 進行管道可靠性監(jiān)控
DBT 提供用于跟蹤管道錯誤或作業(yè)故障的監(jiān)視和警報系統(tǒng)。通知在 dbt 云上配置,并在作業(yè)運行后立即觸發(fā)。通知可以發(fā)送到電子郵件或 Slack 頻道。這是這方面的指南。對于管道中的數(shù)據(jù)提取轉換 (EL) 層,Airbyte 提供了一個可靠的監(jiān)控和警報系統(tǒng),用于端到端監(jiān)控并將同步失敗/成功通知發(fā)送到 Slack 通道。按照此分步指南為空字節(jié)同步設置 Slack 通知。
向 Slack 頻道發(fā)送通知
要發(fā)送通知/警報以跟蹤管道的問題或故障,您將構建一個簡單的 Slack 機器人,該機器人可以在無服務器函數(shù)(例如 AWS Lambda 或 GCP CloudFunction)上運行。機器人是一個簡單的 Python 腳本,可以計劃為按時間間隔運行或基于數(shù)據(jù)加載事件運行。
機器人將從上述部分查詢包含數(shù)據(jù)質量和數(shù)據(jù)驗證測試結果的任何表,并根據(jù)某些定義的邏輯發(fā)送通知/警報。
下面的代碼片段實現(xiàn)了從 Google Cloud Function 運行的通知/提醒系統(tǒng),并在源表中的行/列數(shù)與上述數(shù)據(jù)驗證示例中的目標表不匹配時將通知/警報推送到 Slack 渠道。以下是創(chuàng)建 Slack 網(wǎng)絡鉤子網(wǎng)址的指南。
importpandasaspdimportrequestsfromgoogle.oauth2importservice_account# Credentials from GCP service account saved in a json file.credentials = service_account.Credential.from_service_account_file('./google_credentials.json')defslack_notification_bot(credentials, slack_webhook_url):? ? query ='''select? *
? ? ? ? ? ? from validation_result_table
? ? ? ? ? ? where validation_status = 'fail';'''validation_data = pd.read_gbq(query, project_id='gcp_project_id', credentials= credentials)? ? message ='''Validation Report:\n
? ? Error! Incomplete rows of data loaded'''iflen(validation_data) >0:? ? ? ? requests.post(slack_webhook_url, json={'text': message})else:return'done'defmain():# Run the slack notification botslack_notification_bot(credentials, slack_webhook_url)
結論
在本文中,你看到了為 ELT 數(shù)據(jù)管道設置監(jiān)視/警報系統(tǒng)的必要性。
數(shù)據(jù)質量、管道可靠性和業(yè)務指標監(jiān)視是要監(jiān)視管道的關鍵指標。管道復雜性是數(shù)據(jù)團隊在計劃為 ELT 管道設置監(jiān)視/警報系統(tǒng)時將面臨的主要挑戰(zhàn)之一。我建議使用像Airbyte這樣的數(shù)據(jù)復制工具來降低這種復雜性。然后,我查看了不同的專有數(shù)據(jù)管道監(jiān)視/警報工具及其開源替代方案。我進一步深入研究了如何使用數(shù)據(jù)驗證工具 (DVT) 和數(shù)據(jù)構建工具 (dbt) 設置典型的監(jiān)控/警報系統(tǒng)。最后,我們了解了如何構建一個機器人來觸發(fā)管道的通知/警報。