確定 Kafka 分區(qū)起始位置的配置方法詳解!
配置 Kafka Consumer 開始消費的位置
Flink Kafka Consumer 允許通過配置來確定 Kafka 分區(qū)的起始位置。
Flink Kafka Consumer 的所有版本都具有上述明確的起始位置配置方法。
setStartFromGroupOffsets(默認(rèn)方法):從 Kafka brokers 中的 consumer 組(consumer 屬性中的 group.id 設(shè)置)提交的偏移量中開始讀取分區(qū)。如果找不到分區(qū)的偏移量,那么將會使用配置中的?auto.offset.reset?設(shè)置。
setStartFromEarliest() 或者 setStartFromLatest():從最早或者最新的記錄開始消費,在這些模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。
setStartFromTimestamp(long):從指定的時間戳開始。對于每個分區(qū),其時間戳大于或等于指定時間戳的記錄將用作起始位置。如果一個分區(qū)的最新記錄早于指定的時間戳,則只從最新記錄讀取該分區(qū)數(shù)據(jù)。在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。
setStartFromSpecificOffsets:從指定的分區(qū)的offset位置開始讀取,如指定的offsets中不存在某個分區(qū),該分區(qū)從group offset位置開始讀取
例子如下:為每個分區(qū)指定 consumer 應(yīng)該開始消費的具體 offset:
上面的例子中使用的配置是指定從 myTopic 主題的 0 、1 和 2 分區(qū)的指定偏移量開始消費。offset 值是 consumer 應(yīng)該為每個分區(qū)讀取的下一條消息。
請注意:如果 consumer 需要讀取在提供的 offset 映射中沒有指定 offset 的分區(qū),那么它將回退到該特定分區(qū)的默認(rèn)組偏移行為(即 setStartFromGroupOffsets())。
請注意:當(dāng) Job 從故障中自動恢復(fù)或使用 savepoint 手動恢復(fù)時,這些起始位置配置方法不會影響消費的起始位置。在恢復(fù)時,每個 Kafka 分區(qū)的起始位置由存儲在 savepoint 或 checkpoint 中的 offset 確定(有關(guān) checkpointing 的信息,請參閱下一節(jié),以便為 consumer 啟用容錯功能)。
Kafka Consumer 和容錯
伴隨著啟用 Flink 的 checkpointing 后,F(xiàn)link Kafka Consumer 將使用 topic 中的記錄,并以一致的方式定期檢查其所有 Kafka offset 和其他算子的狀態(tài)。如果 Job 失敗,F(xiàn)link 會將流式程序恢復(fù)到最新 checkpoint 的狀態(tài),并從存儲在 checkpoint 中的 offset 開始重新消費 Kafka 中的消息。
因此,設(shè)置 checkpoint 的間隔定義了程序在發(fā)生故障時最多需要返回多少。
為了使 Kafka Consumer 支持容錯,需要在 執(zhí)行環(huán)境 中啟用拓?fù)涞?checkpointing。
如果未啟用 checkpoint,那么 Kafka consumer 將定期向 Zookeeper 提交 offset。
