最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

關(guān)于Kafka的Topic和分區(qū)發(fā)現(xiàn),這回終于講清楚了!

2023-07-13 15:35 作者:ingemar-  | 我要投稿

分區(qū)發(fā)現(xiàn)


Flink Kafka Consumer 支持發(fā)現(xiàn)動態(tài)創(chuàng)建的 Kafka 分區(qū),并使用精準(zhǔn)一次的語義保證去消耗它們。在初始檢索分區(qū)元數(shù)據(jù)之后(即,當(dāng) Job 開始運行時)發(fā)現(xiàn)的所有分區(qū)將從最早可能的 offset 中消費。

默認(rèn)情況下,是禁用了分區(qū)發(fā)現(xiàn)的。若要啟用它,請在提供的屬性配置中為 flink.partition-discovery.interval-millis 設(shè)置大于 0 的值,表示發(fā)現(xiàn)分區(qū)的間隔是以毫秒為單位的。


Topic 發(fā)現(xiàn)


在更高的級別上,F(xiàn)link Kafka Consumer 還能夠使用正則表達式基于 Topic 名稱的模式匹配來發(fā)現(xiàn) Topic。請看下面的例子:

在上面的例子中,當(dāng) Job 開始運行時,Consumer 將訂閱名稱與指定正則表達式匹配的所有主題(以 test-topic 開頭并以單個數(shù)字結(jié)尾)。

要允許 consumer 在作業(yè)開始運行后發(fā)現(xiàn)動態(tài)創(chuàng)建的主題,那么請為?flink.partition-discovery.interval-millis 設(shè)置非負(fù)值。這允許 consumer 發(fā)現(xiàn)名稱與指定模式匹配的新主題的分區(qū)。

實際的生產(chǎn)環(huán)境中可能有這樣一些需求,比如:

  • 場景一:有一個 Flink 作業(yè)需要將五份數(shù)據(jù)聚合到一起,五份數(shù)據(jù)對應(yīng)五個 kafka topic,隨著業(yè)務(wù)增長,新增一類數(shù)據(jù),同時新增了一個 kafka topic,如何在不重啟作業(yè)的情況下作業(yè)自動感知新的 topic。

  • 場景二:作業(yè)從一個固定的 kafka topic 讀數(shù)據(jù),開始該 topic 有 10 個 partition,但隨著業(yè)務(wù)的增長數(shù)據(jù)量變大,需要對 kafka partition 個數(shù)進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業(yè)情況下動態(tài)感知新擴容的 partition?

針對上面的兩種場景,首先需要在構(gòu)建 FlinkKafkaConsumer 時的 properties 中設(shè)置 flink.partition-discovery.interval-millis 參數(shù)為非負(fù)值,表示開啟動態(tài)發(fā)現(xiàn)的開關(guān),以及設(shè)置的時間間隔。此時 FlinkKafkaConsumer 內(nèi)部會啟動一個單獨的線程定期去 kafka 獲取最新的 meta 信息。

  • 針對場景一,還需在構(gòu)建 FlinkKafkaConsumer 時,topic 的描述可以傳一個正則表達式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。

  • 針對場景二,設(shè)置前面的動態(tài)發(fā)現(xiàn)參數(shù),在定期獲取 kafka 最新 meta 信息時會匹配新的 partition。為了保證數(shù)據(jù)的正確性,新發(fā)現(xiàn)的 partition 從最早的位置開始讀取。


Kafka Consumer 提交 Offset 的行為配置


Flink Kafka Consumer 允許有配置如何將 offset 提交回 Kafka broker 的行為。請注意:Flink Kafka Consumer 不依賴于提交的 offset 來實現(xiàn)容錯保證。提交的 offset 只是一種方法,用于公開 consumer 的進度以便進行監(jiān)控。

配置 offset 提交行為的方法是否相同,取決于是否為 job 啟用了 checkpointing。

  • 禁用 Checkpointing:如果禁用了 checkpointing,則 Flink Kafka Consumer 依賴于內(nèi)部使用的 Kafka client 自動定期 offset 提交功能。因此,要禁用或啟用 offset 的提交,只需將?enable.auto.commit?或者?auto.commit.interval.ms?的Key 值設(shè)置為提供的 Properties 配置中的適當(dāng)值。

  • 啟用 Checkpointing:如果啟用了 checkpointing,那么當(dāng) checkpointing 完成時,F(xiàn)link Kafka Consumer 將提交的 offset 存儲在 checkpoint 狀態(tài)中。這確保 Kafka broker 中提交的 offset 與 checkpoint 狀態(tài)中的 offset 一致。用戶可以通過調(diào)用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法來禁用或啟用 offset 的提交(默認(rèn)情況下,這個值是 true )。注意,在這個場景中,Properties 中的自動定期 offset 提交設(shè)置會被完全忽略。


關(guān)于Kafka的Topic和分區(qū)發(fā)現(xiàn),這回終于講清楚了!的評論 (共 條)

分享到微博請遵守國家法律
科技| 吕梁市| 廊坊市| 郑州市| 阳泉市| 台前县| 福贡县| 新乡县| 营口市| 大城县| 贵定县| 敦煌市| 阿拉尔市| 嘉义县| 大余县| 什邡市| 铜陵市| 淮北市| 壤塘县| 淮南市| 禹州市| 松滋市| 仙游县| 阿城市| 丹江口市| 神木县| 仙居县| 汤阴县| 定边县| 开阳县| 神农架林区| 乐清市| 吉林省| 和硕县| 西丰县| 延寿县| 施甸县| 琼海市| 开鲁县| 鸡西市| 五原县|