最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

Kafka關(guān)鍵原理

2023-06-09 20:08 作者:程序員-王堅  | 我要投稿

日志分段切分條件

日志分段文件切分包含以下4個條件,滿足其一即可:

  1. 當前日志分段文件的大小超過了broker端參數(shù)?log.segment.bytes?配置的值。log.segment.bytes參數(shù)的默認值為?1073741824,即1GB

  2. 當前日志分段中消息的最小時間戳與當前系統(tǒng)的時間戳的差值大于log.roll.mslog.roll.hours參數(shù)配置的值。如果同時配置了log.roll.mslog.roll.hours參數(shù),那么log.roll.ms的優(yōu)先級高,默認情況下,只配置了log.roll.hours參數(shù),其值為168,即7天。

  3. 偏移量索引文件或時間戳索引文件的大小達到 broker 端參數(shù)log.index.size.max.bytes配置的值。log.index.size .max.bytes的默認值為10485760,即10MB

  4. 追加的消息的偏移量與當前日志分段的起始偏移量之間的差值大于Integer.MAX_VALUE, 即要追加的消息的偏移量不能轉(zhuǎn)變?yōu)橄鄬ζ屏浚╫ffset - baseOffset > Integer.MAX_VALUE)。

什么是Controller

Controller作為Kafka集群中的核心組件,它的主要作用是在Apache ZooKeeper的幫助下管理和協(xié)調(diào)整個Kafka集群。

Controller與Zookeeper進行交互,獲取與更新集群中的元數(shù)據(jù)信息。其他broker并不直接與zookeeper進行通信,而是與Controller進行通信并同步Controller中的元數(shù)據(jù)信息。

Kafka集群中每個節(jié)點都可以充當Controller節(jié)點,但集群中同時只能有一個Controller節(jié)點。

Controller簡單來說,就是kafka集群的狀態(tài)管理者

controller競選機制:簡單說,先來先上!

Broker 在啟動時,會嘗試去 ZooKeeper 中創(chuàng)建 /controller 節(jié)點。Kafka 當前選舉控制器的規(guī)則是:第一個成功創(chuàng)建 /controller 節(jié)點的 Broker 會被指定為控制器。

在Kafka集群中會有一個或者多個broker,其中有一個broker會被選舉為控制器(Kafka Controller),它負責維護整個集群中所有分區(qū)和副本的狀態(tài)及分區(qū)leader的選舉。

當某個分區(qū)的leader副本出現(xiàn)故障時,由控制器負責為該分區(qū)選舉新的leader副本。當檢測到某個分區(qū)的ISR集合發(fā)生變化時,由控制器負責通知所有broker更新其元數(shù)據(jù)信息。當使用kafka-topics.sh腳本為某個topic增加分區(qū)數(shù)量時,同樣還是由控制器負責分區(qū)的重新分配。

Kafka中的控制器選舉的工作依賴于Zookeeper,成功競選為控制器的broker會在Zookeeper中創(chuàng)建/controller這個臨時(EPHEMERAL)節(jié)點,此臨時節(jié)點的內(nèi)容參考如下:

{"version":1,"brokerid":0,"timestamp":"1529210278988"}

其中version在目前版本中固定為1,brokerid表示成為控制器的broker的id編號,timestamp表示競選成為控制器時的時間戳。

在任意時刻,集群中有且僅有一個控制器。每個broker啟動的時候會去嘗試去讀取zookeeper上的/controller節(jié)點的brokerid的值,如果讀取到brokerid的值不為-1,則表示已經(jīng)有其它broker節(jié)點成功競選為控制器,所以當前broker就會放棄競選;如果Zookeeper中不存在/controller這個節(jié)點,或者這個節(jié)點中的數(shù)據(jù)異常,那么就會嘗試去創(chuàng)建/controller這個節(jié)點,當前broker去創(chuàng)建節(jié)點的時候,也有可能其他broker同時去嘗試創(chuàng)建這個節(jié)點,只有創(chuàng)建成功的那個broker才會成為控制器,而創(chuàng)建失敗的broker則表示競選失敗。每個broker都會在內(nèi)存中保存當前控制器的brokerid值,這個值可以標識為activeControllerId。

controller的職責

  • 監(jiān)聽partition相關(guān)變化

對Zookeeper中的/admin/reassign_partitions節(jié)點注冊PartitionReassignmentListener,用來處理分區(qū)重分配的動作。 對Zookeeper中的/isr_change_notification節(jié)點注冊IsrChangeNotificetionListener,用來處理ISR集合變更的動作。 對Zookeeper中的/admin/preferred-replica-election節(jié)點添加PreferredReplicaElectionListener,用來處理優(yōu)先副本選舉。

  • 監(jiān)聽topic增減變化

對Zookeeper中的/brokers/topics節(jié)點添加TopicChangeListener,用來處理topic增減的變化; 對Zookeeper中的/admin/delete_topics節(jié)點添加TopicDeletionListener,用來處理刪除topic的動作

  • 監(jiān)聽broker相關(guān)的變化

對Zookeeper中的/brokers/ids/節(jié)點添加BrokerChangeListener,用來處理broker增減的變化

  • 更新集群的元數(shù)據(jù)信息

從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關(guān)的信息并進行相應的管理。 對各topic所對應的Zookeeper中的/brokers/topics/[topic]節(jié)點添加PartitionModificationsListener,用來監(jiān)聽topic中的分區(qū)分配變化。并將最新信息同步給其他所有broker。

  • 啟動并管理分區(qū)狀態(tài)機和副本狀態(tài)機。

  • 如果參數(shù)auto.leader.rebalance.enable設置為true,則還會開啟一個名為“auto-leader-rebalance-task”的定時任務來負責維護分區(qū)的leader副本的均衡。

分區(qū)的負載分布

客戶端請求創(chuàng)建一個topic時,每一個分區(qū)副本在broker上的分配,是由集群controller來決定;

結(jié)論:里面會創(chuàng)建出來兩個隨機數(shù)

第一個隨機數(shù)確定0號分區(qū)leader的位置,往后1號分區(qū)2號分區(qū)的leader依次往后順延1

第二個隨機數(shù)確定每個分區(qū)的第一個副本的位置 在leader所在機器上往后順延(隨機數(shù)+1)臺機器,該臺機器就是第一個副本的位置,剩余副本依次往后順延1

// 舉例:// broker_id = 0~19 一共20臺機器// 分區(qū)數(shù)20,副本數(shù)10// 第一個隨機數(shù):19// 第二個隨機數(shù):0(0,ArrayBuffer(19, 0, 1, 2, 3, 4, 5, 6, 7, 8)) (1,ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) (2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) (3,ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)) (4,ArrayBuffer(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) (5,ArrayBuffer(4, 5, 6, 7, 8, 9, 10, 11, 12, 13)) (6,ArrayBuffer(5, 6, 7, 8, 9, 10, 11, 12, 13, 14)) (7,ArrayBuffer(6, 7, 8, 9, 10, 11, 12, 13, 14, 15)) (8,ArrayBuffer(7, 8, 9, 10, 11, 12, 13, 14, 15, 16)) (9,ArrayBuffer(8, 9, 10, 11, 12, 13, 14, 15, 16, 17)) (10,ArrayBuffer(9, 10, 11, 12, 13, 14, 15, 16, 17, 18)) (11,ArrayBuffer(10, 11, 12, 13, 14, 15, 16, 17, 18, 19)) (12,ArrayBuffer(11, 12, 13, 14, 15, 16, 17, 18, 19, 0)) (13,ArrayBuffer(12, 13, 14, 15, 16, 17, 18, 19, 0, 1)) (14,ArrayBuffer(13, 14, 15, 16, 17, 18, 19, 0, 1, 2)) (15,ArrayBuffer(14, 15, 16, 17, 18, 19, 0, 1, 2, 3)) (16,ArrayBuffer(15, 16, 17, 18, 19, 0, 1, 2, 3, 4)) (17,ArrayBuffer(16, 17, 18, 19, 0, 1, 2, 3, 4, 5)) (18,ArrayBuffer(17, 18, 19, 0, 1, 2, 3, 4, 5, 6)) (19,ArrayBuffer(18, 19, 0, 1, 2, 3, 4, 5, 6, 7))// 其分布策略源碼如下:private def assignReplicasToBrokersRackUnaware( nPartitions: Int, //分區(qū)的個數(shù) ? 10replicationFactor: Int, ?//副本的個數(shù) ?5 brokerList: Seq[Int],//broker的集合 ? ?8 ? 0~7fixedStartIndex: Int//默認值是-1 ?固定開始的索引位置startPartitionId: Int): Map[Int, Seq[Int]] //默認值是-1 分區(qū)開始的位置= { ?val ret = mutable.Map[Int, Seq[Int]]() ?val brokerArray = brokerList.toArray ?val startIndex = if (fixedStartIndex >= 0) { ? ? ?fixedStartIndex ?}else { ? ? ? ? ?rand.nextInt(brokerArray.length) ?} ?var currentPartitionId = math.max(0, startPartitionId) ?var nextReplicaShift = if (fixedStartIndex >= 0) { ? ? ? ? ?fixedStartIndex ?}else { ? ? ? ? ?rand.nextInt(brokerArray.length) ?} ?for (_ <- 0 until nPartitions) { ? ?if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)){ ? ? ?nextReplicaShift += 1 ? ? ? ?} ? ?val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length ? ?val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex)) ? ?for (j <- 0 until replicationFactor - 1) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length)) ? ?} ? ?ret.put(currentPartitionId, replicaBuffer) ? ?currentPartitionId += 1 ?} ?ret } ? ? ? ? ? ? ? ? ? private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { ?val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) ?(firstReplicaIndex + shift) % nBrokers }

  • 副本因子不能大于 Broker 的個數(shù)(報錯:Replication factor: 4 larger than available brokers: 3.);

  • partition_0的第1個副本(leader副本)放置位置是隨機從 brokerList 選擇的;

  • 其他分區(qū)的第1個副本(leader)放置位置相對于paritition_0分區(qū)依次往后移(也就是如果我們有5個 Broker,5個分區(qū),假設partition0分區(qū)放在broker4上,那么partition1將會放在broker5上;patition2將會放在broker1上;partition3在broker2,依次類);

  • 各分區(qū)剩余的副本相對于分區(qū)前一個副本偏移隨機數(shù)nextReplicaShift+1,然后后面的副本依次加1


Kafka關(guān)鍵原理的評論 (共 條)

分享到微博請遵守國家法律
二连浩特市| 拉萨市| 霞浦县| 佛教| 香格里拉县| 青田县| 台东县| 读书| 阿拉善左旗| 濉溪县| 广东省| 平果县| 雷波县| 乌恰县| 司法| 麻城市| 兴仁县| 昌宁县| 芦溪县| 陆良县| 静乐县| 蒙城县| 米林县| 柯坪县| 崇仁县| 平舆县| 手游| 噶尔县| 綦江县| 萨嘎县| 屏东市| 司法| 丰宁| 乳山市| 沙河市| 呼玛县| 饶阳县| SHOW| 安福县| 航空| 临洮县|