萬字長文講透 RocketMQ 的消費邏輯
RocketMQ 是筆者非常喜歡的消息隊列,4.9.X 版本是目前使用最廣泛的版本,但它的消費邏輯相對較重,很多同學(xué)學(xué)習(xí)起來沒有頭緒。
這篇文章,筆者梳理了 RocketMQ 的消費邏輯,希望對大家有所啟發(fā)。

1 架構(gòu)概覽
在展開集群消費邏輯細(xì)節(jié)前,我們先對 RocketMQ 4.9.X 架構(gòu)做一個概覽。

整體架構(gòu)中包含四種角色?:
1、NameServer
名字服務(wù)是是一個幾乎無狀態(tài)節(jié)點,可集群部署,節(jié)點之間無任何信息同步。它是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動態(tài)注冊與發(fā)現(xiàn)。
2、BrokerServer
Broker 主要負(fù)責(zé)消息的存儲、投遞和查詢以及服務(wù)高可用保證 。
3、Producer
消息發(fā)布的角色,Producer 通過 MQ 的負(fù)載均衡模塊選擇相應(yīng)的 Broker 集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。
4、Consumer
消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。
RocketMQ 集群工作流程:
1、啟動 NameServer,NameServer 起來后監(jiān)聽端口,等待 Broker、Producer 、Consumer 連上來,相當(dāng)于一個路由控制中心。
2、Broker 啟動,跟所有的 NameServer 保持長連接,定時發(fā)送心跳包。心跳包中包含當(dāng)前 Broker信息( IP+端口等 )以及存儲所有 Topic 信息。注冊成功后,NameServer 集群中就有 Topic 跟 Broker 的映射關(guān)系。
3、收發(fā)消息前,先創(chuàng)建 Topic,創(chuàng)建 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發(fā)送消息時自動創(chuàng)建 Topic。
4、Producer 發(fā)送消息,啟動時先跟 NameServer 集群中的其中一臺建立長連接,并從 NameServer 中獲取當(dāng)前發(fā)送的 Topic 存在哪些 Broker 上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的 Broker 建立長連接從而向 Broker 發(fā)消息。
5、Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連接,獲取當(dāng)前訂閱 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立連接通道,開始消費消息。
2 發(fā)布訂閱
RocketMQ 的傳輸模型是:發(fā)布訂閱模型?。
發(fā)布訂閱模型具有如下特點:
消費獨立
相比隊列模型的匿名消費方式,發(fā)布訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關(guān)系),不同訂閱組之間相互獨立不會相互影響。
一對多通信
基于獨立身份的設(shè)計,同一個主題內(nèi)的消息可以被多個訂閱組處理,每個訂閱組都可以拿到全量消息。因此發(fā)布訂閱模型可以實現(xiàn)一對多通信。
RocketMQ 支持兩種消息模式:集群消費( Clustering )和廣播消費( Broadcasting )。
集群消費:同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負(fù)載均衡到了同一個消費組的多個消費者實例上。

廣播消費:當(dāng)使用廣播消費模式時,每條消息推送給集群內(nèi)所有的消費者,保證消息至少被每個消費者消費一次。

為了實現(xiàn)這種發(fā)布訂閱模型 , RocketMQ 精心設(shè)計了它的存儲模型。先進入 Broker 的文件存儲目錄。

RocketMQ 采用的是混合型的存儲結(jié)構(gòu)。
1、Broker 單個實例下所有的隊列共用一個數(shù)據(jù)文件(commitlog)來存儲
生產(chǎn)者發(fā)送消息至 Broker 端,然后 Broker 端使用同步或者異步的方式對消息刷盤持久化,保存至 commitlog 文件中。只要消息被刷盤持久化至磁盤文件 commitlog 中,那么生產(chǎn)者發(fā)送的消息就不會丟失。
單個文件大小默認(rèn) 1G , 文件名長度為 20 位,左邊補零,剩余為起始偏移量,比如 00000000000000000000 代表了第一個文件,起始偏移量為 0 ,文件大小為1 G = 1073741824 。

這種設(shè)計有兩個優(yōu)點:
充分利用順序?qū)懀蟠筇嵘龑懭霐?shù)據(jù)的吞吐量;
快讀定位消息。
因為消息是一條一條寫入到 commitlog 文件 ,寫入完成后,我們可以得到這條消息的物理偏移量。
每條消息的物理偏移量是唯一的, commitlog 文件名是遞增的,可以根據(jù)消息的物理偏移量通過二分查找,定位消息位于那個文件中,并獲取到消息實體數(shù)據(jù)。
2、Broker 端的后臺服務(wù)線程會不停地分發(fā)請求并異步構(gòu)建 consumequeue(消費文件)和 indexfile(索引文件)
進入索引文件存儲目錄 :

1、消費文件按照主題存儲,每個主題下有不同的隊列,圖中主題 my-mac-topic 有 16 個隊列 (0 到 15) ;
2、每個隊列目錄下 ,存儲 consumequeue 文件,每個 consumequeue 文件也是順序?qū)懭?,?shù)據(jù)格式見下圖。

每個 consumequeue 文件包含 30 萬個條目,每個條目大小是 20 個字節(jié),每個文件的大小是 30 萬 * 20 = 60萬字節(jié),每個文件大小約 5.72M 。
和 commitlog 文件類似,consumequeue 文件的名稱也是以偏移量來命名的,可以通過消息的邏輯偏移量定位消息位于哪一個文件里。
消費文件按照主題-隊列來保存 ,這種方式特別適配發(fā)布訂閱模型。
消費者從 Broker 獲取訂閱消息數(shù)據(jù)時,不用遍歷整個 commitlog 文件,只需要根據(jù)邏輯偏移量從 consumequeue 文件查詢消息偏移量 , ?最后通過定位到 commitlog 文件, 獲取真正的消息數(shù)據(jù)。
要實現(xiàn)發(fā)布訂閱模型,還需要一個重要文件:消費進度文件。原因有兩點:
不同消費組之間相互獨立,不會相互影響 ;
消費者下次拉取數(shù)據(jù)時,需要知道從哪個進度開始拉取 ,就像我們小時候玩單機游戲存盤一樣。
因此消費進度文件需要保存消費組所訂閱主題的消費進度。
我們?yōu)g覽下集群消費場景下的 Broker 端的消費進度文件?consumerOffset.json?。


在進度文件 consumerOffset.json 里,數(shù)據(jù)以 key-value 的結(jié)構(gòu)存儲,key 表示:主題@消費者組 , value 是 consumequeue 中每個隊列對應(yīng)的邏輯偏移量 。
寫到這里,我們粗糙模擬下 RocketMQ?存儲模型如何滿足發(fā)布訂閱模型(集群模式)?。

1、發(fā)送消息:生產(chǎn)者發(fā)送消息到 Broker ;
2、保存消息:Broker 將消息存儲到 commitlog 文件 ,異步線程會構(gòu)建消費文件 consumequeue ;
3、消費流程:消費者啟動后,會通過負(fù)載均衡分配對應(yīng)的隊列,然后向 Broker 發(fā)送拉取消息請求。Broker 收到消費者拉取請求之后,根據(jù)訂閱組,消費者編號,主題,隊列名,邏輯偏移量等參數(shù) ,從該主題下的 consumequeue 文件查詢消息消費條目,然后從 commitlog 文件中獲取消息實體。消費者在收到消息數(shù)據(jù)之后,執(zhí)行消費監(jiān)聽器,消費完消息;
4、保存進度:消費者將消費進度提交到 Broker ,Broker 會將該消費組的消費進度存儲在進度文件里。
3 消費流程
我們重點講解下集群消費的消費流程 ,因為集群消費是使用最普遍的消費模式,理解了集群消費,廣播消費也就能順理成章的掌握了。

集群消費示例代碼里,啟動消費者,我們需要配置三個核心屬性:消費組名、訂閱主題、消息監(jiān)聽器,最后調(diào)用 start 方法啟動。
消費者啟動后,我們可以將整個流程簡化成:

4 負(fù)載均衡
消費端的負(fù)載均衡是指將 Broker 端中多個隊列按照某種算法分配給同一個消費組中的不同消費者,負(fù)載均衡是客戶端開始消費的起點。
RocketMQ 負(fù)載均衡的核心設(shè)計理念是
消費隊列在同一時間只允許被同一消費組內(nèi)的一個消費者消費
一個消費者能同時消費多個消息隊列
負(fù)載均衡是每個客戶端獨立進行計算,那么何時觸發(fā)呢 ?

消費端啟動時,立即進行負(fù)載均衡;
消費端定時任務(wù)每隔 20 秒觸發(fā)負(fù)載均衡;
消費者上下線,Broker 端通知消費者觸發(fā)負(fù)載均衡。
負(fù)載均衡流程如下:
1、發(fā)送心跳
消費者啟動后,它就會通過定時任務(wù)不斷地向 RocketMQ 集群中的所有 Broker 實例發(fā)送心跳包(消息消費分組名稱、訂閱關(guān)系集合、消息通信模式和客戶端實例編號等信息)。
Broker 端在收到消費者的心跳消息后,會將它維護在 ConsumerManager 的本地緩存變量 consumerTable,同時并將封裝后的客戶端網(wǎng)絡(luò)通道信息保存在本地緩存變量 channelInfoTable 中,為之后做消費端的負(fù)載均衡提供可以依據(jù)的元數(shù)據(jù)信息。
2、啟動負(fù)載均衡服務(wù)
負(fù)載均衡服務(wù)會根據(jù)消費模式為”廣播模式”還是“集群模式”做不同的邏輯處理,這里主要來看下集群模式下的主要處理流程:
(1) 獲取該主題下的消息消費隊列集合;
(2) 查詢 Broker 端獲取該消費組下消費者 Id 列表;
(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然后用消息隊列分配策略算法(默認(rèn)為:消息隊列的平均分配算法),計算出待拉取的消息隊列;


這里的平均分配算法,類似于分頁的算法,將所有 MessageQueue 排好序類似于記錄,將所有消費端排好序類似頁數(shù),并求出每一頁需要包含的平均 size 和每個頁面記錄的范圍 range ,最后遍歷整個 range 而計算出當(dāng)前消費端應(yīng)該分配到的記錄。
(4) 分配到的消息隊列集合與 processQueueTable 做一個過濾比對操作。

消費者實例內(nèi) ,processQueueTable 對象存儲著當(dāng)前負(fù)載均衡的隊列 ,以及該隊列的處理隊列 processQueue (消費快照)。
標(biāo)紅的 Entry 部分表示與分配到的消息隊列集合互不包含,則需要將這些紅色隊列 Dropped 屬性為 true , 然后從 processQueueTable 對象中移除。
綠色的 Entry 部分表示與分配到的消息隊列集合的交集,processQueueTable 對象中已經(jīng)存在該隊列。
黃色的 Entry 部分表示這些隊列需要添加到 processQueueTable 對象中,為每個分配的新隊列創(chuàng)建一個消息拉取請求 ?
pullRequest
??, ?在消息拉取請求中保存一個處理隊列?processQueue
?(隊列消費快照),內(nèi)部是紅黑樹(TreeMap
),用來保存拉取到的消息。
最后創(chuàng)建拉取消息請求列表,并將請求分發(fā)到消息拉取服務(wù),進入拉取消息環(huán)節(jié)。
5 長輪詢
在負(fù)載均衡這一小節(jié),我們已經(jīng)知道負(fù)載均衡觸發(fā)了拉取消息的流程。
消費者啟動的時候,會創(chuàng)建一個拉取消息服務(wù) PullMessageService?,它是一個單線程的服務(wù)。

核心流程如下:
1、負(fù)載均衡服務(wù)將消息拉取請求放入到拉取請求隊列 pullRequestQueue , 拉取消息服務(wù)從隊列中獲取拉取消息請求?;
2、拉取消息服務(wù)向 Brorker 服務(wù)發(fā)送拉取請求 ,拉取請求的通訊模式是異步回調(diào)模式?;

消費者的拉取消息服務(wù)本身就是一個單線程,使用異步回調(diào)模式,發(fā)送拉取消息請求到 Broker 后,拉取消息線程并不會阻塞?,可以繼續(xù)處理隊列 pullRequestQueue 中的其他拉取任務(wù)。
3、Broker 收到消費者拉取消息請求后,從存儲中查詢出消息數(shù)據(jù),然后返回給消費者;
4、消費者的網(wǎng)絡(luò)通訊層會執(zhí)行拉取回調(diào)函數(shù)相關(guān)邏輯,首先會將消息數(shù)據(jù)存儲在隊列消費快照 processQueue 里;
消費快照使用紅黑樹 msgTreeMap?存儲拉取服務(wù)拉取到的消息 。

5、回調(diào)函數(shù)將消費請求提交到消息消費服務(wù)?,而消息消費服務(wù)會異步的消費這些消息;
6、回調(diào)函數(shù)會將處理中隊列的拉取請放入到定時任務(wù)中;
7、定時任務(wù)再次將消息拉取請求放入到隊列 pullRequestQueue 中,形成了閉環(huán):負(fù)載均衡后的隊列總會有任務(wù)執(zhí)行拉取消息請求,不會中斷。
細(xì)心的同學(xué)肯定有疑問:既然消費端是拉取消息,為什么是長輪詢呢??
雖然拉模式的主動權(quán)在消費者這一側(cè),但是缺點很明顯。
因為消費者并不知曉 Broker 端什么時候有新的消息 ,所以會不停地去 Broker 端拉取消息,但拉取頻率過高, Broker 端壓力就會很大,頻率過低則會導(dǎo)致消息延遲。
所以要想消費消息的延遲低,服務(wù)端的推送必不可少。
下圖展示了 RocketMQ 如何通過長輪詢減小拉取消息的延遲。

核心流程如下:
1、Broker 端接收到消費者的拉取消息請求后,拉取消息處理器開始處理請求,根據(jù)拉取請求查詢消息存儲 ;
2、從消息存儲中獲取消息數(shù)據(jù) ,若存在新消息 ,則將消息數(shù)據(jù)通過網(wǎng)絡(luò)返回給消費者。若無新消息,則將拉取請求放入到拉取請求表 pullRequestTable?。
3、長輪詢請求管理服務(wù)?pullRequestHoldService 每隔 5 秒從拉取請求表中判斷拉取消息請求的隊列是否有新的消息。
判定標(biāo)準(zhǔn)是:拉取消息請求的偏移量是否小于當(dāng)前消費隊列最大偏移量,如果條件成立則說明有新消息了。
若存在新的消息 , ?長輪詢請求管理服務(wù)會觸發(fā)拉取消息處理器重新處理該拉取消息請求。
4、當(dāng) commitlog 中新增了新的消息,消息分發(fā)服務(wù)會構(gòu)建消費文件和索引文件,并且會通知長輪詢請求管理服務(wù),觸發(fā)拉取消息處理器重新處理該拉取消息請求。
6 消費消息
在拉取消息的流程里, Broker 端返回消息數(shù)據(jù),消費者的通訊框架層會執(zhí)行回調(diào)函數(shù)。
回調(diào)線程會將數(shù)據(jù)存儲在隊列消費快照 processQueue(內(nèi)部使用紅黑樹 msgTreeMap)里,然后將消息提交到消費消息服務(wù),消費消息服務(wù)會異步消費這些消息。

消息消費服務(wù)有兩種類型:并發(fā)消費服務(wù)和順序消費服務(wù)?。

6.1 并發(fā)消費
并發(fā)消費是指消費者將并發(fā)消費消息,消費的時候可能是無序的。
消費消息并發(fā)服務(wù)啟動后,會初始化三個組件:消費線程池、清理過期消息定時任務(wù)、處理失敗消息定時任務(wù)。

核心流程如下:
0、通訊框架回調(diào)線程會將數(shù)據(jù)存儲在消費快照里,然后將消息列表 msgList 提交到消費消息服務(wù)
1、 消息列表 msgList 組裝成消費對象
2、將消費對象提交到消費線程池

我們看到10 條消息被組裝成三個消費請求對象,不同的消費線程會執(zhí)行不同的消費請求對象。
3、消費線程執(zhí)行消息監(jiān)聽器

執(zhí)行完消費監(jiān)聽器,會返回消費結(jié)果。

4、處理異常消息

當(dāng)消費異常時,異常消息將重新發(fā)回 Broker 端的重試隊列( RocketMQ 會為每個 topic 創(chuàng)建一個重試隊列,以 %RETRY% 開頭),達到重試時間后將消息投遞到重試隊列中進行消費重試。
我們將在重試機制這一節(jié)重點講解 RocketMQ 如何實現(xiàn)延遲消費功能 。
假如異常的消息發(fā)送到 Broker 端失敗,則重新將這些失敗消息通過處理失敗消息定時任務(wù)重新提交到消息消費服務(wù)。
5、更新本地消費進度
消費者消費一批消息完成之后,需要保存消費進度到進度管理器的本地內(nèi)存。

首先我們會從隊列消費快照 processQueue 中移除消息,返回消費快照 msgTreeMap 第一個偏移量 ,然后調(diào)用消費消息進度管理器 offsetStore 更新消費進度。
待更新的偏移量是如何計算的呢?

場景1:快照中1001(消息1)到1010(消息10)消費了,快照中沒有了消息,返回已消費的消息最大偏移量 + 1 也就是1011。

場景2:快照中1001(消息1)到1008(消息8)消費了,快照中只剩下兩條消息了,返回最小的偏移量 1009。

場景3:1001(消息1)在消費對象中因為某種原因一直沒有被消費,即使后面的消息1005-1010都消費完成了,返回的最小偏移量是1001。

在場景3,RocketMQ 為了保證消息肯定被消費成功,消費進度只能維持在1001(消息1),直到1001也被消費完,本地的消費進度才會一下子更新到1011。
假設(shè)1001(消息1)還沒有消費完成,消費者實例突然退出(機器斷電,或者被 kill ),就存在重復(fù)消費的風(fēng)險。
因為隊列的消費進度還是維持在1001,當(dāng)隊列重新被分配給新的消費者實例的時候,新的實例從 Broker 上拿到的消費進度還是維持在1001,這時候就會又從1001開始消費,1001-1010這批消息實際上已經(jīng)被消費過還是會投遞一次。
所以業(yè)務(wù)必須要保證消息消費的冪等性。
寫到這里,我們會有一個疑問:假設(shè)1001(消息1)因為加鎖或者消費監(jiān)聽器邏輯非常耗時,導(dǎo)致極長時間沒有消費完成,那么消費進度就會一直卡住 ,怎么解決呢 ?
RocketMQ 提供兩種方式一起配合解決:
拉取服務(wù)根據(jù)并發(fā)消費間隔配置限流

拉取消息服務(wù)在拉取消息時候,會判斷當(dāng)前隊列的 processQueue 消費快照里消息的最大偏移量 - 消息的最小偏移量大于消費并發(fā)間隔(2000)的時候 , 就會觸發(fā)流控 , ?這樣就可以避免消費者無限循環(huán)的拉取新的消息。
清理過期消息

消費消息并發(fā)服務(wù)啟動后,會定期掃描所有消費的消息,若當(dāng)前時間減去開始消費的時間大于消費超時時間,首先會將過期消息發(fā)送 sendMessageBack 命令發(fā)送到 Broker ,然后從快照中刪除該消息。
6.2 順序消費
順序消息是指對于一個指定的 Topic ,消息嚴(yán)格按照先進先出(FIFO)的原則進行消息發(fā)布和消費,即先發(fā)布的消息先消費,后發(fā)布的消息后消費。
順序消息分為分區(qū)順序消息和全局順序消息。
1、分區(qū)順序消息
對于指定的一個 Topic ,所有消息根據(jù) Sharding Key 進行區(qū)塊分區(qū),同一個分區(qū)內(nèi)的消息按照嚴(yán)格的先進先出(FIFO)原則進行發(fā)布和消費。同一分區(qū)內(nèi)的消息保證順序,不同分區(qū)之間的消息順序不做要求。
適用場景:適用于性能要求高,以 Sharding Key 作為分區(qū)字段,在同一個區(qū)塊中嚴(yán)格地按照先進先出(FIFO)原則進行消息發(fā)布和消費的場景。
示例:電商的訂單創(chuàng)建,以訂單 ID 作為 Sharding Key ,那么同一個訂單相關(guān)的創(chuàng)建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發(fā)布的先后順序來消費。
2、全局順序消息
對于指定的一個 Topic ,所有消息按照嚴(yán)格的先入先出(FIFO)的順序來發(fā)布和消費。
適用場景:適用于性能要求不高,所有的消息嚴(yán)格按照 FIFO 原則來發(fā)布和消費的場景。
示例:在證券處理中,以人民幣兌換美元為 Topic,在價格相同的情況下,先出價者優(yōu)先處理,則可以按照 FIFO 的方式發(fā)布和消費全局順序消息。
全局順序消息實際上是一種特殊的分區(qū)順序消息,即 Topic 中只有一個分區(qū),因此全局順序和分區(qū)順序的實現(xiàn)原理相同。
因為分區(qū)順序消息有多個分區(qū),所以分區(qū)順序消息比全局順序消息的并發(fā)度和性能更高。

消息的順序需要由兩個階段保證:
消息發(fā)送
如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的消息產(chǎn)生的順序,業(yè)務(wù)上要求同一訂單的消息保持順序,例如訂單 A 的消息發(fā)送和消費都按照 A1、A2、A3 的順序。
如果是普通消息,訂單A 的消息可能會被輪詢發(fā)送到不同的隊列中,不同隊列的消息將無法保持順序,而順序消息發(fā)送時 RocketMQ 支持將 Sharding Key 相同(例如同一訂單號)的消息序路由到同一個隊列中。
下圖是生產(chǎn)者發(fā)送順序消息的封裝,原理是發(fā)送消息時,實現(xiàn) MessageQueueSelector 接口,?根據(jù) Sharding Key 使用 Hash 取模法來選擇待發(fā)送的隊列。

消息消費
消費者消費消息時,需要保證單線程消費每個隊列的消息數(shù)據(jù),從而實現(xiàn)消費順序和發(fā)布順序的一致。
順序消費服務(wù)的類是?ConsumeMessageOrderlyService?,在負(fù)載均衡階段,并發(fā)消費和順序消費并沒有什么大的差別。
最大的差別在于:順序消費會向 Borker 申請鎖?。消費者根據(jù)分配的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務(wù)每隔20秒會重新嘗試。

順序消費核心流程如下:
1、 組裝成消費對象
2、 將請求對象提交到消費線程池

和并發(fā)消費不同的是,這里的消費請求包含消費快照 processQueue ,消息隊列 messageQueue 兩個對象,并不對消息列表做任何處理。
3、 消費線程內(nèi),對消費隊列加鎖

順序消費也是通過線程池消費的,synchronized 鎖用來保證同一時刻對于同一個隊列只有一個線程去消費它
4、 從消費快照中取得待消費的消息列表

消費快照 processQueue 對象里,創(chuàng)建了一個紅黑樹對象 consumingMsgOrderlyTreeMap 用于臨時存儲的待消費的消息。
5、 執(zhí)行消息監(jiān)聽器

消費快照的消費鎖 consumeLock?的作用是:防止負(fù)載均衡線程把當(dāng)前消費的 MessageQueue 對象移除掉。
6、 處理消費結(jié)果
消費成功時,首先計算需要提交的偏移量,然后更新本地消費進度。

消費失敗時,分兩種場景:
假如已消費次數(shù)小于最大重試次數(shù),則將對象 consumingMsgOrderlyTreeMap 中臨時存儲待消費的消息,重新加入到消費快照紅黑樹 msgTreeMap?中,然后使用定時任務(wù)嘗試重新消費。
假如已消費次數(shù)大于等于最大重試次數(shù),則將失敗消息發(fā)送到 Broker ,Broker 接收到消息后,會加入到死信隊列里 , 最后計算需要提交的偏移量,然后更新本地消費進度。
我們做一個關(guān)于順序消費的總結(jié) :
順序消費需要由兩個階段消息發(fā)送和消息消費協(xié)同配合,底層支撐依靠的是 RocketMQ 的存儲模型;
順序消費服務(wù)啟動后,隊列的數(shù)據(jù)都會被消費者實例單線程的執(zhí)行消費;
假如消費者擴容,消費者重啟,或者 Broker 宕機 ,順序消費也會有一定幾率較短時間內(nèi)亂序,所以消費者的業(yè)務(wù)邏輯還是要保障冪等。
7 保存進度
RocketMQ 消費者消費完一批數(shù)據(jù)后, 會將隊列的進度保存在本地內(nèi)存,但還需要將隊列的消費進度持久化。
1、 集群模式

集群模式下,分兩種場景:
拉取消息服務(wù)會在拉取消息時,攜帶該隊列的消費進度,提交給 Broker 的拉取消息處理器。
消費者定時任務(wù),每隔5秒將本地緩存中的消費進度提交到 Broker 的消費者管理處理器。
Broker 的這兩個處理器都調(diào)用消費者進度管理器 consumerOffsetManager 的 commitOffset 方法,定時任務(wù)異步將消費進度持久化到消費進度文件 consumerOffset.json 中。

2、 廣播模式
廣播模式消費進度存儲在消費者本地,定時任務(wù)每隔 5 秒通過 LocalFileOffsetStore 持久化到本地文件offsets.json
?,數(shù)據(jù)格式為?MessageQueue:Offset?
。

廣播模式下,消費進度和消費組沒有關(guān)系,本地文件?offsets.json
?存儲在配置的目錄,文件中包含訂閱主題中所有的隊列以及隊列的消費進度。
8 重試機制
集群消費下,重試機制的本質(zhì)是 RocketMQ 的延遲消息功能。
消費消息失敗后,消費者實例會通過?CONSUMER_SEND_MSG_BACK?請求,將失敗消息發(fā)回到 Broker 端。
Broker 端會為每個 topic 創(chuàng)建一個重試隊列?,隊列名稱是:%RETRY% + 消費者組名 ,達到重試時間后將消息投遞到重試隊列中進行消費重試(消費者組會自動訂閱重試 Topic)。最多重試消費 16 次,重試的時間間隔逐漸變長,若達到最大重試次數(shù)后消息還沒有成功被消費,則消息將被投遞至死信隊列。
第幾次重試與上次重試的間隔時間第幾次重試與上次重試的間隔時間110 秒97 分鐘230 秒108 分鐘31 分鐘119 分鐘42 分鐘1210 分鐘53 分鐘1320 分鐘64 分鐘1430 分鐘75 分鐘151 小時86 分鐘162 小時

開源 RocketMQ 4.X 支持延遲消息,默認(rèn)支持18 個 level 的延遲消息,這是通過 broker 端的 messageDelayLevel 配置項確定的,如下:

Broker 在啟動時,內(nèi)部會創(chuàng)建一個內(nèi)部主題:SCHEDULE_TOPIC_XXXX,根據(jù)延遲 level 的個數(shù),創(chuàng)建對應(yīng)數(shù)量的隊列,也就是說18個 level 對應(yīng)了18個隊列。
我們先梳理下延遲消息的實現(xiàn)機制。
1、生產(chǎn)者發(fā)送延遲消息
2、Broker端存儲延遲消息
延遲消息在 RocketMQ Broker 端的流轉(zhuǎn)如下圖所示:

第一步:修改消息 Topic 名稱和隊列信息
Broker 端接收到生產(chǎn)者的寫入消息請求后,首先都會將消息寫到 commitlog 中。假如是正常非延遲消息,MessageStore 會根據(jù)消息中的 Topic 信息和隊列信息,將其轉(zhuǎn)發(fā)到目標(biāo) Topic 的指定隊列 consumequeue 中。
但由于消息一旦存儲到 consumequeue 中,消費者就能消費到,而延遲消息不能被立即消費,所以 RocketMQ 將 Topic 的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級別確定要投遞到哪個隊列下。
同時,還會將消息原來要發(fā)送到的目標(biāo) Topic 和隊列信息存儲到消息的屬性中。

第二步:構(gòu)建 consumequeue 文件時,計算并存儲投遞時間


上圖是 consumequeue 文件一條消息的格式,最后 8 個字節(jié)存儲 Tag 的哈希值,此時存儲消息的投遞時間。
第三步:定時調(diào)度服務(wù)啟動
ScheduleMessageService 類是一個定時調(diào)度服務(wù),讀取 SCHEDULE_TOPIC_XXXX 隊列的消息,并將消息投遞到目標(biāo) Topic 中。
定時調(diào)度服務(wù)啟動時,創(chuàng)建一個定時調(diào)度線程池 ,并根據(jù)延遲級別的個數(shù),啟動對應(yīng)數(shù)量的 HandlePutResultTask ,每個 HandlePutResultTask 負(fù)責(zé)一個延遲級別的消費與投遞。

第四步:投遞時間到了,將消息數(shù)據(jù)重新寫入到 commitlog
消息到期后,需要投遞到目標(biāo) Topic 。第一步已經(jīng)記錄了原來的 Topic 和隊列信息,這里需要重新設(shè)置,再存儲到 commitlog 中。
第五步:將消息投遞到目標(biāo) Topic 中
Broker 端的后臺服務(wù)線程會不停地分發(fā)請求并異步構(gòu)建 consumequeue(消費文件)和 indexfile(索引文件)。因此消息會直接投遞到目標(biāo) Topic 的 consumequeue 中,之后消費者就可以消費到這條消息。
回顧了延遲消息的機制,消費消息失敗后,消費者實例會通過?CONSUMER_SEND_MSG_BACK?請求,將失敗消息發(fā)回到 Broker 端。
Broker 端 SendMessageProcessor 處理器會調(diào)用 asyncConsumerSendMsgBack 方法。

首先判斷消息的當(dāng)前重試次數(shù)是否大于等于最大重試次數(shù),如果達到最大重試次數(shù),或者配置的重試級別小于0,則重新創(chuàng)建 Topic ,規(guī)則是?%DLQ% + consumerGroup,后續(xù)處理消息發(fā)送到死信隊列。
正常的消息會進入 else 分支,對于首次重試的消息,默認(rèn)的 delayLevel 是 0 ,RocketMQ 會將 delayLevel + 3,也就是加到 3 ,這就是說,如果沒有顯示的配置延時級別,消息消費重試首次,是延遲了第三個級別發(fā)起的重試,也就是距離首次發(fā)送 10s 后重試,其主題的默認(rèn)規(guī)則是?%RETRY% + consumerGroup。
當(dāng)延時級別設(shè)置完成,刷新消息的重試次數(shù)為當(dāng)前次數(shù)加 1 ,Broker 端將該消息刷盤,邏輯如下:

延遲消息寫入到 commitlog 里 ,這里其實和延遲消息機制的第一步類似,后面按照延遲消息機制的流程執(zhí)行即可(第二步到第六步)。
9 總結(jié)
下圖展示了集群模式下消費者并發(fā)消費流程 :

核心流程如下:
消費者啟動后,觸發(fā)負(fù)載均衡服務(wù) ,負(fù)載均衡服務(wù)為消費者實例分配對應(yīng)的隊列 ;
分配完隊列后,負(fù)載均衡服務(wù)會為每個分配的新隊列創(chuàng)建一個消息拉取請求 ?
pullRequest
??, ?拉取請求保存一個處理隊列?processQueue
,內(nèi)部是紅黑樹(TreeMap
),用來保存拉取到的消息 ;拉取消息服務(wù)單線程從拉取請求隊列 ?
pullRequestQueue
?中彈出拉取消息,執(zhí)行拉取任務(wù) ,拉取請求是異步回調(diào)模式,將拉取到的消息放入到處理隊列;拉取請求在一次拉取消息完成之后會復(fù)用,重新被放入拉取請求隊列?
pullRequestQueue
?中 ;拉取完成后,調(diào)用消費消息服務(wù) ?
consumeMessageService?
的 ?submitConsumeRequest?
方法 ,消費消息服務(wù)內(nèi)部有一個消費線程池;消費線程池的消費線程從消費任務(wù)隊列中獲取消費請求,執(zhí)行消費監(jiān)聽器 ?
listener.consumeMessage
?;消費完成后,若消費成功,則更新偏移量?
updateOffset
,先更新到內(nèi)存?offsetTable
,定時上報到 Broker ;若消費失敗,則將失敗消費發(fā)送到 Broker 。Broker 端接收到請求后, 調(diào)用消費進度管理器的?
commitOffset
?方法修改內(nèi)存的消費進度,定時刷盤到 ?consumerOffset.json
。
RocketMQ 4.X 的消費邏輯有兩個非常明顯的特點:
客戶端代碼邏輯較重。假如要支持一種新的編程語言,那么客戶端就必須實現(xiàn)完整的負(fù)載均衡邏輯,此外還需要實現(xiàn)拉消息、位點管理、消費失敗后將消息發(fā)回 Broker 重試等邏輯。這給多語言客戶端的支持造成很大的阻礙。
保證冪等非常重要。當(dāng)客戶端升級或者下線時,或者 Broker 宕機,都要進行負(fù)載均衡操作,可能造成消息堆積,同時有一定幾率造成重復(fù)消費。
參考資料:
1、RocketMQ 4.9.4 Github 文檔
https://github.com/apache/rocketmq/tree/rocketmq-all-4.9.4/docs
2、RocketMQ 技術(shù)內(nèi)幕
3、消息隊列核心知識點
https://mp.weixin.qq.com/s/v7_ih9X5mG3X4E4ecfgYXA
4、消息ACK機制及消費進度管理
https://zhuanlan.zhihu.com/p/25265380