刨根問底: Kafka 到底會(huì)不會(huì)丟數(shù)據(jù)?
大家好,我是 華仔, 又跟大家見面了。
上一篇作為專題系列的第二篇,從演進(jìn)的角度帶你深度剖析了關(guān)于 Kafka 請(qǐng)求處理全流程以及超高并發(fā)的網(wǎng)絡(luò)架構(gòu)設(shè)計(jì)的實(shí)現(xiàn)細(xì)節(jié),今天開啟第三篇,我們來聊聊 Kafka 生產(chǎn)環(huán)境大家都比較關(guān)心的問題。
那么 Kafka 到底會(huì)不會(huì)丟數(shù)據(jù)呢?如果丟數(shù)據(jù),究竟該怎么解決呢?
只有掌握了這些, 我們才能處理好 Kafka 生產(chǎn)級(jí)的一些故障,從而更穩(wěn)定地服務(wù)業(yè)務(wù)。
認(rèn)真讀完這篇文章,我相信你會(huì)對(duì)Kafka 如何解決丟數(shù)據(jù)問題,有更加深刻的理解。
這篇文章干貨很多,希望你可以耐心讀完。

01 總體概述
越來越多的互聯(lián)網(wǎng)公司使用消息隊(duì)列來支撐自己的核心業(yè)務(wù)。由于是核心業(yè)務(wù),一般都會(huì)要求消息傳遞過程中最大限度的做到不丟失,如果中間環(huán)節(jié)出現(xiàn)數(shù)據(jù)丟失,就會(huì)引來用戶的投訴,年底績(jī)效就要背鍋了。
那么使用 Kafka 到底會(huì)不會(huì)丟數(shù)據(jù)呢?如果丟數(shù)據(jù)了該怎么解決呢?為了避免類似情況發(fā)生,除了要做好補(bǔ)償措施,我們更應(yīng)該在系統(tǒng)設(shè)計(jì)的時(shí)候充分考慮系統(tǒng)中的各種異常情況,從而設(shè)計(jì)出一個(gè)穩(wěn)定可靠的消息系統(tǒng)。
大家都知道 Kafka 的整個(gè)架構(gòu)非常簡(jiǎn)潔,是分布式的架構(gòu),主要由 Producer、Broker、Consumer 三部分組成,后面剖析丟失場(chǎng)景會(huì)從這三部分入手來剖析。

02?消息傳遞語義剖析
在深度剖析消息丟失場(chǎng)景之前,我們先來聊聊「消息傳遞語義」到底是個(gè)什么玩意?
所謂的消息傳遞語義是 Kafka 提供的 Producer 和 Consumer 之間的消息傳遞過程中消息傳遞的保證性。主要分為三種, 如下圖所示:

1)首先當(dāng) Producer 向 Broker 發(fā)送數(shù)據(jù)后,會(huì)進(jìn)行 commit,如果?commit?成功,由于 Replica 副本機(jī)制的存在,則意味著消息不會(huì)丟失,但是 Producer 發(fā)送數(shù)據(jù)給 Broker 后,遇到網(wǎng)絡(luò)問題而造成通信中斷,那么 Producer 就無法準(zhǔn)確判斷該消息是否已經(jīng)被提交(commit),這就可能造成 at least once 語義。
2)在 Kafka 0.11.0.0 之前, 如果 Producer 沒有收到消息 commit 的響應(yīng)結(jié)果,它只能重新發(fā)送消息,確保消息已經(jīng)被正確的傳輸?shù)?Broker,重新發(fā)送的時(shí)候會(huì)將消息再次寫入日志中;而在 0.11.0.0 版本之后, Producer 支持冪等傳遞選項(xiàng),保證重新發(fā)送不會(huì)導(dǎo)致消息在日志出現(xiàn)重復(fù)。為了實(shí)現(xiàn)這個(gè), Broker 為 Producer 分配了一個(gè)ID,并通過每條消息的序列號(hào)進(jìn)行去重。也支持了類似事務(wù)語義來保證將消息發(fā)送到多個(gè) Topic 分區(qū)中,保證所有消息要么都寫入成功,要么都失敗,這個(gè)主要用在 Topic 之間的 exactly once 語義。
其中啟用冪等傳遞的方法配置:enable.idempotence = true。
啟用事務(wù)支持的方法配置:設(shè)置屬性 transcational.id = "指定值"。
3)從 Consumer 角度來剖析, 我們知道 Offset 是由 Consumer 自己來維護(hù)的, 如果 Consumer 收到消息后更新 Offset, 這時(shí) Consumer 異常 crash 掉, 那么新的 Consumer 接管后再次重啟消費(fèi),就會(huì)造成 at most once 語義(消息會(huì)丟,但不重復(fù))。
4) 如果 Consumer 消費(fèi)消息完成后, 再更新 Offset,?如果這時(shí) Consumer crash 掉,那么新的 Consumer 接管后重新用這個(gè) Offset 拉取消息, 這時(shí)就會(huì)造成 at least once 語義(消息不丟,但被多次重復(fù)處理)。
總結(jié):默認(rèn) Kafka 提供?「at least once」語義的消息傳遞,允許用戶通過在處理消息之前保存 Offset?的方式提供?「at most?once」?語義。如果我們可以自己實(shí)現(xiàn)消費(fèi)冪等,理想情況下這個(gè)系統(tǒng)的消息傳遞就是嚴(yán)格的「exactly once」,?也就是保證不丟失、且只會(huì)被精確的處理一次,但是這樣是很難做到的。
從 Kafka 整體架構(gòu)圖我們可以得出有三次消息傳遞的過程:
1)Producer 端發(fā)送消息給 Kafka Broker 端。
2)Kafka Broker 將消息進(jìn)行同步并持久化數(shù)據(jù)。
3)Consumer 端從?Kafka Broker 將消息拉取并進(jìn)行消費(fèi)。
在以上這三步中每一步都可能會(huì)出現(xiàn)丟失數(shù)據(jù)的情況, 那么 Kafka 到底在什么情況下才能保證消息不丟失呢?
通過上面三步,我們可以得出:Kafka 只對(duì)?「已提交」的消息做「最大限度的持久化保證不丟失」。
怎么理解上面這句話呢?
1)首先是?「已提交」的消息:當(dāng) Kafka 中?N?個(gè) Broker 成功的收到一條消息并寫入到日志文件后,它們會(huì)告訴 Producer 端這條消息已成功提交了,那么這時(shí)該消息在 Kafka 中就變成?"已提交消息"?了。
這里的?N?個(gè) Broker?我們?cè)趺蠢斫饽??這主要取決于對(duì)?"已提交"?的定義, 這里可以選擇只要一個(gè) Broker 成功保存該消息就算已提交,也可以是所有 Broker 都成功保存該消息才算是已提交。
2)其次是?「最大限度的持久化保證不丟失」,也就是說 Kafka 并不能保證在任何情況下都能做到數(shù)據(jù)不丟失。即 Kafka 不丟失數(shù)據(jù)是有前提條件的。假如這時(shí)你的消息保存在 N 個(gè) Broker 上,那么前提條件就是這 N 個(gè) Broker 中至少有1個(gè)是存活的,就可以保證你的消息不丟失。
也就是說 Kafka 是能做到不丟失數(shù)據(jù)的,?只不過這些消息必須是?「已提交」的消息,且還要滿足一定的條件才可以。
了解了 Kafka 消息傳遞語義以及什么情況下可以保證不丟失數(shù)據(jù),下面我們來詳細(xì)剖析每個(gè)環(huán)節(jié)為什么會(huì)丟數(shù)據(jù),以及如何最大限度的避免丟失數(shù)據(jù)。
03?消息丟失場(chǎng)景剖析
?Producer 端丟失場(chǎng)景剖析
在剖析 Producer 端數(shù)據(jù)丟失之前,我們先來了解下 Producer 端發(fā)送消息的流程,對(duì)于不了解 Producer 的讀者們,可以查看?聊聊 Kafka Producer 那點(diǎn)事

消息發(fā)送流程如下:
1)首先我們要知道一點(diǎn)就是?Producer 端是直接與 Broker 中的 Leader Partition 交互的,所以在 Producer 端初始化中就需要通過 Partitioner 分區(qū)器從 Kafka 集群中獲取到相關(guān) Topic 對(duì)應(yīng)的 Leader Partition 的元數(shù)據(jù)?。
2)待獲取到 Leader Partition 的元數(shù)據(jù)后直接將消息發(fā)送過去。
3)Kafka Broker 對(duì)應(yīng)的 Leader Partition 收到消息會(huì)先寫入 Page Cache,定時(shí)刷盤進(jìn)行持久化(順序?qū)懭氪疟P)。
4) Follower Partition 拉取 Leader Partition 的消息并保持同 Leader Partition 數(shù)據(jù)一致,待消息拉取完畢后需要給 Leader Partition 回復(fù) ACK 確認(rèn)消息。
5)待 Kafka Leader 與 Follower Partition 同步完數(shù)據(jù)并收到所有 ISR 中的 Replica 副本的 ACK 后,Leader Partition 會(huì)給 Producer 回復(fù) ACK 確認(rèn)消息。
根據(jù)上圖以及消息發(fā)送流程可以得出:Producer 端為了提升發(fā)送效率,減少IO操作,發(fā)送數(shù)據(jù)的時(shí)候是將多個(gè)請(qǐng)求合并成一個(gè)個(gè)?RecordBatch,并將其封裝轉(zhuǎn)換成?Request?請(qǐng)求「異步」將數(shù)據(jù)發(fā)送出去(也可以按時(shí)間間隔方式,達(dá)到時(shí)間間隔自動(dòng)發(fā)送),所以 Producer 端消息丟失更多是因?yàn)橄⒏揪蜎]有發(fā)送到 Kafka Broker 端。
導(dǎo)致 Producer 端消息沒有發(fā)送成功有以下原因:
網(wǎng)絡(luò)原因:由于網(wǎng)絡(luò)抖動(dòng)導(dǎo)致數(shù)據(jù)根本就沒發(fā)送到 Broker 端。
數(shù)據(jù)原因:消息體太大超出 Broker 承受范圍而導(dǎo)致 Broker 拒收消息。
另外 Kafka Producer 端也可以通過配置來確認(rèn)消息是否生產(chǎn)成功:

在 Kafka Producer 端的 acks 默認(rèn)配置為1,?默認(rèn)級(jí)別是 at least once 語義, 并不能保證 exactly once 語義。

既然 Producer 端發(fā)送數(shù)據(jù)有 ACK 機(jī)制, 那么這里就可能會(huì)丟數(shù)據(jù)的!!!
acks =?0:由于發(fā)送后就自認(rèn)為發(fā)送成功,這時(shí)如果發(fā)生網(wǎng)絡(luò)抖動(dòng), Producer 端并不會(huì)校驗(yàn) ACK 自然也就丟了,且無法重試。
acks = 1:消息發(fā)送 Leader Parition 接收成功就表示發(fā)送成功,這時(shí)只要 Leader Partition 不 Crash 掉,就可以保證 Leader Partition 不丟數(shù)據(jù),但是如果 Leader Partition 異常 Crash 掉了, Follower Partition 還未同步完數(shù)據(jù)且沒有 ACK,這時(shí)就會(huì)丟數(shù)據(jù)。
acks = -1 或者 all:?消息發(fā)送需要等待 ISR 中 Leader Partition 和 所有的 Follower Partition 都確認(rèn)收到消息才算發(fā)送成功, 可靠性最高, 但也不能保證不丟數(shù)據(jù),比如當(dāng) ISR 中只剩下 Leader Partition 了, 這樣就變成 acks = 1 的情況了。
?Broker 端丟失場(chǎng)景剖析
接下來我們來看看 Broker 端持久化存儲(chǔ)丟失場(chǎng)景, 對(duì)于不了解 Broker 的讀者們,可以先看看?聊聊 Kafka Broker 那點(diǎn)事,數(shù)據(jù)存儲(chǔ)過程如下圖所示:

Kafka?Broker 集群接收到數(shù)據(jù)后會(huì)將數(shù)據(jù)進(jìn)行持久化存儲(chǔ)到磁盤,為了提高吞吐量和性能,采用的是「異步批量刷盤的策略」,也就是說按照一定的消息量和間隔時(shí)間進(jìn)行刷盤。首先會(huì)將數(shù)據(jù)存儲(chǔ)到?「PageCache」?中,至于什么時(shí)候?qū)?Cache 中的數(shù)據(jù)刷盤是由「操作系統(tǒng)」根據(jù)自己的策略決定或者調(diào)用 fsync 命令進(jìn)行強(qiáng)制刷盤,如果此時(shí) Broker 宕機(jī) Crash 掉,且選舉了一個(gè)落后 Leader Partition 很多的 Follower Partition 成為新的 Leader Partition,那么落后的消息數(shù)據(jù)就會(huì)丟失。
? ? ? ??? ? ? ??

既然 Broker 端消息存儲(chǔ)是通過異步批量刷盤的,那么這里就可能會(huì)丟數(shù)據(jù)的!!!
由于 Kafka 中并沒有提供「同步刷盤」的方式,所以說從單個(gè) Broker 來看還是很有可能丟失數(shù)據(jù)的。
kafka 通過「多 Partition (分區(qū))多 Replica(副本)機(jī)制」已經(jīng)可以最大限度的保證數(shù)據(jù)不丟失,如果數(shù)據(jù)已經(jīng)寫入 PageCache 中但是還沒來得及刷寫到磁盤,此時(shí)如果所在 Broker 突然宕機(jī)掛掉或者停電,極端情況還是會(huì)造成數(shù)據(jù)丟失。
?Consumer 端丟失場(chǎng)景剖析
接下來我們來看看 Consumer 端消費(fèi)數(shù)據(jù)丟失場(chǎng)景,對(duì)于不了解 Consumer?的讀者們,可以先看看?聊聊 Kafka Consumer 那點(diǎn)事,?我們先來看看消費(fèi)流程:


1)Consumer 拉取數(shù)據(jù)之前跟?Producer 發(fā)送數(shù)據(jù)一樣, 需要通過訂閱關(guān)系獲取到集群元數(shù)據(jù),?找到相關(guān) Topic 對(duì)應(yīng)的 Leader Partition 的元數(shù)據(jù)。
2)然后 Consumer 通過 Pull 模式主動(dòng)的去 Kafka 集群中拉取消息。
3)在這個(gè)過程中,有個(gè)消費(fèi)者組的概念(不了解的可以看上面鏈接文章),多個(gè) Consumer 可以組成一個(gè)消費(fèi)者組即 Consumer Group,每個(gè)消費(fèi)者組都有一個(gè)Group-Id。同一個(gè) Consumer Group 中的 Consumer 可以消費(fèi)同一個(gè) Topic 下不同分區(qū)的數(shù)據(jù),但是不會(huì)出現(xiàn)多個(gè) Consumer 去消費(fèi)同一個(gè)分區(qū)的數(shù)據(jù)。
4)拉取到消息后進(jìn)行業(yè)務(wù)邏輯處理,待處理完成后,會(huì)進(jìn)行 ACK 確認(rèn),即提交 Offset 消費(fèi)位移進(jìn)度記錄。
5)最后 Offset 會(huì)被保存到 Kafka Broker 集群中的?__consumer_offsets?這個(gè) Topic 中,且每個(gè) Consumer 保存自己的 Offset 進(jìn)度。?
根據(jù)上圖以及消息消費(fèi)流程可以得出消費(fèi)主要分為兩個(gè)階段:
獲取元數(shù)據(jù)并從 Kafka Broker 集群拉取數(shù)據(jù)。
處理消息,并標(biāo)記消息已經(jīng)被消費(fèi),提交 Offset 記錄。? ? ? ? ??

既然 Consumer?拉取后消息最終是要提交 Offset, 那么這里就可能會(huì)丟數(shù)據(jù)的!!!
可能使用的「自動(dòng)提交 Offset 方式」
拉取消息后「先提交 Offset,后處理消息」,如果此時(shí)處理消息的時(shí)候異常宕機(jī),由于 Offset 已經(jīng)提交了, ?待 Consumer 重啟后,會(huì)從之前已提交的 Offset 下一個(gè)位置重新開始消費(fèi), 之前未處理完成的消息不會(huì)被再次處理,對(duì)于該 Consumer 來說消息就丟失了。
拉取消息后「先處理消息,在進(jìn)行提交 Offset」, 如果此時(shí)在提交之前發(fā)生異常宕機(jī),由于沒有提交成功 Offset, 待下次 Consumer 重啟后還會(huì)從上次的 Offset 重新拉取消息,不會(huì)出現(xiàn)消息丟失的情況, 但是會(huì)出現(xiàn)重復(fù)消費(fèi)的情況,這里只能業(yè)務(wù)自己保證冪等性。? ? ? ??
04?消息丟失解決方案
上面帶你從 Producer、Broker、Consumer 三端剖析了可能丟失數(shù)據(jù)的場(chǎng)景,下面我們就來看看如何解決才能最大限度的保證消息不丟失。
?Producer?端解決方案
在剖析?Producer?端丟失場(chǎng)景的時(shí)候, 我們得出其是通過「異步」方式進(jìn)行發(fā)送的,所以如果此時(shí)是使用「發(fā)后即焚」的方式發(fā)送,即調(diào)用 Producer.send(msg) 會(huì)立即返回,由于沒有回調(diào),可能因網(wǎng)絡(luò)原因?qū)е?Broker 并沒有收到消息,此時(shí)就丟失了。
因此我們可以從以下幾方面進(jìn)行解決 Producer 端消息丟失問題:
4.1.1 更換調(diào)用方式:
棄用調(diào)用發(fā)后即焚的方式,使用帶回調(diào)通知函數(shù)的方法進(jìn)行發(fā)送消息,即?Producer.send(msg, callback), 這樣一旦發(fā)現(xiàn)發(fā)送失敗, 就可以做針對(duì)性處理。
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
? ? ? ?
// intercept the record, which can be potentially modified; this method does not throw exceptions
? ? ??
?ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
? ?
return doSend(interceptedRecord, callback);
}
(1)網(wǎng)絡(luò)抖動(dòng)導(dǎo)致消息丟失,Producer 端可以進(jìn)行重試。
(2)消息大小不合格,可以進(jìn)行適當(dāng)調(diào)整,符合 Broker 承受范圍再發(fā)送。
通過以上方式可以保證最大限度消息可以發(fā)送成功。
4.1.2 ACK 確認(rèn)機(jī)制:
該參數(shù)代表了對(duì)"已提交"消息的定義。
需要將?request.required.acks?設(shè)置為 -1/ all,-1/all 表示有多少個(gè)副本 Broker 全部收到消息,才認(rèn)為是消息提交成功的標(biāo)識(shí)。
針對(duì)?acks = -1/ all?, 這里有兩種非常典型的情況:
(1)數(shù)據(jù)發(fā)送到 Leader Partition, 且所有的 ISR 成員全部同步完數(shù)據(jù), 此時(shí),Leader?Partition?異常 Crash 掉,那么會(huì)選舉新的 Leader Partition,數(shù)據(jù)不會(huì)丟失, 如下圖所示:

(2)數(shù)據(jù)發(fā)送到?Leader Partition,部分 ISR 成員同步完成,此時(shí) Leader Partition 異常 Crash,?剩下的 Follower?Partition?都可能被選舉成新的 Leader Partition,會(huì)給 Producer 端發(fā)送失敗標(biāo)識(shí), 后續(xù)會(huì)重新發(fā)送數(shù)據(jù),數(shù)據(jù)可能會(huì)重復(fù), 如下圖所示:

因此通過上面分析,我們還需要通過其他參數(shù)配置來進(jìn)行保證:
replication.factor >= 2
min.insync.replicas?> 1
這是 Broker 端的配置,下面會(huì)詳細(xì)介紹。
4.1.3 重試次數(shù) retries:
該參數(shù)表示 Producer 端發(fā)送消息的重試次數(shù)。
需要將 retries?設(shè)置為大于0的數(shù), 在 Kafka 2.4 版本中默認(rèn)設(shè)置為Integer.MAX_VALUE。另外如果需要保證發(fā)送消息的順序性,配置如下:
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
這樣 Producer 端就會(huì)一直進(jìn)行重試直到 Broker 端返回 ACK 標(biāo)識(shí),同時(shí)只有一個(gè)連接向 Broker 發(fā)送數(shù)據(jù)保證了消息的順序性。
4.1.4 重試時(shí)間 retry.backoff.ms:
該參數(shù)表示消息發(fā)送超時(shí)后兩次重試之間的間隔時(shí)間,避免無效的頻繁重試,默認(rèn)值為100ms,??推薦設(shè)置為300ms。
?Broker 端解決方案
在剖析?Broker 端丟失場(chǎng)景的時(shí)候, 我們得出其是通過「異步批量刷盤」的策略,先將數(shù)據(jù)存儲(chǔ)到?「PageCache」,再進(jìn)行異步刷盤, 由于沒有提供?「同步刷盤」策略,?因此 Kafka 是通過「多分區(qū)多副本」的方式來最大限度的保證數(shù)據(jù)不丟失。
我們可以通過以下參數(shù)配合來保證:
4.2.1?unclean.leader.election.enable:
該參數(shù)表示有哪些 Follower 可以有資格被選舉為 Leader?, 如果一個(gè) Follower 的數(shù)據(jù)落后 Leader 太多,那么一旦它被選舉為新的 Leader, 數(shù)據(jù)就會(huì)丟失,因此我們要將其設(shè)置為false,防止此類情況發(fā)生。
4.2.2?replication.factor:
該參數(shù)表示分區(qū)副本的個(gè)數(shù)。建議設(shè)置?replication.factor >=3, 這樣如果 Leader 副本異常 Crash 掉,F(xiàn)ollower 副本會(huì)被選舉為新的 Leader 副本繼續(xù)提供服務(wù)。
4.2.3?min.insync.replicas:
該參數(shù)表示消息至少要被寫入成功到 ISR 多少個(gè)副本才算"已提交",建議設(shè)置min.insync.replicas > 1,?這樣才可以提升消息持久性,保證數(shù)據(jù)不丟失。
另外我們還需要確保一下?replication.factor > min.insync.replicas, 如果相等,只要有一個(gè)副本異常 Crash 掉,整個(gè)分區(qū)就無法正常工作了,因此推薦設(shè)置成:?replication.factor =?min.insync.replicas +1, 最大限度保證系統(tǒng)可用性。
?Consumer 端解決方案
在剖析 Consumer 端丟失場(chǎng)景的時(shí)候,我們得出其拉取完消息后是需要提交 Offset 位移信息的,因此為了不丟數(shù)據(jù),正確的做法是:拉取數(shù)據(jù)、業(yè)務(wù)邏輯處理、提交消費(fèi) Offset 位移信息。
我們還需要設(shè)置參數(shù)?enable.auto.commit = false, 采用手動(dòng)提交位移的方式。
另外對(duì)于消費(fèi)消息重復(fù)的情況,業(yè)務(wù)自己保證冪等性,?保證只成功消費(fèi)一次即可。
05?總結(jié)
這里,我們一起來總結(jié)一下這篇文章的重點(diǎn)。
1、從 Kafka 整體架構(gòu)上概述了可能發(fā)生數(shù)據(jù)丟失的環(huán)節(jié)。
2、帶你剖析了「消息傳遞語義」的概念, 確定了 Kafka 只對(duì)「已提交」的消息做「最大限度的持久化保證不丟失」。
3、帶你剖析了 Producer、Broker、Consumer 三端可能導(dǎo)致數(shù)據(jù)丟失的場(chǎng)景以及具體的高可靠解決方案。
如果我的文章對(duì)你有所幫助,還請(qǐng)幫忙點(diǎn)贊、在看、轉(zhuǎn)發(fā)一下,非常感謝!
堅(jiān)持總結(jié),?持續(xù)輸出高質(zhì)量文章??關(guān)注我:?華仔聊技術(shù)