Flink中,如何使用水印解決網(wǎng)絡(luò)延遲問(wèn)題?
水?。?strong>watermark)就是一個(gè)時(shí)間戳,F(xiàn)link可以給數(shù)據(jù)流添加水印,可以理解為:收到一條消息后,額外給這個(gè)消息添加了一個(gè)時(shí)間字段,這就是添加水印,一般人為添加的消息的水印都會(huì)比當(dāng)前消息的事件時(shí)間小一些。
窗口是否關(guān)閉按照水印時(shí)間來(lái)判斷,但原有事件時(shí)間不會(huì)被修改,窗口的邊界依舊是事件時(shí)間來(lái)決定。
水印并不會(huì)影響原有Eventtime
當(dāng)數(shù)據(jù)流添加水印后,會(huì)按照水印時(shí)間來(lái)觸發(fā)窗口計(jì)算
一般會(huì)設(shè)置水印時(shí)間,比Eventtime小一些(一般幾秒鐘)
當(dāng)接收到的水印時(shí)間 >= 窗口的endTime且窗口內(nèi)有數(shù)據(jù),則觸發(fā)計(jì)算
水?。ㄋr(shí)間)的計(jì)算:?事件時(shí)間 – 設(shè)置的最大允許延遲時(shí)間 = 水印時(shí)間
比如,事件時(shí)間是10分30秒, 最大延遲時(shí)間是2秒,那么水印時(shí)間就是10分28秒
水印實(shí)現(xiàn)延遲等待功能的思路剖析
舉例:
窗口5秒,延遲(水印)3秒,按照事件時(shí)間計(jì)算
數(shù)據(jù)事件時(shí)間3, 落入窗口0-5.水印時(shí)間0
來(lái)一條數(shù)據(jù)事件時(shí)間7, 落入窗口5-10,水印時(shí)間4
來(lái)一條數(shù)據(jù)事件時(shí)間4,落入窗口0-5,水印時(shí)間1
來(lái)一條數(shù)據(jù)事件時(shí)間8,落入窗口5-10,水印時(shí)間5
這一條數(shù)據(jù)水印時(shí)間大于等于 窗口0-5的窗口結(jié)束時(shí)間。
滿足了對(duì)窗口0-5的提交,這個(gè)窗口關(guān)閉,并觸發(fā)數(shù)據(jù)計(jì)算
可以看出,第三條數(shù)據(jù),其是延遲數(shù)據(jù),它的事件時(shí)間是4,卻來(lái)的比事件時(shí)間為7的數(shù)據(jù)還要晚。
但是因?yàn)樗〉臋C(jī)制,這個(gè)數(shù)據(jù)未錯(cuò)過(guò)它的窗口,依舊成功進(jìn)入屬于它的窗口并且被計(jì)算
這就是水印的功能:在不影響按照事件時(shí)間判斷數(shù)據(jù)屬于哪個(gè)窗口的前提下,延遲某個(gè)窗口的關(guān)閉時(shí)間,讓其等待一會(huì)兒延遲數(shù)據(jù)。
多并行度的水印觸發(fā)
在多并行度下,每個(gè)并行有一個(gè)水印
比如并行度是6,那么程序中就有6個(gè)watermark
分別屬于這6個(gè)并行度(線程)
那么,觸發(fā)條件以6個(gè)水印中最小的那個(gè)為準(zhǔn)
比如, 有個(gè)窗口是0-5
其中5個(gè)并行度的水印都超過(guò)了5
但有一個(gè)并行度的水印是3
那么,不管另外5個(gè)并行度中的水印達(dá)到了多大,都不會(huì)觸發(fā)
因?yàn)?個(gè)并行度中的6個(gè)水印,最小的是3,不滿足大于等于窗口結(jié)束5的條件

Keyby 分流
一個(gè)程序中有多少個(gè)水印和并行度有關(guān),和keyby無(wú)關(guān)
也就是
比如有單詞hadoop spark
按照keyby,會(huì)分成hadoop組 和spark組
但是這兩個(gè)組是共用1個(gè)水印的
hadoop來(lái)的數(shù)據(jù)滿足了觸發(fā)條件,會(huì)將spark組的數(shù)據(jù)也觸發(fā)
