最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

面試官:Flink 中水印是什么概念,起到什么作用?

2023-03-30 11:37 作者:程序員四次元ポケット  | 我要投稿

1 為什么要有 Watermark?


當(dāng) flink 以?EventTime?模式處理流數(shù)據(jù)時,它會根據(jù)數(shù)據(jù)里的時間戳來處理基于時間的算子。但是由于網(wǎng)絡(luò)、分布式等原因,會導(dǎo)致數(shù)據(jù)亂序的情況。如下圖所示:



2 Flink中的WaterMark


只要使用event time,就必須使用watermark,在上游指定,比如:source、map算子后

2.1 基本概念

Watermark的核心本質(zhì)可以理解成一個延遲觸發(fā)機(jī)制。

我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。

我們來設(shè)想一下下面這個場景:

使用時間窗口來統(tǒng)計10分鐘內(nèi)的用戶流量

有一個時間窗口

  • 開始時間為:2017-03-19 10:00:00

  • 結(jié)束時間為:2017-03-19 10:10:00

有一個數(shù)據(jù),因為網(wǎng)絡(luò)延遲

  • 事件發(fā)生的時間為:2017-03-19 10:10:00

  • 但進(jìn)入到窗口的時間為:2017-03-19 10:10:02,延遲了2秒中

時間窗口并沒有將59這個數(shù)據(jù)計算進(jìn)來,導(dǎo)致數(shù)據(jù)統(tǒng)計不正確

根據(jù)窗口計算時間的不同,這個數(shù)據(jù)都會被遺漏,只是:

如果按照處理時間來計算,這個窗口在系統(tǒng)時間大于2017-03-19 10:10:00的時候就會關(guān)閉,延遲進(jìn)來的這個59會被忽略

如果按照事件時間來計算,這個窗口當(dāng)進(jìn)入一條數(shù)據(jù),其事件時間大于2017-03-19 10:10:00的時候,會導(dǎo)致窗口關(guān)閉,同樣因為這個59延遲了,會因為別的正常順序的數(shù)據(jù)進(jìn)入Flink而導(dǎo)致屬于它的窗口被提前關(guān)閉

也就是:

處理時間窗口,按照當(dāng)前系統(tǒng)時間來判斷進(jìn)行窗口關(guān)閉

事件時間窗口,按照進(jìn)入數(shù)據(jù)的事件時間來判斷是否關(guān)閉窗口,如果進(jìn)來的一條新數(shù)據(jù)是下一個窗口的數(shù)據(jù),那么會關(guān)閉上一個窗口

總結(jié):

watermark是水印,也稱水位線。用來測量事件時間的進(jìn)度。

watermark作為數(shù)據(jù)流中的一部分在流動,并且攜帶一個時間戳t。

watermark(t) 表示這個流里面事件時間已經(jīng)到了時間t,意味著流中不應(yīng)該存在時間戳t2<=t的數(shù)據(jù)。

觸發(fā)窗口等的計算、關(guān)閉

單調(diào)遞增的(時間不能倒退)

用來處理數(shù)據(jù)亂序的問題

大數(shù)據(jù)進(jìn)階flink教程

適合零基礎(chǔ)自學(xué)的Hadoop教程


2.2 使用水印解決網(wǎng)絡(luò)延遲問題

水?。?strong>watermark)就是一個時間戳,F(xiàn)link可以給數(shù)據(jù)流添加水印,可以理解為:收到一條消息后,額外給這個消息添加了一個時間字段,這就是添加水印,一般人為添加的消息的水印都會比當(dāng)前消息的事件時間小一些。

窗口是否關(guān)閉按照水印時間來判斷,但原有事件時間不會被修改,窗口的邊界依舊是事件時間來決定。

  • 水印并不會影響原有Eventtime

  • 當(dāng)數(shù)據(jù)流添加水印后,會按照水印時間來觸發(fā)窗口計算

  • 一般會設(shè)置水印時間,比Eventtime小一些(一般幾秒鐘)

  • 當(dāng)接收到的水印時間 >= 窗口的endTime且窗口內(nèi)有數(shù)據(jù),則觸發(fā)計算

水?。ㄋr間)的計算:事件時間 – 設(shè)置的最大允許延遲時間 = 水印時間

比如,事件時間是10分30秒, 最大延遲時間是2秒,那么水印時間就是10分28秒


2.2.1 水印實現(xiàn)延遲等待功能的思路剖析

舉例:窗口5秒,延遲(水印)3秒,按照事件時間計算

  • 數(shù)據(jù)事件時間3, 落入窗口0-5.水印時間0

  • 來一條數(shù)據(jù)事件時間7, 落入窗口5-10,水印時間4

  • 來一條數(shù)據(jù)事件時間4,落入窗口0-5,水印時間1

  • 來一條數(shù)據(jù)事件時間8,落入窗口5-10,水印時間5

  • 這一條數(shù)據(jù)水印時間大于等于 窗口0-5的窗口結(jié)束時間。

滿足了對窗口0-5的提交,這個窗口關(guān)閉,并觸發(fā)數(shù)據(jù)計算

可以看出,第三條數(shù)據(jù),其是延遲數(shù)據(jù),它的事件時間是4,卻來的比事件時間為7的數(shù)據(jù)還要晚。

但是因為水印的機(jī)制,這個數(shù)據(jù)未錯過它的窗口,依舊成功進(jìn)入屬于它的窗口并且被計算

這就是水印的功能:在不影響按照事件時間判斷數(shù)據(jù)屬于哪個窗口的前提下,延遲某個窗口的關(guān)閉時間,讓其等待一會兒延遲數(shù)據(jù)


2.2.2 多并行度的水印觸發(fā)

在多并行度下,每個并行有一個水印

比如并行度是6,那么程序中就有6個watermark

分別屬于這6個并行度(線程)

那么,觸發(fā)條件以6個水印中最小的那個為準(zhǔn)

比如, 有個窗口是0-5,其中5個并行度的水印都超過了5,但有一個并行度的水印是3,那么,不管另外5個并行度中的水印達(dá)到了多大,都不會觸發(fā)。

因為6個并行度中的6個水印,最小的是3,不滿足大于等于窗口結(jié)束5的條件

2.2.3 Keyby 分流

一個程序中有多少個水印和并行度有關(guān),和keyby無關(guān)

也就是:比如有單詞hadoop spark,按照keyby,會分成hadoop組 和spark組,但是這兩個組是共用1個水印的,hadoop來的數(shù)據(jù)滿足了觸發(fā)條件,會將spark組的數(shù)據(jù)也觸發(fā)



3 Watermark 的設(shè)定


在Flink1.11中就已經(jīng)發(fā)現(xiàn)assignTimestampsAndWatermarks的有些實現(xiàn)過期了,從Flink1.12版本開始,官網(wǎng)推薦用WatermarkStrategy。

3.1 背景

在flink 1.11之前的版本中,提供了兩種生成水?。╓atermark)的策略,分別是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,這兩個接口都繼承自TimestampAssigner接口。

用戶想使用不同的水印生成方式,則需要實現(xiàn)不同的接口,但是這樣引發(fā)了一個問題,對于想給水印添加一些通用的、公共的功能則變得復(fù)雜,因為我們需要給這兩個接口都同時添加新的功能,這樣還造成了代碼的重復(fù)。

所以為了避免代碼的重復(fù),在flink 1.11 中對flink的水印生成接口(WatermarkStrategy)進(jìn)行了重構(gòu)

3.2 Watermark應(yīng)用代碼結(jié)構(gòu)

WatermarkStrategy在Flink中有兩種使用方式:

  • 一種是直接在數(shù)據(jù)源上使用

  • 另一種是直接在非數(shù)據(jù)源的操作之后使用

推薦使用第一種方式,因為數(shù)據(jù)源可以利用watermark生成邏輯中有關(guān)分片/分區(qū)的信息。使用這種方式可以更加精準(zhǔn)的跟蹤watermark,整體的watermark生成將更加精確,直接在數(shù)據(jù)源指定watermarkStrategy必須使用特定的數(shù)據(jù)源接口,例如與kafka鏈接,使用kafka Connerctor,僅當(dāng)無法直接在數(shù)據(jù)源上設(shè)置策略是時才使用第二種方式

在數(shù)據(jù)源直接使用時如果因為數(shù)據(jù)源中的某一個分區(qū)/分片在一段時間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著watermarkStrategy也不會獲得任何數(shù)據(jù)去生成watermark,在這種情況下可以通過設(shè)置有一個空閑時間,當(dāng)超過這個時間則將這個分片或分區(qū)標(biāo)記為空閑狀態(tài)。


大數(shù)據(jù)進(jìn)階flink教程

適合零基礎(chǔ)自學(xué)的Hadoop教程


3.3 水印的生成策略

3.3.1 內(nèi)置水印生成策略

為了方便開發(fā),flink提供了一些內(nèi)置的水印生成方法供我們使用。

固定延遲生成水印

通過靜態(tài)方法forBoundedOutOfOrderness提供,入?yún)⒔邮找粋€Duration類型的時間間隔,也就是我們可以接受的最大的延遲時間。使用這種延遲策略的時候需要我們對數(shù)據(jù)的延遲時間有一個大概的預(yù)估判斷。

我們實現(xiàn)一個延遲3秒的固定延遲水印,可以這樣做:

他的底層使用的WatermarkGenerator接口的一個實現(xiàn)類BoundedOutOfOrdernessWatermarks。我們看下源碼中的這兩個方法:

單調(diào)遞增生成水印

周期性 watermark 生成方式的一個最簡單特例就是你給定的數(shù)據(jù)源中數(shù)據(jù)的時間戳升序出現(xiàn)。在這種情況下,當(dāng)前時間戳就可以充當(dāng) watermark,因為后續(xù)到達(dá)數(shù)據(jù)的時間戳不會比當(dāng)前的小。

注意:在 Flink 應(yīng)用程序中,如果是并行數(shù)據(jù)源,則只要求并行數(shù)據(jù)源中的每個單分區(qū)數(shù)據(jù)源任務(wù)時間戳遞增。例如,設(shè)置每一個并行數(shù)據(jù)源實例都只讀取一個 Kafka 分區(qū),則時間戳只需在每個 Kafka 分區(qū)內(nèi)遞增即可。Flink 的 watermark 合并機(jī)制會在并行數(shù)據(jù)流進(jìn)行分發(fā)(shuffle)、聯(lián)合(union)、連接(connect)或合并(merge)時生成正確的 watermark

通過靜態(tài)方法forMonotonousTimestamps來提供.

這個也就是相當(dāng)于上述的延遲策略去掉了延遲時間,以event中的時間戳充當(dāng)了水印。

在程序中可以這樣使用:

它的底層實現(xiàn)是AscendingTimestampsWatermarks,其實它就是BoundedOutOfOrdernessWatermarks類的一個子類,沒有了延遲時間,我們來看看具體源碼的實現(xiàn)。

3.3.2 自定義水印生成策略


watermark 的生成方式本質(zhì)上是有兩種:周期性生成和標(biāo)記生成

  • 周期性生成器通常通過?onEvent()?觀察傳入的事件數(shù)據(jù),然后在框架調(diào)用?onPeriodicEmit()?時發(fā)出?watermark

  • 標(biāo)記生成器將查看?onEvent()?中的事件數(shù)據(jù),并等待檢查在流中攜帶 watermark 的特殊標(biāo)記事件或打點數(shù)據(jù)。當(dāng)獲取到這些事件數(shù)據(jù)時,它將立即發(fā)出 watermark。通常情況下,標(biāo)記生成器不會通過 onPeriodicEmit() 發(fā)出 watermark。

周期性watermark策略

官方提供的 watermark生成器有兩種

  • 升序watermark:單調(diào)遞增生成水印

  • 亂序watermark:固定延遲生成水印

都是基于周期性生成,默認(rèn)的周期是?200ms,一般不去改,保持在ms級別?onPeriodicEmit()

每間隔200ms一個周期,就會生成一個watermark

間歇性watermark策略

每一個事件時間都會產(chǎn)生一個watermark


3.4 在非數(shù)據(jù)源的操作之后使用 Watermark [重點]

3.4.1 Watermark的三種使用情況

  • 本來有序的Stream中的 Watermark

如果數(shù)據(jù)元素的事件時間是有序的,Watermark 時間戳?xí)S著數(shù)據(jù)元素的事件時間按順序生成,此時水位線的變化和事件時間保持一致(因為既然是有序的時間,就不需要設(shè)置延遲了,那么t就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想狀態(tài)下的水位線。當(dāng) Watermark 時間大于 Windows 結(jié)束時間就會觸發(fā)對 Windows 的數(shù)據(jù)計算,以此類推, 下一個 Window 也是一樣。這種情況其實是亂序數(shù)據(jù)的一種特殊情況。

  • 亂序事件中的Watermark

現(xiàn)實情況下數(shù)據(jù)元素往往并不是按照其產(chǎn)生順序接入到 Flink 系統(tǒng)中進(jìn)行處理,而頻繁出現(xiàn)亂序或遲到的情況,這種情況就需要使用 Watermarks 來應(yīng)對。

3.4.2 有序流中的水?。ㄉ颍?/strong>

升序流水印概念:事件是有序的(按照他們自己的時間戳來看),watermark是流中簡單的周期性的標(biāo)記。

升序的底層實現(xiàn)

  • 底層調(diào)用的也是 亂序的 watermark生成器,只是 亂序程度 傳了一個 0ms

  • watermark = maxTimestamp - outOfOrdernessMillis - 1

? ? ? ? ? ? ? ? ? = maxTimestamp - 0ms -1 ms

? ? ? ? ? ? ? ? ?=> 事件時間 - 1ms

需求-從socket獲取數(shù)據(jù),來計算傳感器水位信息

開發(fā)步驟

  • 1.定義類 WaterSensor? String id; Long ts; Integer vc;?

  • 2.創(chuàng)建流執(zhí)行環(huán)境

  • 3.獲取socket文本數(shù)據(jù)

  • 4.將字符串?dāng)?shù)據(jù)切分成 WaterSensor 對象數(shù)據(jù)

  • 5.分配水印機(jī)制,單調(diào)遞增

  • 6.分配后的數(shù)據(jù)根據(jù)id進(jìn)行分組

  • 7.設(shè)置滾動事件時間窗口,時間為10秒

  • 8.對開窗數(shù)據(jù)進(jìn)行process

  • 9.打印輸出

  • 10.執(zhí)行流環(huán)境

參考代碼


3.4.3 無序流中的水印

按照數(shù)據(jù)的時間戳來看,事件是亂序的,則watermark就非常重要了:

亂序怎樣產(chǎn)生的呢?

  • 采集過程中導(dǎo)致的亂序

  • 網(wǎng)絡(luò)傳輸過程導(dǎo)致的亂序

亂序?qū)?dǎo)致數(shù)據(jù)丟失?

如何解決亂序的數(shù)據(jù)丟失問題呢?- watermark

等待時間(亂序程度)如何設(shè)置?

等待時間 = 最大的亂序程度。

  • 經(jīng)驗值 => 對自身集群和數(shù)據(jù)的了解,大概估算

  • 對數(shù)據(jù)進(jìn)行抽樣

  • 肯定不會設(shè)置為幾個小時,一般設(shè)為 秒 或者 分鐘

底層實現(xiàn)

  • watermark = maxTimestamp - outOfOrdernessMillis - 1?

  • = 最大亂序事件時間 - 亂序程度(等待時間) - 1ms

需求-根據(jù)socket文本計算當(dāng)前亂序3秒的數(shù)據(jù)統(tǒng)計

分配水印機(jī)制-forBoundedOutOfOrderness

參考代碼


大數(shù)據(jù)進(jìn)階flink教程

適合零基礎(chǔ)自學(xué)的Hadoop教程


3.5 直接在數(shù)據(jù)源上使用Watermark(Kafka 連接器)

當(dāng)使用 Apache Kafka 連接器作為數(shù)據(jù)源時,每個 Kafka 分區(qū)可能有一個簡單的事件時間模式(遞增的時間戳或有界無序)。然而,當(dāng)使用 Kafka 數(shù)據(jù)源時,多個分區(qū)常常并行使用,因此交錯來自各個分區(qū)的事件數(shù)據(jù)就會破壞每個分區(qū)的事件時間模式(這是 Kafka 消費(fèi)客戶端所固有的)。

在這種情況下,你可以使用 Flink 中可識別 Kafka 分區(qū)的 watermark 生成機(jī)制。使用此特性,將在 Kafka 消費(fèi)端內(nèi)部針對每個 Kafka 分區(qū)生成 watermark,并且不同分區(qū) watermark 的合并方式與在數(shù)據(jù)流?shuffle?時的合并方式相同。

例如,如果每個 Kafka 分區(qū)中的事件時間戳嚴(yán)格遞增,則使用時間戳單調(diào)遞增按分區(qū)生成的 watermark 將生成完美的全局 watermark。

注意,我們在示例中未使用 TimestampAssigner,而是使用了 Kafka 記錄自身的時間戳。

下圖展示了如何使用單 kafka 分區(qū) watermark 生成機(jī)制,以及在這種情況下 watermark 如何通過 dataflow 傳播。

在實際的計算中,往往會出現(xiàn)一個作業(yè)中會處理多個source的數(shù)據(jù),對source的數(shù)據(jù)進(jìn)行g(shù)roupBy分組,那么來自不同source的相同的key會shuffle到同一個處理節(jié)點。并且攜帶各自的Watermark。

Flink內(nèi)部要保證Watermark的單調(diào)遞增,多個source的Watermark匯聚到一起是不可能單調(diào)遞增的。

Flink內(nèi)部實現(xiàn)每一個邊上只能有一個遞增的Watermark,當(dāng)出現(xiàn)多個流攜帶EventTime匯聚到一起(groupBy或者Union)。Flink會選擇所有輸入流中EventTime中最小的一個向下游流出。從而保證Watermark的單調(diào)遞增和數(shù)據(jù)的完整性。

需求-讀取kafkaconsumer并設(shè)置水印機(jī)制,根據(jù)時間窗口為

操作步驟

  • 1.獲取流執(zhí)行環(huán)境,設(shè)置并行度1

  • 2.創(chuàng)建FlinkKafkaConsumer和相應(yīng)的配置

  • 3.設(shè)置consumer的水印機(jī)制為20

  • 4.通過consumer添加數(shù)據(jù)源

  • 5.根據(jù)滾動處理時間窗口5s做wordcount,先flatMap,keyBy,window,sum

  • 6.打印輸出

  • 7.執(zhí)行流環(huán)境

參考代碼


面試官:Flink 中水印是什么概念,起到什么作用?的評論 (共 條)

分享到微博請遵守國家法律
蒙城县| 饶平县| 峡江县| 扎鲁特旗| 平原县| 北碚区| 克拉玛依市| 南京市| 项城市| 茶陵县| 汉寿县| 浏阳市| 宜兴市| 金寨县| 金川县| 玉林市| 枝江市| 通化市| 辽阳县| 承德市| 乌鲁木齐县| 栖霞市| 咸丰县| 兴国县| 阜宁县| 探索| 六枝特区| 珠海市| 连城县| 陈巴尔虎旗| 抚宁县| 资阳市| 孙吴县| 且末县| 嘉祥县| 台湾省| 舞钢市| 罗城| 潜江市| 陵水| 周宁县|