最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

RocketMQ 主從復(fù)制

2023-06-30 21:41 作者:Cpp程序員  | 我要投稿

提到主從復(fù)制,我們可能立馬會(huì)聯(lián)想到 MySQL 的主從復(fù)制。

MySQL 主從復(fù)制是 MySQL 高可用機(jī)制之一,數(shù)據(jù)可以從數(shù)據(jù)庫服務(wù)器主節(jié)點(diǎn)復(fù)制到一個(gè)或多個(gè)從節(jié)點(diǎn)。

這篇文章,我們聊聊 RocketMQ 的主從復(fù)制,希望你讀完之后,能夠理解主從復(fù)制的精髓。

1 同步與異步

在 RocketMQ 的集群模式中,Broker 分為 Master 與 Slave,一個(gè) Master 可以對(duì)應(yīng)多個(gè) Slave,但是一個(gè) Slave 只能對(duì)應(yīng)一個(gè) Master。

每個(gè) Broker 與 Name Server 集群中的所有節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)注冊(cè) Topic 信息到所有 Name Server。

Master 節(jié)點(diǎn)負(fù)責(zé)接收客戶端的寫入請(qǐng)求,并將消息持久化到磁盤上。而 Slave 節(jié)點(diǎn)則負(fù)責(zé)從 Master 節(jié)點(diǎn)復(fù)制消息數(shù)據(jù),并保持與 Master 節(jié)點(diǎn)的同步。

  • 同步復(fù)制

生產(chǎn)者發(fā)送消息后,Master 接收到存儲(chǔ)消息請(qǐng)求,將消息數(shù)據(jù)同步給 Slave 后,才將存儲(chǔ)結(jié)果返回給生產(chǎn)者。同步復(fù)制模式下,發(fā)送消息會(huì)有一定延遲,系統(tǒng)吞吐量也會(huì)降低。

  • 異步復(fù)制

生產(chǎn)者發(fā)送消息后,Master 接收到存儲(chǔ)消息請(qǐng)求,將消息存儲(chǔ)后,直接將存儲(chǔ)結(jié)果返回給生產(chǎn)者。 Master 和 Slave 再通過異步的方式同步數(shù)據(jù),這種復(fù)制模式具有較小的延遲,可以實(shí)現(xiàn)比較高的吞吐量。

若 Master 出現(xiàn)故障,有些數(shù)據(jù)可能未寫入 Slave ,未同步的數(shù)據(jù)可能丟失。

復(fù)制流程分為兩個(gè)部分:元數(shù)據(jù)復(fù)制消息數(shù)據(jù)復(fù)制。

  • 主從服務(wù)器同步主題,消費(fèi)者進(jìn)度,延遲消費(fèi)進(jìn)度,消費(fèi)者配置數(shù)據(jù)

  • 主從服務(wù)器同步消息數(shù)據(jù)

2 元數(shù)據(jù)復(fù)制

Slave Broker 定時(shí)任務(wù)每隔 10 秒會(huì)同步元數(shù)據(jù),包括主題消費(fèi)進(jìn)度,延遲消費(fèi)進(jìn)度消費(fèi)者配置。

同步主題時(shí), Slave Broker 向 Master Broker 發(fā)送 RPC 請(qǐng)求,返回?cái)?shù)據(jù)后,首先加入本地緩存里,然后持久化到本地。

3 消息數(shù)據(jù)復(fù)制

下圖是 Master 和 Slave 消息數(shù)據(jù)同步的流程圖。

1、Master 啟動(dòng)后監(jiān)聽指定端口;

Master 啟動(dòng)后創(chuàng)建 AcceptSocketService 服務(wù) , 用來創(chuàng)建客戶端到服務(wù)端的 TCP 鏈接。

RocketMQ 抽象了鏈接對(duì)象 HAConnection , HAConnection 會(huì)啟動(dòng)兩個(gè)線程,分別用于讀服務(wù)和寫服務(wù):

  • 讀服務(wù):處理 Slave 發(fā)送的請(qǐng)求

  • 寫服務(wù):用于向 Slave 傳輸數(shù)據(jù)

2、Slave 啟動(dòng)后,嘗試連接 Master ,建立 TCP 連接;

HAClient 是客戶端 Slave 的核心類 ,負(fù)責(zé)和 Master 創(chuàng)建連接和數(shù)據(jù)交互。

客戶端在啟動(dòng)后,首先嘗試連接 Master , 查詢當(dāng)前消息存儲(chǔ)中最大的物理偏移量 ,并存儲(chǔ)在變量 currentReportedOffset 里。

3、Slave 判定拉取間隔是否大于 5 秒,則向 Master 匯報(bào)已拉取消息偏移量;

上報(bào)進(jìn)度的數(shù)據(jù)格式是一個(gè) Long 類型的 Offset , 8個(gè)字節(jié) , 非常簡(jiǎn)潔 。

發(fā)送到 Socket 緩沖區(qū)后 , 修改最后一次的寫時(shí)間 lastWriteTimestamp 。

4、Master 解析請(qǐng)求偏移量,從消息文件中檢索該偏移量后的所有消息;

當(dāng) Slave 上報(bào)數(shù)據(jù)到 Master 時(shí),觸發(fā) SelectionKey.OP_READ 事件,Master 將請(qǐng)求交由 ReadSocketService 服務(wù)處理:

當(dāng) Slave Broker 傳遞了自身 commitlog 的 maxPhyOffset 時(shí),Master 會(huì)馬上中斷?selector.select(1000),執(zhí)行?processReadEvent?方法。

processReadEvent 方法的核心邏輯是設(shè)置 Slave 的當(dāng)前進(jìn)度 offset ,然后通知復(fù)制線程當(dāng)前的復(fù)制進(jìn)度。

寫服務(wù) WriteSocketService 從消息文件中檢索該偏移量后的所有消息,并將消息數(shù)據(jù)發(fā)送給 Slave。

5、Slave 接收到數(shù)據(jù),將消息數(shù)據(jù) append 到消息文件 commitlog 里 。

首先 HAClient 類中調(diào)用 dispatchReadRequest 方法 , 解析出消息數(shù)據(jù) ;

然后將消息數(shù)據(jù) append 到本地的消息存儲(chǔ)。

4 同步的實(shí)現(xiàn)

從數(shù)據(jù)復(fù)制流程圖,我們發(fā)覺數(shù)據(jù)復(fù)制本身就是一個(gè)異步執(zhí)行的,但是同步是如何實(shí)現(xiàn)的呢?

Master Broker 接收到寫入消息的請(qǐng)求后 ,調(diào)用 Commitlog 的 aysncPutMessage 方法寫入消息。

這段代碼中,當(dāng) commitLog 執(zhí)行完 appendMessage 后, 需要執(zhí)行刷盤任務(wù)同步復(fù)制兩個(gè)任務(wù)。

但這兩個(gè)任務(wù)并不是同步執(zhí)行,而是異步的方式,使用了 CompletableFuture 這個(gè)異步神器。

當(dāng) HAConnection 讀服務(wù)接收到 Slave 的進(jìn)度反饋,發(fā)現(xiàn)消息數(shù)據(jù)復(fù)制成功,則喚醒 future 。

最后 Broker 組裝響應(yīng)命令 ,并將響應(yīng)命令返回給客戶端。

5 總結(jié)

1、主從復(fù)制包含元數(shù)據(jù)復(fù)制和消息數(shù)據(jù)復(fù)制兩個(gè)部分;

2、元數(shù)據(jù)復(fù)制

Slave Broker 定時(shí)任務(wù)每隔 10 秒向 Master Broker 發(fā)送 RPC 請(qǐng)求,將元數(shù)據(jù)同步到緩存后,然后持久化到磁盤里;

3、消息數(shù)據(jù)復(fù)制

  • Master 啟動(dòng)監(jiān)聽指定端口

  • Slave 啟動(dòng) HaClient 服務(wù),和 Master 創(chuàng)建 TCP 鏈接

  • Slave 向 Master 上報(bào)存儲(chǔ)進(jìn)度

  • Master 接收進(jìn)度,消息文件中檢索該偏移量后的所有消息,并傳輸給 Slave

  • Slave 接收到數(shù)據(jù)后,將消息數(shù)據(jù) append 到本地的消息存儲(chǔ)。

4、同步的實(shí)現(xiàn)

當(dāng) commitLog 執(zhí)行完 appendMessage 后, 需要執(zhí)行刷盤任務(wù)同步復(fù)制兩個(gè)任務(wù),這里用到了 CompletableFuture 這個(gè)異步神器。

當(dāng) HAConnection 讀服務(wù)接收到 Slave 的進(jìn)度反饋,發(fā)現(xiàn)消息數(shù)據(jù)復(fù)制成功,則喚醒 future 。最后 Broker 組裝響應(yīng)命令 ,并將響應(yīng)命令 返回給客戶端 。


RocketMQ 主從復(fù)制的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
屯留县| 林芝县| 都昌县| 马公市| 当雄县| 汉川市| 南澳县| 永济市| 周至县| 阿拉尔市| 临沂市| 鄂州市| 潼南县| 绥江县| 黄龙县| 澳门| 抚顺市| 余姚市| 淮安市| 濮阳县| 寿光市| 铅山县| 阿拉善右旗| 印江| 禹城市| 安吉县| 太原市| 汉寿县| 上栗县| 金华市| 房产| 东阳市| 调兵山市| 五华县| 石城县| 镇江市| 民乐县| 泰州市| 松桃| 新河县| 辽宁省|