通過 DVT 和 dbt 測試監(jiān)控Airbyte數(shù)據(jù)管道
為數(shù)據(jù)復(fù)制或數(shù)據(jù)遷移構(gòu)建 ELT 數(shù)據(jù)管道的一個(gè)重要部分是能夠在出現(xiàn)錯(cuò)誤時(shí)進(jìn)行監(jiān)視并獲得通知。如果您不知道錯(cuò)誤,您的數(shù)據(jù)將包含不一致之處,并且您的報(bào)告將不準(zhǔn)確。由于使用的工具數(shù)量眾多,大多數(shù)管道的復(fù)雜性使得設(shè)置監(jiān)視和警報(bào)系統(tǒng)更具挑戰(zhàn)性。
在本文中,我將分享為什么為 ELT 數(shù)據(jù)管道設(shè)置監(jiān)視和警報(bào)系統(tǒng)很重要。我解釋了要監(jiān)視的關(guān)鍵指標(biāo),以及設(shè)置監(jiān)視和警報(bào)系統(tǒng)時(shí)將遇到的常見挑戰(zhàn)。我進(jìn)一步強(qiáng)調(diào)了不同的監(jiān)控和警報(bào)工具,并展示了如何使用Google的數(shù)據(jù)驗(yàn)證工具(DVT)和數(shù)據(jù)構(gòu)建工具(dbt)實(shí)現(xiàn)典型的監(jiān)視/警報(bào)系統(tǒng)。
為什么要監(jiān)視數(shù)據(jù)管道?
好吧,問題應(yīng)該是,“為什么不監(jiān)視數(shù)據(jù)管道”?。這是因?yàn)樽屇墓艿雷鳛楹谙蛔舆\(yùn)行對業(yè)務(wù)來說可能非常昂貴。讓我分享一個(gè)個(gè)人故事來解釋這一點(diǎn)。我公司用于數(shù)據(jù)復(fù)制的初始管道是使用 AWS 數(shù)據(jù)遷移服務(wù) (DMS) 設(shè)計(jì)的,將數(shù)據(jù)從 RDS (PostgreSQL) 副本復(fù)制到 S3 存儲桶。然后,我們讓 Snowpipe(Snowflake 的 ELT 工具)從 S3 存儲桶獲取新數(shù)據(jù),并將這些數(shù)據(jù)通過管道傳輸?shù)?Snowflake(我們的數(shù)據(jù)倉庫)。
這種架構(gòu)有效,但它是一個(gè)完整的黑匣子,因?yàn)槲覀兒苌倩蚋究床坏揭嫔w下發(fā)生的事情。沒有適當(dāng)?shù)木瘓?bào)或通知系統(tǒng)來通知我們管道故障。只有當(dāng)我們在一天或更長時(shí)間后看到數(shù)據(jù)不一致或下游分析不準(zhǔn)確時(shí),我們才會知道管道故障。但這對業(yè)務(wù)有何影響?這對我們的影響之一是客戶流失率的提高。由于我們的數(shù)據(jù)到達(dá)較晚且不一致,我們無法及時(shí)檢測到客戶何時(shí)遇到 KYC 驗(yàn)證問題。
以下是應(yīng)監(jiān)視管道的主要原因:
全面鳥瞰數(shù)據(jù)運(yùn)行狀況。
防止數(shù)據(jù)傳輸不一致。
獲取持續(xù)的數(shù)據(jù)測試方法。
及早發(fā)現(xiàn)數(shù)據(jù)質(zhì)量和數(shù)據(jù)完整性問題。
跟蹤數(shù)據(jù)處理成本、元數(shù)據(jù)和整體系統(tǒng)性能。
提供反饋以優(yōu)化管道性能。
應(yīng)監(jiān)視哪些指標(biāo)?
雖然設(shè)置這些監(jiān)視/警報(bào)系統(tǒng)以了解管道非常重要,但確定要衡量的關(guān)鍵指標(biāo)以及要使用哪些工具并不總是一項(xiàng)簡單的任務(wù)。這是因?yàn)橐饬康闹笜?biāo)在很大程度上取決于數(shù)據(jù)管道的用例和其他幾個(gè)因素。
例如,如果其中一個(gè)管道實(shí)時(shí)提供用于跟蹤應(yīng)用程序服務(wù)器停機(jī)時(shí)間的關(guān)鍵數(shù)據(jù),那么您的首要任務(wù)是根據(jù)組織或團(tuán)隊(duì)定義的 SLA 監(jiān)控?cái)?shù)據(jù)到達(dá)的延遲。
以下是要監(jiān)控的不同類別的指標(biāo),適用于您或您的組織可能擁有的任何用例:
數(shù)據(jù)質(zhì)量監(jiān)控
通過數(shù)據(jù)質(zhì)量監(jiān)控,建立一個(gè)監(jiān)控系統(tǒng),以持續(xù)驗(yàn)證管道不同階段的數(shù)據(jù)質(zhì)量。首先,在提取加載 (EL) 步驟中,在加載作業(yè)完成后,根據(jù)目標(biāo)中的數(shù)據(jù)驗(yàn)證源中的數(shù)據(jù)質(zhì)量。此處監(jiān)視的關(guān)鍵指標(biāo)包括源-目標(biāo)記錄計(jì)數(shù)匹配、源-目標(biāo)列計(jì)數(shù)匹配、數(shù)據(jù)格式錯(cuò)誤、數(shù)據(jù)卷錯(cuò)誤、列名更改、引用完整性等。其次,在轉(zhuǎn)換作業(yè)運(yùn)行后的轉(zhuǎn)換步驟中監(jiān)視數(shù)據(jù)的質(zhì)量。此步驟監(jiān)控的關(guān)鍵指標(biāo)包括:數(shù)據(jù)類型錯(cuò)誤、空值等。
管道可靠性監(jiān)測
在這里,監(jiān)控側(cè)重于管道的端到端可靠性。在管道的不同步驟中監(jiān)視錯(cuò)誤:
提取加載 (EL) 步驟:此步驟由 Airbyte 等 ELT 工具處理。此處會出現(xiàn)錯(cuò)誤,例如身份驗(yàn)證問題導(dǎo)致的同步失敗、規(guī)范化錯(cuò)誤、同步期間加載新列 (SCD) 時(shí)出錯(cuò)、JSON 架構(gòu)驗(yàn)證程序錯(cuò)誤等。受到監(jiān)控。
轉(zhuǎn)換步驟:此步驟由 dbt 等轉(zhuǎn)換工具處理。此處會出現(xiàn)轉(zhuǎn)換作業(yè)運(yùn)行失敗、數(shù)據(jù)傳輸延遲(運(yùn)行持續(xù)時(shí)間長于預(yù)期)、數(shù)據(jù)沿襲或數(shù)據(jù)丟失問題等錯(cuò)誤。受到監(jiān)控。
業(yè)務(wù)指標(biāo)監(jiān)控
這種類型的監(jiān)視發(fā)生在管道的轉(zhuǎn)換階段之后。在這里,監(jiān)視轉(zhuǎn)換后的數(shù)據(jù),以根據(jù)特定的業(yè)務(wù)需求識別異常。例如,貨幣價(jià)格貶值或升值、基于市場價(jià)值的交易損失等。當(dāng)這些指標(biāo)達(dá)到特定閾值時(shí),將觸發(fā)警報(bào)。
監(jiān)視數(shù)據(jù)管道有哪些挑戰(zhàn)?
ELT 數(shù)據(jù)管道是使用多種工具組合構(gòu)建的,包括 dbt、Airflow、Airbyte、SQL、云服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)倉庫和數(shù)據(jù)湖。這種工具的多樣性有利于可擴(kuò)展性,以及在數(shù)據(jù)堆棧的每一層使用最有效的工具。但是,這會導(dǎo)致管道中有許多移動(dòng)部件。這可能會使監(jiān)視或全面了解數(shù)據(jù)管道成為一場噩夢。
在設(shè)置管道監(jiān)視和警報(bào)系統(tǒng)之前,我強(qiáng)烈建議先簡化管道中要監(jiān)視的進(jìn)程數(shù)。具有多個(gè)可能故障點(diǎn)的復(fù)雜管道將需要在這些不同層設(shè)置監(jiān)視/警報(bào)系統(tǒng)。這將使事情變得非常難以跟蹤和管理。
為了簡化我上面給出的示例中公司管道的復(fù)雜性,我們首先引入了?Airbyte——一個(gè)開源數(shù)據(jù)攝取工具來處理我們的數(shù)據(jù)復(fù)制。Airbyte 幫助我們減少了管道中可能的故障點(diǎn)數(shù)量。我們沒有首先使用數(shù)據(jù)遷移服務(wù) (DMS) 將數(shù)據(jù)復(fù)制到 S3 存儲桶,而是使用 Airbyte 將數(shù)據(jù)直接復(fù)制到我們的倉庫(Snowflake)。借助此架構(gòu),我們無需在數(shù)據(jù)流的三個(gè)不同級別進(jìn)行監(jiān)控:RDS-DMS 級別、DMS-S3 存儲桶級別和 S3-Snowpipe 級別?,F(xiàn)在,我們只在倉庫級別監(jiān)控我們的管道。
簡化監(jiān)視進(jìn)程的數(shù)量后,讓我們討論數(shù)據(jù)管道監(jiān)視工具以及如何為數(shù)據(jù)管道設(shè)置典型的監(jiān)視/警報(bào)系統(tǒng)。
監(jiān)控管道中的數(shù)據(jù)質(zhì)量指標(biāo)
市場上有很多工具可用于監(jiān)視和觸發(fā)數(shù)據(jù)管道中的警報(bào)。但是,這些工具的功能不同。雖然一些工具專注于監(jiān)控云基礎(chǔ)設(shè)施、日志和應(yīng)用程序安全性,但其他工具則專注于監(jiān)控?cái)?shù)據(jù)質(zhì)量、數(shù)據(jù)驗(yàn)證和數(shù)據(jù)沿襲。此外,一些工具是專有的基于云的解決方案,而另一些則是開源的。
一些專有的基于云的解決方案包括Monte Carlo,Databand,Datadog,Datafold,Accel Data。開源替代方案包括普羅米修斯、洛基、遠(yuǎn)大期望、數(shù)據(jù)驗(yàn)證工具 (DVT)、dbt 測試、Datafold 的數(shù)據(jù)差異等......需要注意的重要一點(diǎn)是,您可能需要組合其中兩個(gè)或多個(gè)工具來實(shí)現(xiàn)您的目標(biāo)。
在下一節(jié)中,我將介紹如何使用兩個(gè)開源工具設(shè)置這些監(jiān)視/警報(bào)系統(tǒng):數(shù)據(jù)驗(yàn)證工具 (DVT) 和數(shù)據(jù)構(gòu)建工具 (dbt)。
使用數(shù)據(jù)驗(yàn)證工具 (DVT) 進(jìn)行數(shù)據(jù)驗(yàn)證監(jiān)控
數(shù)據(jù)驗(yàn)證是在將數(shù)據(jù)用于業(yè)務(wù)運(yùn)營之前檢查數(shù)據(jù)的完整性、準(zhǔn)確性和結(jié)構(gòu)的做法。
數(shù)據(jù)驗(yàn)證是構(gòu)建數(shù)據(jù)管道的關(guān)鍵步驟,因?yàn)樗峁┝嗽趯?shù)據(jù)用于下游分析之前檢查數(shù)據(jù)有效性的層。
數(shù)據(jù)驗(yàn)證工具 (DVT)?是一種開源 Python CLI 工具,可將異構(gòu)數(shù)據(jù)源表與多級驗(yàn)證函數(shù)進(jìn)行比較。在數(shù)據(jù)加載過程完成后,您可以運(yùn)行 DVT 進(jìn)程來驗(yàn)證源表和目標(biāo)表是否匹配且正確。DVT 支持列、行、自定義查詢、架構(gòu)、列數(shù)據(jù)類型驗(yàn)證以及許多數(shù)據(jù)倉庫和數(shù)據(jù)庫的連接。
Datafold 還提供了一個(gè)開源數(shù)據(jù)差異項(xiàng)目,用于有效地比較數(shù)據(jù)庫和數(shù)據(jù)倉庫之間的表。要了解有關(guān)使用 data-diff 的更多信息,請閱讀有關(guān)驗(yàn)證從 Postgres 到 Snowflake 的數(shù)據(jù)復(fù)制管道的教程。
將 DVT 與 BigQuery 結(jié)合使用

您可以在任何云平臺中的虛擬機(jī)上設(shè)置和運(yùn)行 DVT。您還可以選擇在 Docker 容器中運(yùn)行 DVT。按照此處的說明在本地計(jì)算機(jī)或云環(huán)境中安裝和設(shè)置 DVT。本節(jié)中的代碼演練是在 Google Cloud 上的云外殼會話上運(yùn)行的。DVT 提供了一個(gè)命令行界面 (CLI),用于在安裝后執(zhí)行 dvt 命令。
若要根據(jù)目標(biāo)表驗(yàn)證源表,請先創(chuàng)建源連接和目標(biāo)連接。我們通過 CLI 運(yùn)行以下代碼來做到這一點(diǎn)。
# Create MYSQL connection as Source connectiondata-validation connections add--connection-nameMYSQL_CONN MySQL--hostHOST_IP--portPORT--user-nameUSER-NAME--passwordPASSWORD # Create BigQuery connection as target connectiondata-validation connections add--connection-name$BigQuery_CONNBigQuery--project-id$MY_GCP_PROJECT
上面的代碼片段將創(chuàng)建一個(gè) MySQL 連接作為源連接,創(chuàng)建一個(gè) BigQuery 連接作為目標(biāo)連接。
列驗(yàn)證
列驗(yàn)證對源和目標(biāo)都運(yùn)行計(jì)數(shù) (*)。這將計(jì)算源表中的列數(shù),并驗(yàn)證它是否與目標(biāo)表上的計(jì)數(shù)匹配。要運(yùn)行列驗(yàn)證,請通過 CLI 執(zhí)行數(shù)據(jù)驗(yàn)證運(yùn)行命令。以下是 MySQL 源表和 BigQuery 目標(biāo)表之間的列驗(yàn)證的外觀:
data-validation validate column\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
上面的代碼片段將執(zhí)行源表的列計(jì)數(shù),并根據(jù)目標(biāo)表的列計(jì)數(shù)進(jìn)行驗(yàn)證。--bq-result-handler?標(biāo)志將有助于將驗(yàn)證結(jié)果輸出到中間 BigQuery 表。默認(rèn)情況下,如果沒有 --bq-result-handler?標(biāo)志,驗(yàn)證結(jié)果將輸出到控制臺。
對于啟用了表規(guī)范化的 Airbyte 同步,您需要指定要在列驗(yàn)證中驗(yàn)證的列的名稱。這是為了排除 Airbyte 在同步期間添加的其他元數(shù)據(jù)列。下面的代碼演示如何在驗(yàn)證中指定列:
data-validation validate column\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--count column1, column2, column3, column4, column5 \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
行驗(yàn)證
行驗(yàn)證在源和目標(biāo)上運(yùn)行計(jì)數(shù) (*)。這將計(jì)算源表中的行數(shù),并驗(yàn)證它與目標(biāo)表上的計(jì)數(shù)匹配。以下是 MySQL 源表和 BigQuery 目標(biāo)表之間的行驗(yàn)證的外觀:
data-validation validate row \--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result \--hash'*'\--primary-keysid\--use-random-row --random-row-batch-size 50
--use-random-row?和 --random-row-batch-size?標(biāo)志指定您只想隨機(jī)驗(yàn)證行的子集。當(dāng)您有大型表時(shí),這會派上用場,因?yàn)樾序?yàn)證需要更多的內(nèi)存和計(jì)算。
架構(gòu)驗(yàn)證
架構(gòu)驗(yàn)證將獲取源表中每一列的列數(shù)據(jù)類型,并驗(yàn)證它是否與目標(biāo)表的列數(shù)據(jù)類型匹配。源表和目標(biāo)表中的類型不匹配會導(dǎo)致驗(yàn)證狀態(tài)失敗。
以下是 MySQL 源表和 BigQuery 目標(biāo)表之間的架構(gòu)驗(yàn)證的外觀:
data-validation validate schema\--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table \--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result
行比較驗(yàn)證
這種類型的驗(yàn)證對源表和目標(biāo)表中指定列的值執(zhí)行逐行比較。這些值不匹配會導(dǎo)致驗(yàn)證狀態(tài)失敗。
以下是 MySQL 源表和 BigQuery 目標(biāo)表之間的架構(gòu)驗(yàn)證的外觀:
data-validation validate row \--source-conn$MYSQL_CONN--target-conn$BigQuery_CONN\--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler$YOUR_PROJECT_ID.bigquery_dataset.validation_result \--hash'*'\--primary-keysid\--use-random-row --random-row-batch-size 50
下面是 BigQuery 表中驗(yàn)證結(jié)果的示例輸出:

名為“difference”的列表示源表中的列/行計(jì)數(shù)與目標(biāo)表中的列/行計(jì)數(shù)之間的差異。validation_status列顯示驗(yàn)證的狀態(tài)。
然后可以查詢此表,并將錯(cuò)誤通知/警報(bào)發(fā)送到電子郵件或 Slack 頻道。
我將在上一節(jié)中介紹向?Slack 頻道發(fā)送通知/警報(bào)。
從 YAML 文件運(yùn)行驗(yàn)證
運(yùn)行驗(yàn)證的另一種方法是將驗(yàn)證配置保存到 YAML 文件。這樣,您可以存儲以前的驗(yàn)證并輕松修改驗(yàn)證配置。此方法還有助于自動(dòng)執(zhí)行驗(yàn)證過程,因?yàn)轵?yàn)證可以按計(jì)劃運(yùn)行。
若要生成用于驗(yàn)證的 YAML 配置文件,請指定 --config-file?標(biāo)志。請參閱下面的代碼:
data-validation validatecolumn\--source-conn $MYSQL_CONN --target-conn $BigQuery_CONN \--tables-list MYSQL_database.source_table=$YOUR_PROJECT_ID.dataset.target_table--bq-result-handler $YOUR_PROJECT_ID.bigquery_dataset.validation_result \--config-file validation_config.yaml
下面是從上述代碼生成的 YAML 配置的外觀。
result_handler:project_id:your-project-idtable_id:data_validation.validation_resultstype:BigQuerysource:MYSQL_CONNtarget:BigQuery_CONNvalidations:-aggregates:-field_alias:countsource_column:nulltarget_column:nulltype:countcalculated_fields:[]filter_status:nullfilters:[]format:tablelabels:[]random_row_batch_size:nullschema_name:transportation_datatable_name:citibike_stationstarget_schema_name:onesphere-analytics.master_datatarget_table_name:citibike_stationsthreshold:0.0type:Columnuse_random_rows:false
生成的 YAML 配置文件可以在執(zhí)行生成驗(yàn)證命令的目錄中找到。
現(xiàn)在,可以使用以下代碼從 YAML 配置文件運(yùn)行驗(yàn)證:
data-validation run-config -c validation_config.yaml
使用數(shù)據(jù)構(gòu)建工具 (dbt) 進(jìn)行數(shù)據(jù)質(zhì)量監(jiān)控
dbt 是一種數(shù)據(jù)轉(zhuǎn)換工具,使數(shù)據(jù)和分析工程師能夠通過簡單地編寫 SQL 語句來轉(zhuǎn)換其倉庫中的數(shù)據(jù)。DBT 處理將這些 SELECT 語句轉(zhuǎn)換為倉庫中的表和視圖。要設(shè)置 dbt 項(xiàng)目,請按照此設(shè)置 dbt 項(xiàng)目指南進(jìn)行操作。
DBT 提供了用于執(zhí)行數(shù)據(jù)質(zhì)量檢查的測試功能,包括數(shù)據(jù)類型檢查、空值檢查、重復(fù)檢查、參照完整性檢查等。讓我們看看如何使用 dbt 測試執(zhí)行數(shù)據(jù)質(zhì)量檢查。dbt 測試定義為單一測試、SQL 文件中的一般測試或具有返回失敗記錄邏輯的 YAML 配置文件。
下面是使用 YAML 配置文件為源表(訂單表)定義測試的示例。
version: 2source:? - name: orderscolumns:? ? ? -name: order_idtests:? ? ? ? ? - unique? ? ? ? ? - not_null? ? ? -name: statustests:? ? ? ? ? -accepted_values:values: ['placed','shipped','completed','returned']? ? ? -name: customer_idtests:? ? ? ? ? -relationships:to: ref('customers')field: id
上面示例中的測試配置首先檢查orders_table order_id列中的重復(fù)值和非空值,然后在狀態(tài)列中檢查接受的值(“已放置”、“已發(fā)貨”、“已完成”、“已退回”)。最后,它檢查customer_id列中的引用完整性,以確保訂單表上的每個(gè)customer_id在客戶表上都有一個(gè)關(guān)聯(lián)的 ID。
若要運(yùn)行測試,請運(yùn)行命令:dbt test --store-failures。
--store-failure?標(biāo)志將測試結(jié)果存儲在中間表中。然后,可以查詢此表以發(fā)送錯(cuò)誤/失敗通知/警報(bào)。未通過測試的記錄保存在數(shù)據(jù)倉庫中后綴為“dbt_test__audit”的架構(gòu)中的結(jié)果表中。
需要注意的是,如上所述,使用 dbt 監(jiān)控?cái)?shù)據(jù)質(zhì)量的方法也適用于業(yè)務(wù)指標(biāo)監(jiān)控。
使用 dbt 進(jìn)行管道可靠性監(jiān)控
DBT 提供用于跟蹤管道錯(cuò)誤或作業(yè)故障的監(jiān)視和警報(bào)系統(tǒng)。通知在 dbt 云上配置,并在作業(yè)運(yùn)行后立即觸發(fā)。通知可以發(fā)送到電子郵件或 Slack 頻道。這是這方面的指南。對于管道中的數(shù)據(jù)提取轉(zhuǎn)換 (EL) 層,Airbyte 提供了一個(gè)可靠的監(jiān)控和警報(bào)系統(tǒng),用于端到端監(jiān)控并將同步失敗/成功通知發(fā)送到 Slack 通道。按照此分步指南為空字節(jié)同步設(shè)置 Slack 通知。
向 Slack 頻道發(fā)送通知
要發(fā)送通知/警報(bào)以跟蹤管道的問題或故障,您將構(gòu)建一個(gè)簡單的 Slack 機(jī)器人,該機(jī)器人可以在無服務(wù)器函數(shù)(例如 AWS Lambda 或 GCP CloudFunction)上運(yùn)行。機(jī)器人是一個(gè)簡單的 Python 腳本,可以計(jì)劃為按時(shí)間間隔運(yùn)行或基于數(shù)據(jù)加載事件運(yùn)行。
機(jī)器人將從上述部分查詢包含數(shù)據(jù)質(zhì)量和數(shù)據(jù)驗(yàn)證測試結(jié)果的任何表,并根據(jù)某些定義的邏輯發(fā)送通知/警報(bào)。
下面的代碼片段實(shí)現(xiàn)了從 Google Cloud Function 運(yùn)行的通知/提醒系統(tǒng),并在源表中的行/列數(shù)與上述數(shù)據(jù)驗(yàn)證示例中的目標(biāo)表不匹配時(shí)將通知/警報(bào)推送到 Slack 渠道。以下是創(chuàng)建 Slack 網(wǎng)絡(luò)鉤子網(wǎng)址的指南。
importpandasaspdimportrequestsfromgoogle.oauth2importservice_account# Credentials from GCP service account saved in a json file.credentials = service_account.Credential.from_service_account_file('./google_credentials.json')defslack_notification_bot(credentials, slack_webhook_url):? ? query ='''select? *
? ? ? ? ? ? from validation_result_table
? ? ? ? ? ? where validation_status = 'fail';'''validation_data = pd.read_gbq(query, project_id='gcp_project_id', credentials= credentials)? ? message ='''Validation Report:\n
? ? Error! Incomplete rows of data loaded'''iflen(validation_data) >0:? ? ? ? requests.post(slack_webhook_url, json={'text': message})else:return'done'defmain():# Run the slack notification botslack_notification_bot(credentials, slack_webhook_url)
結(jié)論
在本文中,你看到了為 ELT 數(shù)據(jù)管道設(shè)置監(jiān)視/警報(bào)系統(tǒng)的必要性。
數(shù)據(jù)質(zhì)量、管道可靠性和業(yè)務(wù)指標(biāo)監(jiān)視是要監(jiān)視管道的關(guān)鍵指標(biāo)。管道復(fù)雜性是數(shù)據(jù)團(tuán)隊(duì)在計(jì)劃為 ELT 管道設(shè)置監(jiān)視/警報(bào)系統(tǒng)時(shí)將面臨的主要挑戰(zhàn)之一。我建議使用像Airbyte這樣的數(shù)據(jù)復(fù)制工具來降低這種復(fù)雜性。然后,我查看了不同的專有數(shù)據(jù)管道監(jiān)視/警報(bào)工具及其開源替代方案。我進(jìn)一步深入研究了如何使用數(shù)據(jù)驗(yàn)證工具 (DVT) 和數(shù)據(jù)構(gòu)建工具 (dbt) 設(shè)置典型的監(jiān)控/警報(bào)系統(tǒng)。最后,我們了解了如何構(gòu)建一個(gè)機(jī)器人來觸發(fā)管道的通知/警報(bào)。