關(guān)于Kafka的Topic和分區(qū)發(fā)現(xiàn),這回終于講清楚了!
分區(qū)發(fā)現(xiàn)
Flink Kafka Consumer 支持發(fā)現(xiàn)動態(tài)創(chuàng)建的 Kafka 分區(qū),并使用精準(zhǔn)一次的語義保證去消耗它們。在初始檢索分區(qū)元數(shù)據(jù)之后(即,當(dāng) Job 開始運行時)發(fā)現(xiàn)的所有分區(qū)將從最早可能的 offset 中消費。
默認(rèn)情況下,是禁用了分區(qū)發(fā)現(xiàn)的。若要啟用它,請在提供的屬性配置中為 flink.partition-discovery.interval-millis 設(shè)置大于 0 的值,表示發(fā)現(xiàn)分區(qū)的間隔是以毫秒為單位的。
Topic 發(fā)現(xiàn)
在更高的級別上,F(xiàn)link Kafka Consumer 還能夠使用正則表達式基于 Topic 名稱的模式匹配來發(fā)現(xiàn) Topic。請看下面的例子:
在上面的例子中,當(dāng) Job 開始運行時,Consumer 將訂閱名稱與指定正則表達式匹配的所有主題(以 test-topic 開頭并以單個數(shù)字結(jié)尾)。
要允許 consumer 在作業(yè)開始運行后發(fā)現(xiàn)動態(tài)創(chuàng)建的主題,那么請為?flink.partition-discovery.interval-millis 設(shè)置非負(fù)值。這允許 consumer 發(fā)現(xiàn)名稱與指定模式匹配的新主題的分區(qū)。
實際的生產(chǎn)環(huán)境中可能有這樣一些需求,比如:
場景一:有一個 Flink 作業(yè)需要將五份數(shù)據(jù)聚合到一起,五份數(shù)據(jù)對應(yīng)五個 kafka topic,隨著業(yè)務(wù)增長,新增一類數(shù)據(jù),同時新增了一個 kafka topic,如何在不重啟作業(yè)的情況下作業(yè)自動感知新的 topic。
場景二:作業(yè)從一個固定的 kafka topic 讀數(shù)據(jù),開始該 topic 有 10 個 partition,但隨著業(yè)務(wù)的增長數(shù)據(jù)量變大,需要對 kafka partition 個數(shù)進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業(yè)情況下動態(tài)感知新擴容的 partition?
針對上面的兩種場景,首先需要在構(gòu)建 FlinkKafkaConsumer 時的 properties 中設(shè)置 flink.partition-discovery.interval-millis 參數(shù)為非負(fù)值,表示開啟動態(tài)發(fā)現(xiàn)的開關(guān),以及設(shè)置的時間間隔。此時 FlinkKafkaConsumer 內(nèi)部會啟動一個單獨的線程定期去 kafka 獲取最新的 meta 信息。
針對場景一,還需在構(gòu)建 FlinkKafkaConsumer 時,topic 的描述可以傳一個正則表達式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。
針對場景二,設(shè)置前面的動態(tài)發(fā)現(xiàn)參數(shù),在定期獲取 kafka 最新 meta 信息時會匹配新的 partition。為了保證數(shù)據(jù)的正確性,新發(fā)現(xiàn)的 partition 從最早的位置開始讀取。
Kafka Consumer 提交 Offset 的行為配置
Flink Kafka Consumer 允許有配置如何將 offset 提交回 Kafka broker 的行為。請注意:Flink Kafka Consumer 不依賴于提交的 offset 來實現(xiàn)容錯保證。提交的 offset 只是一種方法,用于公開 consumer 的進度以便進行監(jiān)控。
配置 offset 提交行為的方法是否相同,取決于是否為 job 啟用了 checkpointing。
禁用 Checkpointing:如果禁用了 checkpointing,則 Flink Kafka Consumer 依賴于內(nèi)部使用的 Kafka client 自動定期 offset 提交功能。因此,要禁用或啟用 offset 的提交,只需將?enable.auto.commit?或者?auto.commit.interval.ms?的Key 值設(shè)置為提供的 Properties 配置中的適當(dāng)值。
啟用 Checkpointing:如果啟用了 checkpointing,那么當(dāng) checkpointing 完成時,F(xiàn)link Kafka Consumer 將提交的 offset 存儲在 checkpoint 狀態(tài)中。這確保 Kafka broker 中提交的 offset 與 checkpoint 狀態(tài)中的 offset 一致。用戶可以通過調(diào)用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法來禁用或啟用 offset 的提交(默認(rèn)情況下,這個值是 true )。注意,在這個場景中,Properties 中的自動定期 offset 提交設(shè)置會被完全忽略。
