最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

Apache Kafka 真的保留了消息排序嗎?

2023-03-22 15:06 作者:沃趣Squids數(shù)據(jù)庫平臺  | 我要投稿

Apache Kafka最著名的口頭禪之一是“它保留了每個主題分區(qū)的消息排序”,但這總是正確的嗎?在這篇博文中,我們將分析一些真實的場景,在這些場景中,接受教條而不質(zhì)疑它可能會導致意想不到的、錯誤的消息序列。

基本方案:單個生產(chǎn)者

我們可以從一個基本場景開始我們的旅程:單個生產(chǎn)者通過單個分區(qū)依次向具有單個分區(qū)的 Apache Kafka 主題發(fā)送消息。

在這種基本情況下,根據(jù)已知的咒語,我們應(yīng)該期待始終正確的排序。但是,這是真的嗎?……這要看情況!

網(wǎng)絡(luò)不平等

在理想情況下,單個生產(chǎn)者方案應(yīng)始終導致正確的排序。但是我們的世界并不完美!不同的網(wǎng)絡(luò)路徑、錯誤和延遲可能意味著消息延遲或丟失。

讓我們想象一下下面的情況:一個生產(chǎn)者,向一個主題發(fā)送三條消息:

  • 消息【1】,由于某種原因,找到到Apache Kafka的長網(wǎng)絡(luò)路由

  • 消息【2】查找到 Apache Kafka 的最快網(wǎng)絡(luò)路由

  • 消息【3】在網(wǎng)絡(luò)中丟失

即使在這種基本場景中,只有一個生產(chǎn)者,我們也可能會在主題中獲得一系列意想不到的消息。?Kafka 主題的最終結(jié)果將僅顯示兩個正在存儲的事件,意外順序為【2】【1】。

如果你仔細想想,從Apache Kafka的角度來看,這是正確的順序:主題只是一個信息的日志,Apache Kafka會根據(jù)它何時“感知”到新事件的到來將消息寫入日志。它基于 Kafka?引入時間,而不是消息的創(chuàng)建時間(事件時間)。

確認和重試

但是,并非一切都丟失了!如果我們研究生產(chǎn)庫(aiokafka?就是一個例子),我們有辦法確保消息正確傳遞。

首先,為了避免上述場景中的消息【3】問題,我們可以定義一個適當?shù)拇_認機制。acks生產(chǎn)者參數(shù)允許我們定義我們希望從 Apache Kafka 獲得的消息接收確認。

將此參數(shù)設(shè)置為【1】將確保我們收到來自負責主題(和分區(qū))的主代理的確認。將其設(shè)置為【all】?將確保我們僅在主副本和副本都正確存儲消息時才收到 ack,從而避免在只有主節(jié)點收到消息然后在將其傳播到副本之前失敗時出現(xiàn)問題。

一旦我們設(shè)置了一個合理的ack,我們應(yīng)該設(shè)置如果我們沒有收到正確的確認,則重試發(fā)送消息的可能性。與其他庫(kafka-python?是其中之一)不同,aiokafka 將自動重試發(fā)送消息,直到超過超時(由request_timeout_ms參數(shù)設(shè)置)。

通過確認和自動重試,我們應(yīng)該解決消息【3】的問題。第一次發(fā)送時,生產(chǎn)者不會收到ack ,因此,在retry_backoff_ms間隔之后,它將再次發(fā)送消息【3】。


最大飛行請求數(shù)

但是,如果您仔細觀察 Apache Kafka 主題中的最終結(jié)果,則結(jié)果排序不正確:我們發(fā)送了1,2,3,在主題中得到了2,1,3……如何解決這個問題?

舊方法(在kafka-python中可用)是設(shè)置每個連接的最大飛行請求:我們允許同時“空中”而不確認的消息數(shù)量。我們同時允許在空中傳播的消息越多,獲得無序消息的風險就越大。

使用 kafka-python 時,如果我們絕對需要在主題中有一個特定的排序,我們被迫將max_in_flight_requests_per_connection限制為1??;旧希僭O(shè)我們將ack參數(shù)設(shè)置為至少1,那么在發(fā)送下一條消息之前,我們等待每一條消息的確認(如果消息大小小于批大小,則等待每批消息的確認)。




排序、確認和重試的絕對正確性是以吞吐量為代價的。我們允許同時“空中”的消息量越少,我們需要接收的確認就越多,我們可以在定義的時間范圍內(nèi)傳遞給 Kafka 的總體消息就越少。

冪等生產(chǎn)者

為了克服一次發(fā)送一條消息并等待確認的嚴格序列化,我們可以定義冪等生產(chǎn)者。使用冪等生產(chǎn)者時,每條消息都標有生產(chǎn)者 ID 和序列號(為每個分區(qū)維護的序列)。然后,此組合 ID 將與消息一起發(fā)送到代理。

代理跟蹤每個生產(chǎn)者和主題/分區(qū)的序列號。每當有新消息到達時,代理都會檢查組合 ID,如果在同一生產(chǎn)者中,該值等于前一個數(shù)字 + 1,則確認新消息,否則將被拒絕。這保證了消息的全局排序,允許每個連接有更多的動態(tài)請求(Java 客戶端最多 5?個)。

增加多個生產(chǎn)商的復雜性

到目前為止,我們設(shè)想了一個只有一個生產(chǎn)者的基本場景,但 Apache Kafka 的現(xiàn)實是,生產(chǎn)者通常是多個。如果我們想確定最終訂購結(jié)果,需要注意哪些小細節(jié)?

不同的位置,不同的延遲

同樣,網(wǎng)絡(luò)是不平等的,并且由于多個生產(chǎn)者可能位于非常偏遠的位置,不同的延遲意味著 Kafka 排序可能與基于事件時間的排序不同。

不幸的是,地球上不同位置之間的不同延遲無法修復,因此我們需要接受這種情況。

批處理,一個附加變量

為了實現(xiàn)更高的吞吐量,我們可能需要批處理消息。通過批處理,我們以“組”的形式發(fā)送消息,最大限度地減少調(diào)用總數(shù),并提高有效負載與整體消息大小的比率。但是,通過這樣做,我們可以再次改變事件的順序。Apache Kafka 中的消息將按批次存儲,具體取決于批處理引入時間。因此,每批的消息順序是正確的,但不同的批次中可能具有不同的排序消息。

現(xiàn)在,由于不同的延遲和批處理都到位,我們的全球訂購前提似乎將完全丟失......那么,為什么我們聲稱我們可以按順序管理事件呢?

救世主:事件時間

我們知道,關(guān)于 Kafka 保持消息排序的原始前提并不是 100% 正確的,消息的順序取決于 Kafka 攝取時間,而不是事件生成時間。但是,如果基于事件時間的排序很重要呢?

好吧,我們不能在生產(chǎn)方面解決問題,但我們可以在消費者方面解決問題。所有與Apache Kafka一起使用的最常見工具都能夠定義將哪個字段用作事件時間,包括Kafka?Streams,Kafka Connect與專用的時間戳提取器單消息轉(zhuǎn)換(SMT)和Apache Flink?。

如果定義得當,消費者將能夠重新調(diào)整來自特定 Apache Kafka 主題的消息的順序。下面我們來分析一下 Apache Flink 的例子:

在上面的 Apache Flink 表定義中,我們可以注意到:

  • occurred_at:該字段在 Unix 時間的源 Apache Kafka 主題中定義(數(shù)據(jù)類型為 BIGINT)。

  • time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3):將 unix 時間轉(zhuǎn)換為 Flink 時間戳。

  • WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND將新字段(計算自 occurred_at)定義為事件時間,并定義事件延遲最多為 10 秒的事件延遲的閾值。

定義上表后,可以使用time_ltz字段對事件進行正確排序并定義聚合窗口,從而確保計算中包含可接受的延遲內(nèi)的所有事件。

- INTERVAL '10' SECOND定義了數(shù)據(jù)管道的延遲,并且是我們需要包含的懲罰值,以允許正確引入延遲到達的事件。但請注意,吞吐量不受影響。我們可以根據(jù)需要在管道中流動任意數(shù)量的消息,但是在計算任何最終 KPI 之前,我們需要“等待 10 秒”,以確保我們將特定時間范圍內(nèi)的所有事件都包含在圖中。

另一種僅在事件包含完整狀態(tài)時才有效的方法是為某個鍵(在上面的示例中是主機名和cpu)保留到目前為止達到的最大事件時間,并且只接受新事件時間大于最大值的更改。

總結(jié)

在 Kafka 中排序的概念可能很棘手,即使我們只包含具有單個分區(qū)的單個主題。這篇文章分享了一些可能導致一系列意外事件的常見情況。幸運的是,限制傳輸中的消息數(shù)量或使用冪等生產(chǎn)者等選項可以幫助實現(xiàn)符合預期的排序。在多個生產(chǎn)者以及網(wǎng)絡(luò)延遲不可預測的情況下,可用的選項是通過正確處理需要在有效負載中指定的事件時間來修復使用者端的整體排序。


Apache Kafka 真的保留了消息排序嗎?的評論 (共 條)

分享到微博請遵守國家法律
望奎县| 马边| 阿巴嘎旗| 方山县| 巴塘县| 清河县| 万山特区| 正安县| 彰化县| 正镶白旗| 保靖县| 孝感市| 都江堰市| 安多县| 英吉沙县| 临夏市| 罗城| 中宁县| 兰州市| 托克逊县| 阳曲县| 磴口县| 武强县| 木里| 广宗县| 鄂伦春自治旗| 岳西县| 庆安县| 黄山市| 苍山县| 乌兰察布市| 阿克苏市| 军事| 镇巴县| 合江县| 苏尼特左旗| 偏关县| 丽江市| 东至县| 淳化县| 江西省|