最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

確定 Kafka 分區(qū)起始位置的配置方法詳解!

2023-07-12 14:34 作者:ingemar-  | 我要投稿

配置 Kafka Consumer 開始消費的位置


Flink Kafka Consumer 允許通過配置來確定 Kafka 分區(qū)的起始位置。

Flink Kafka Consumer 的所有版本都具有上述明確的起始位置配置方法。

  • setStartFromGroupOffsets(默認(rèn)方法):從 Kafka brokers 中的 consumer 組(consumer 屬性中的 group.id 設(shè)置)提交的偏移量中開始讀取分區(qū)。如果找不到分區(qū)的偏移量,那么將會使用配置中的?auto.offset.reset?設(shè)置。

  • setStartFromEarliest() 或者 setStartFromLatest():從最早或者最新的記錄開始消費,在這些模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。

  • setStartFromTimestamp(long):從指定的時間戳開始。對于每個分區(qū),其時間戳大于或等于指定時間戳的記錄將用作起始位置。如果一個分區(qū)的最新記錄早于指定的時間戳,則只從最新記錄讀取該分區(qū)數(shù)據(jù)。在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。

  • setStartFromSpecificOffsets:從指定的分區(qū)的offset位置開始讀取,如指定的offsets中不存在某個分區(qū),該分區(qū)從group offset位置開始讀取

例子如下:為每個分區(qū)指定 consumer 應(yīng)該開始消費的具體 offset:

上面的例子中使用的配置是指定從 myTopic 主題的 0 、1 和 2 分區(qū)的指定偏移量開始消費。offset 值是 consumer 應(yīng)該為每個分區(qū)讀取的下一條消息。

請注意:如果 consumer 需要讀取在提供的 offset 映射中沒有指定 offset 的分區(qū),那么它將回退到該特定分區(qū)的默認(rèn)組偏移行為(即 setStartFromGroupOffsets())。

請注意:當(dāng) Job 從故障中自動恢復(fù)或使用 savepoint 手動恢復(fù)時,這些起始位置配置方法不會影響消費的起始位置。在恢復(fù)時,每個 Kafka 分區(qū)的起始位置由存儲在 savepoint 或 checkpoint 中的 offset 確定(有關(guān) checkpointing 的信息,請參閱下一節(jié),以便為 consumer 啟用容錯功能)。


Kafka Consumer 和容錯


伴隨著啟用 Flink 的 checkpointing 后,F(xiàn)link Kafka Consumer 將使用 topic 中的記錄,并以一致的方式定期檢查其所有 Kafka offset 和其他算子的狀態(tài)。如果 Job 失敗,F(xiàn)link 會將流式程序恢復(fù)到最新 checkpoint 的狀態(tài),并從存儲在 checkpoint 中的 offset 開始重新消費 Kafka 中的消息。

因此,設(shè)置 checkpoint 的間隔定義了程序在發(fā)生故障時最多需要返回多少。

為了使 Kafka Consumer 支持容錯,需要在 執(zhí)行環(huán)境 中啟用拓?fù)涞?checkpointing。

如果未啟用 checkpoint,那么 Kafka consumer 將定期向 Zookeeper 提交 offset。


確定 Kafka 分區(qū)起始位置的配置方法詳解!的評論 (共 條)

分享到微博請遵守國家法律
汝南县| 浮梁县| 大足县| 临澧县| 奈曼旗| 淳安县| 东莞市| 泽普县| 太原市| 中西区| 扶沟县| 洮南市| 鄯善县| 洛宁县| 拉孜县| 龙岩市| 益阳市| 淮滨县| 昌邑市| 津市市| 南充市| 湘阴县| 紫云| 静宁县| 右玉县| 台江县| 如皋市| 潍坊市| 沐川县| 桂平市| 河津市| 西充县| 万山特区| 修文县| 和林格尔县| 霍山县| 石嘴山市| 曲靖市| 山东省| 鄂尔多斯市| 乌恰县|