任務運維和數(shù)據(jù)指標相關(guān)的使用
本文作者:劉星(花名:吹雪),袋鼠云大數(shù)據(jù)開發(fā)工程師。
本文首發(fā)于gzh:數(shù)棧研習社
我們在github上有一個flinkx的開源項目,歡迎大家來討論交流~
https://github.com/DTStack/flinkx
一、實時開發(fā)常見問題
1、一個實時計算任務該分配多少資源?
建議:一些簡單ETL任務,并且源數(shù)據(jù)流量在一定范圍內(nèi), tm個數(shù)1、全局并行度1、內(nèi)存1G。
分析:
全局并行度為1,對于簡單ETL任務會有operator chain,在一個task(線程)中運行、減少線程切換、減少消息序列化/反序列化等,該類問題的瓶頸一般在下游寫入端。
寫入端是瓶頸:一般建議開啟批量寫入(需要控制批量大小,防止內(nèi)存溢出)、開啟多并行度寫入。
如果是單臺數(shù)據(jù)庫的瓶頸:開啟多個并行度就沒法提升性能、一般建議按照一定路由規(guī)則寫入多臺數(shù)據(jù)庫、建議使用分布式數(shù)據(jù)庫(如Hbase:提前建立分區(qū)、避免數(shù)據(jù)熱點寫入等)。
2、為什么寫入Kafka結(jié)果中有些分區(qū)沒有數(shù)據(jù)?
建議:如果現(xiàn)有topic已經(jīng)存在,并且是多個分區(qū),結(jié)果表并行度設置partition數(shù)一樣。
分析:
由于Flink寫Kafka默認采用的是FixedPartitioner。如果并行度比partition大,則數(shù)據(jù)都會發(fā)送到partition中,但是如果并行度比partition小,則有部分分區(qū)是沒有數(shù)據(jù)的。
source端,如果并行度小于partition,會取模的方式分給并行度,都會消費到數(shù)據(jù)。如果并行度大于partition,則會有部分task消費不到數(shù)據(jù)。
3、為什么和維表關(guān)聯(lián)后任務處理數(shù)據(jù)的能力變慢?
建議:小數(shù)據(jù)量不常更新的維表使用ALL模式。大數(shù)據(jù)量的維表使用使用LRU模式,并且根據(jù)數(shù)據(jù)庫不同做相應的處理(比如關(guān)系型數(shù)據(jù)庫則建立索引等)。
分析:1.ALL模式啟動時候直接將數(shù)據(jù)全量加載到內(nèi)存中,每次關(guān)聯(lián)數(shù)據(jù)不需要查庫,沒有其他開銷。2.異步(async)查詢模式
LRU異步查詢數(shù)據(jù)庫,可以并發(fā)地處理多個請求。
根據(jù)SQL中的關(guān)聯(lián)字段順序建立復合索引。
防止關(guān)聯(lián)字段索引失效(關(guān)聯(lián)順序不對、關(guān)聯(lián)列做計算等)。
如果維表字段個數(shù)少,考慮將將多余字段都加入到索引中,減少回表(帶來的問題是索引變大)。
4、為什么某些任務提高并行度能提升性能,某些不能?
建議:查看是否數(shù)據(jù)傾斜,如果是將數(shù)據(jù)打散。
分析:
源頭是否數(shù)據(jù)傾斜。
SQL中是否存在導致傾斜的語句。
登陸到Flink web頁面查看。
通過修改SQL解決或者打散groupby字段。
二、實時任務運維
1、配置反壓告警
場景:反壓導致cp失敗,數(shù)據(jù)出現(xiàn)延遲或者不產(chǎn)出。
排查方法:
1)借助Flink web-ui 提供的的反壓功能查找具體的operatorChain。
2)查詢Flink metric 'inPoolUsage、outPoolUsage' 來確定具體的反壓算子。
2、配置cp失敗告警
場景:cp失敗導致數(shù)據(jù)無法真正落地,任務恢復間隔太長。
排查方法:
1)是否存在反壓。
2)檢查集群負載、IO、CPU、MEM 是否處于高負荷狀態(tài)。
3、拆分實時任務日志
場景: Flink實時任務運行時間長之后導致日志占用磁盤大,另外一個大的日志文件不利于排查問題。
解決方法:
配置log4j.log的滾動參數(shù),設置日志按日期或者大小滾動生產(chǎn),并且限制保留的大小。
4、監(jiān)控任務運行中tm日志
場景: 任務執(zhí)行中產(chǎn)生的運行日志沒有監(jiān)控,比如網(wǎng)絡抖動導致的鏈接失敗等等。
解決方法:
修改Flink自帶的log4j jar包中的代碼,將異常日志重定向一份到Kafka或ES中,進行后續(xù)分析,找到程序中可能存在的隱藏bug。
5、臟數(shù)據(jù)管理
場景:由于數(shù)據(jù)源都是從Kafka過來的數(shù)據(jù),可能存在數(shù)據(jù)類型錯誤、字段名稱錯誤、字段閾值在Flink中超范圍等。落庫過程中,由于字段類型不匹配、閾值超范圍等等情況。
解決方法:
在數(shù)據(jù)解析和數(shù)據(jù)落庫等代碼中,對catch中的數(shù)據(jù)進行收集。當異常數(shù)據(jù)達到一定的量時,告警通知。線下離線修正結(jié)果數(shù)據(jù)。
三、通過Metrics定位問題
1.常用內(nèi)置Metrics介紹
端到端的延時(最大、平均、百分位):
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency
輸入數(shù)據(jù)量:
flink_taskmanager_job_task_operator_numRecordsIn
flink_taskmanager_job_task_numBytesIn
輸出數(shù)據(jù)量:
flink_taskmanager_job_task_operator_numRecordsOut
flink_taskmanager_job_task_numBytesOut
反壓值:
flink_taskmanager_job_task_isBackPressured
任務buffer:
inPoolUsage、outPoolUsage等其他
2、flinkStreamSql中常用metrics
業(yè)務延遲:
flink_taskmanager_job_task_operator_dtEventDelay(單位s)
數(shù)據(jù)本身的時間和進入flink的當前時間的差值。
各個輸入源的臟數(shù)據(jù):
flink_taskmanager_job_task_operator_dtDirtyData
從Kafka獲取的數(shù)據(jù)解析失敗視為臟數(shù)據(jù)。
各Source的數(shù)據(jù)輸入TPS:
flink_taskmanager_job_task_operator_dtNumRecordsInRate
Kafka接受的記錄數(shù)(未解析前)/s。
各Source的數(shù)據(jù)輸入RPS:
flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate
Kafka接受的記錄數(shù)(未解析前)/s。
各Source的數(shù)據(jù)輸入BPS:
flink_taskmanager_job_task_operator_dtNumBytestInRate
Kafka接受的字節(jié)數(shù)/s。
Kafka作為輸入源的各個分區(qū)的延遲數(shù):
flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag
當前Kafka10、Kafka11有采集該指標。
各個輸入源RPS:
fink_taskmanager_job_task_operator_dtNumRecordsOutRate
寫入的外部記錄數(shù)/s。
四、FlinkStreamSQL v1.11.1介紹
1.DDL建表語句和FlinkStreamSql v1.10之前版本保持一致。
2.DML語句有兩種不同的模式:
dtstack模式:和之前的版本是一致的。
Flink模式:和Flink原生的語法保持一致。
3.主要區(qū)別點:和維表join方式不同。
4.如何使用:在提交任務的時候加上 -planner dtstack/flink即可。