kafka的原理及集群部署詳解
本文作者:Praywu
消息隊(duì)列概述
消息隊(duì)列分類
點(diǎn)對點(diǎn)
組成:消息隊(duì)列(Queue)、發(fā)送者(Sender)、接收者(Receiver)
特點(diǎn):一個生產(chǎn)者生產(chǎn)的消息只能被一個接受者接收,消息一旦被消費(fèi),消息就不在消息隊(duì)列中了
發(fā)布/訂閱
組成:消息隊(duì)列(Queue)、發(fā)布者(Publisher)、訂閱者(Subscriber)、主題(Topic)
特點(diǎn):每個消息可以有多個消費(fèi)者,彼此互不影響,即發(fā)布到消息隊(duì)列的消息能被多個接受者(訂閱者)接收
ActiveMQ: 歷史悠久,支持性較好,性能相對不高
RabbitMQ: 可靠性高、安全
Kafka: 分布式、高性能、高吞吐量、跨語言
RocketMQ: 阿里開源的消息中間件,純Java實(shí)現(xiàn)
kafka架構(gòu)
kafka介紹
Kafka是一個分布式的發(fā)布/訂閱消息系統(tǒng),最初由LinkedIn(領(lǐng)英)公司發(fā)布,使用Scala語言編寫,后成為Apache的頂級項(xiàng)目。
kafka主要用于處理活躍的數(shù)據(jù),如登錄、瀏覽、點(diǎn)擊、分享等用戶行為產(chǎn)生的數(shù)據(jù)。
kafka架構(gòu)組成

?
?

?
?
Broker
broker表示kafka的節(jié)點(diǎn),kafka集群包含多個kafka服務(wù)節(jié)點(diǎn),每個kafka服務(wù)節(jié)點(diǎn)就稱為一個broker
Topic
主題,用來存儲不同類別的消息(kafka的消息數(shù)據(jù)是分主題存儲在硬盤上的)
存儲消息時,需要指定存儲在哪個主題下面,如發(fā)帖,發(fā)哪種類型的
Partition
分區(qū),每個topic包含一個或多個partition,在創(chuàng)建topic時指定包含的partition數(shù)據(jù)(目的是為了進(jìn)行分布式存儲)
分區(qū)可以提高負(fù)載(每個分區(qū)是不同的磁盤,所以會提高負(fù)載)
Replication
副本,每個partition分區(qū)可以有多個副本,分布在不同的Broker上
kafka會選出一個副本作為Leader,所有的讀寫請求都會通過Leader完成,F(xiàn)ollower只負(fù)責(zé)備份數(shù)據(jù)
所有Follower會自動從Leader中復(fù)制數(shù)據(jù),當(dāng)Leader宕機(jī)后,會從Follower中選出一個新的Leader繼續(xù)提供服務(wù),實(shí)現(xiàn)故障自動轉(zhuǎn)移
Message
消息,是通信數(shù)據(jù)的基本單位,每個消息都屬于一個Partition,消息都是放在Partition里面的
Producer
消息的生產(chǎn)者,向kafka的一個topic發(fā)布消息,發(fā)布消息時,需要指定發(fā)布到哪個topic主題
Consumer
消息的消費(fèi)者,訂閱Topic并讀取其發(fā)布的消息,消費(fèi)或訂閱哪個topic主題里的消息,可以訂閱多個主題的消息(類似訂閱多個微信公眾號)
Consumer Group
消費(fèi)者組,每個Consumer屬于一個特定的Consumer Group,多個Consumer可以屬于同一個Consumer Group
各個consumer可以組成一個組,每個消息只能被組中的一個consumer消費(fèi),如果一個消息可以被多個consumer消費(fèi)的話,那么這些consumer必須在不同的組。
ZooKeeper
協(xié)調(diào)Kafka的正常運(yùn)行,kafka將元數(shù)據(jù)信息保存在ZooKeeper中,但發(fā)送給Topic本身的消息數(shù)據(jù)并不存儲在ZK中,而是存儲在磁盤文件中
元數(shù)據(jù)信息包括:kafka有多少個節(jié)點(diǎn)、有哪些主題,主題叫什么,有哪些分區(qū)的等(消息自身的數(shù)據(jù)不在ZK中,而是在磁盤上對應(yīng)的分區(qū)中)
kafka的工作流程
生產(chǎn)者向kafka發(fā)送數(shù)據(jù)的流程(六步)

一共六步:
生產(chǎn)者查詢Leader:producer先從zookeeper的“/brokers/.../state”節(jié)點(diǎn)找到該partition的leader
找到Leader之后往Leader寫數(shù)據(jù):producer將消息發(fā)送給該leader
Leader落盤:leader將消息寫入本地log
Leader通知Follower
Follower從Leader中拉取數(shù)據(jù):replication寫入到Follower的本地log后,follower向leader發(fā)送ack
Kafka向生產(chǎn)者回應(yīng)ACK:leader收到所有的replication的ack之后,向producer發(fā)送ack
Kafka選擇分區(qū)的模式(三種)
直接指定往哪個分區(qū)寫
指定key,然后kafka根據(jù)key做hash后決定寫哪個分區(qū)
各個分區(qū)輪詢
生產(chǎn)者往kafka發(fā)送數(shù)據(jù)的模式(三種)
把數(shù)據(jù)發(fā)送給Leader就認(rèn)為成功,效率最高,安全性低
把數(shù)據(jù)發(fā)送給Leader,等待Leader回復(fù)Ack后則認(rèn)為發(fā)送成功
把數(shù)據(jù)發(fā)送給Leader,確保Follower從Leader拉取數(shù)據(jù)回復(fù)Ack給Leader,Leader再向生產(chǎn)者回復(fù)Ack才認(rèn)為發(fā)送成功,安全性最高
數(shù)據(jù)消費(fèi)
多個消費(fèi)者可以組成一個消費(fèi)者組,并用一個標(biāo)簽來標(biāo)識這個消費(fèi)者組(一個消費(fèi)者實(shí)例可以運(yùn)行在不同的進(jìn)程甚至不同的服務(wù)器上)
如果所有的消費(fèi)者實(shí)例都在同一個消費(fèi)者組中,那么消息記錄會被很好的均衡發(fā)送到每個消費(fèi)者實(shí)例
如果所有的消費(fèi)者實(shí)例都在不同的消費(fèi)者組,那么每一條消息記錄會被廣播到每一個消費(fèi)者實(shí)例
各個consumer可以組成一個組,每個消息只能被組中的一個consumer消費(fèi),如果一個消息可以被多個consumer消費(fèi)的話,那么這些consumer必須在不同的組
注意:每個消費(fèi)者實(shí)例可以消費(fèi)多個分區(qū),但是每一個分區(qū)最多只能被消費(fèi)者組中的一個實(shí)例消費(fèi)
?
kafka的文件存儲機(jī)制
topic、partition和segment
1)在kafka文件存儲中,同一個topic下有多個不同的partition:
每個partition就是一個目錄,partition的命名規(guī)則為:topic名稱+有序序號
第一個partition序號從0開始,序號最大值為partition數(shù)量減一
2)每個partition的目錄下面會有多組segment文件:
每個partition(目錄)相當(dāng)于一個巨型大文件被平均分配到多個大小都相等的segment數(shù)據(jù)文件中(但每個segment file消息數(shù)量不一定相等,這種特性方便old segment file快速被刪除)
每組segment文件包含:.index文件、.log文件、.timeindex文件(.log文件就是實(shí)際存儲message的地方,.index和.timeindex文件為索引文件,用于檢索消息)
每個partition只需要支持順序讀寫就行了,segment文件生命周期由服務(wù)端配置參數(shù)決定
這樣做能快速刪除無用文件,有效提高磁盤利用率
3)segment文件
segment文件由2大部分組成,分別為index file和data file,此2個文件一一對應(yīng),成對出現(xiàn),后綴".index"和“.log”分別表示為segment索引文件、數(shù)據(jù)文件
segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值。數(shù)值最大為64位long大小,19位數(shù)字字符長度,沒有數(shù)字用0填充
存儲和查找message的過程
1)數(shù)據(jù)寫入過程
每個Partition都是一個有序并且不可改變的消息記錄集合(每個partition都是一個有序隊(duì)列),當(dāng)新的數(shù)據(jù)寫入時,就被追加到partition的末尾。
在每個partition中,每條消息都會被分配一個順序的唯一標(biāo)識,這個標(biāo)識被稱為Offset(偏移量),用于partition唯一標(biāo)識一條消息。
2)數(shù)據(jù)查找過程
在partition中通過offset查找message:
查找segment file:每一個segment文件名都包含了上一個segment最后一條消息的offset值,所以只要根據(jù)offset二分查找文件列表,就能定位到具體segment文件
通過segment file查找message:當(dāng)定位到segment文件后,可以通過對應(yīng)的.index元數(shù)據(jù)文件,在對應(yīng)的.log文件中順序查找對應(yīng)的offset,然后即可拿到數(shù)據(jù)
3)說明:
kafka只能保證在同一個partition內(nèi)部消息是有序的,在不同的partition之間,并不能保證消息有序
為什么kafka快:因?yàn)樗褜Υ疟P的隨機(jī)讀變成了順序讀
kafka安裝部署及操作
kafka單機(jī)部署
安裝ZooKeeper
kafka需要依賴ZooKeeper,所以需要先安裝并啟動ZooKeeper,kafka使用zk有兩種方式:
使用kafka自帶的ZooKeeper(一般不推薦使用內(nèi)置的ZooKeeper)
單獨(dú)搭建ZooKeeper
使用kafka自帶的ZooKeeper:
?# kafka的bin目錄中,有自帶的zk的啟動命令
?/usr/local/kafka/bin/zookeeper-server-start.sh
?
?# kafka的config目錄中,有自帶的zk的配置文件
?/usr/local/kafka/bin/zookeeper.properties
如果要使用kafka內(nèi)置的ZooKeeper,修改好配置文件?./config/zookeeper.properties
(主要修改zk的data位置和端口),直接啟動即可
?# 后臺啟動,并指定配置文件
?zookeeper-server-start.sh -daemon ../config/zookeeper.properties
安裝kafka
kafka需要java環(huán)境,需要安裝jdk
?# 1.安裝jdk
?yum install -y java-1.8.0-openjdk
?
?# 2.準(zhǔn)備kafka安裝包
?tar zxvf kafka_2.11-2.2.0.tgz -C /usr/local/
?ln -s /usr/local/kafka_2.11-2.2.0 /usr/local/kafka
?mkdir -pv /data/kafka/data/ ? # 創(chuàng)建kafka數(shù)據(jù)存儲目錄
?# 配置環(huán)境變量
?sed -i '$aPATH="/usr/local/kafka/bin:$PATH"' /etc/profile
?source /etc/profile
?
?# 3.修改kafka配置文件
?vim /usr/local/kafka/config/server.properties
?listeners=PLAINTEXT://10.0.0.80:9092 ? ?# kafka默認(rèn)監(jiān)聽端口號為9092,
?log.dirs=/data/kafka/data ? ? ? ? ? ? ? # 指定kafka數(shù)據(jù)存放目錄
?zookeeper.connect=localhost:2181 ? ? ? ?# 指定ZooKeeper地址,kafka要將元數(shù)據(jù)存放到zk中,這里會在本機(jī)啟動一個zk
?
?# 4.啟動kafka
?kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
?
?# 5.查看進(jìn)程及端口
?ps -ef | grep kafka
?ss -tnl | grep 9092 ? ? ? ? ? ? ? ? ? ? # kafka監(jiān)聽在9092端口
kafka腳本程序及配置文件
幾個kafka的操作腳本
kafka-server-start.sh kafka啟動程序
kafka-server-stop.sh kafka停止程序
kafka-topics.sh 創(chuàng)建topic程序
kafka-console-producer.sh 命令行模擬生產(chǎn)者生產(chǎn)消息數(shù)據(jù)程序
kafka-console-consumer.sh 命令行模擬消費(fèi)者消費(fèi)消息數(shù)據(jù)程序
kafka的配置文件
vim /usr/local/kafka/config/server.properties
?############################# Server Basics #############################
? # broker的id,值為整數(shù),且必須唯一,在一個集群中不能重復(fù),默認(rèn)為0
?broker.id=0
?
?############################# Socket Server Settings #############################
?# kafka默認(rèn)監(jiān)聽的端口為9092
?#listeners=PLAINTEXT://:9092
?
?# 處理網(wǎng)絡(luò)請求的線程數(shù)量,默認(rèn)為3個
?num.network.threads=3
?
?# 執(zhí)行磁盤IO操作的線程數(shù)量,默認(rèn)為8個
?num.io.threads=8
?
?# socket服務(wù)發(fā)送數(shù)據(jù)的緩沖區(qū)大小,默認(rèn)100KB
?socket.send.buffer.bytes=102400
?# socket服務(wù)接受數(shù)據(jù)的緩沖區(qū)大小,默認(rèn)100KB
?socket.receive.buffer.bytes=102400
?
?# socket服務(wù)所能接受的一個請求的最大大小,默認(rèn)為100M
?socket.request.max.bytes=104857600
?
?
?############################# Log Basics #############################
?# kafka存儲消息數(shù)據(jù)的目錄
?log.dirs=../data
?
?# 每個topic默認(rèn)的partition數(shù)量
?num.partitions=1
?
?# 在啟動時恢復(fù)數(shù)據(jù)和關(guān)閉時刷新數(shù)據(jù)時每個數(shù)據(jù)目錄的線程數(shù)量
?num.recovery.threads.per.data.dir=1
?
?
?############################# Log Flush Policy #############################
?# 消息刷新到磁盤中的消息條數(shù)閾值
?#log.flush.interval.messages=10000
?# 消息刷新到磁盤中的最大時間間隔
?#log.flush.interval.ms=1000
?
?############################# Log Retention Policy #############################
?# 日志保留小時數(shù),超時會自動刪除,默認(rèn)為7天
?log.retention.hours=168
?
?# 日志保留大小,超出大小會自動刪除,默認(rèn)為1G,log.retention.bytes這是指定 Broker 為消息保存的總磁盤容量大小
?#log.retention.bytes=1073741824
?
?# 日志分片策略,單個日志文件的大小最大為1G,超出后則創(chuàng)建一個新的日志文件
?log.segment.bytes=1073741824
?
?# 每隔多長時間檢測數(shù)據(jù)是否達(dá)到刪除條件
?log.retention.check.interval.ms=300000
?
?
?############################# Zookeeper #############################
?# Zookeeper連接信息,如果是zookeeper集群,則以逗號隔開
?zookeeper.connect=localhost:2181
?# 連接zookeeper的超時時間
?zookeeper.connection.timeout.ms=6000
?# 是否可以刪除topic,默認(rèn)為false
?delete.topic.enable=true
kafka集群部署
環(huán)境信息
節(jié)點(diǎn)IPZK PortKafka PortOSnode0110.0.0.8021819092CentOS7.9node0210.0.0.8121819092CentOS7.9node0310.0.0.8221819092CentOS7.9
部署ZooKeeper集群
kakfa依賴ZooKeeper,可以用以下兩種方式使用ZooKeeper:
使用kafka自帶的ZooKeeper(一般不推薦使用內(nèi)置的ZooKeeper)
單獨(dú)搭建ZooKeeper
搭建ZooKeeper集群見ZooKeeper文檔。
部署kafka集群
所有節(jié)點(diǎn)(node01、node02、node03)上操作:
?# 1.安裝jdk
?yum install -y java-1.8.0-openjdk
?
?# 2.準(zhǔn)備kafka安裝包
?tar zxvf kafka_2.11-2.2.0.tgz -C /usr/local/
?ln -s /usr/local/kafka_2.11-2.2.0 /usr/local/kafka
?mkdir -pv /data/kafka/data/ ? # 創(chuàng)建kafka數(shù)據(jù)存儲目錄
?# 配置環(huán)境變量
?sed -i '$aPATH="/usr/local/kafka/bin:$PATH"' /etc/profile
?source /etc/profile
?
?# 3.修改kafka配置文件
?broker.id=1 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?# 各自節(jié)點(diǎn)的id號,每個節(jié)點(diǎn)都有自己的id,值為整數(shù),且必須唯一,在一個集群中不能重復(fù),默認(rèn)為0
?listeners=PLAINTEXT://10.0.0.80:9092 ? ? ? ? ? ? ? ? ? ? ? ? ? ?# kafka默認(rèn)監(jiān)聽的端口號為9092,指定各自節(jié)點(diǎn)的地址和端口
?log.dirs=/data/kafka/data ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 指定kafka數(shù)據(jù)的存放目錄
?zookeeper.connect=10.0.0.80:2181,10.0.0.81:2181,10.0.0.82:2181 ?# zookeeper的連接信息,kafka要將元數(shù)據(jù)信息存放到zk中
?zookeeper.connection.timeout.ms=600000 ? ? ? ? ? ? ? ? ? ? ? ? ?#連接zk超時時間調(diào)大,否則可能起不來,默認(rèn): 6000
?
?# 4.啟動kafka
?kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
?
?# 5.查看進(jìn)程及端口
?ps -ef | grep kafka
?ss -tnl | grep 9092 ? ? ? ? ? ? ? ? ? ? # kafka監(jiān)聽在9092端口
?
生產(chǎn)和消費(fèi)消息測試
kafka-server-start.sh kafka啟動程序
kafka-server-stop.sh kafka停止程序
kafka-topics.sh 創(chuàng)建topic程序
kafka-console-producer.sh 命令行模擬生產(chǎn)者生產(chǎn)消息數(shù)據(jù)程序
kafka-console-consumer.sh 命令行模擬消費(fèi)者消費(fèi)消息數(shù)據(jù)程序
topic相關(guān)操作
操作topic使用kafka-topic.sh
腳本
?# 查看主題topic列表,需指定zk的地址
?kafka-topics.sh --list --zookeeper 10.0.0.80:2181 ?
?
?# 創(chuàng)建topic ?hello
?kafka-topics.sh --create --zookeeper 10.0.0.80:2181 --replication-factor 1 --partitions 3 --topic hello ? ?
?# --create ? ? ? ? ? ? ? ? ? ? ?是創(chuàng)建主題topic
?# --zookeeper localhost:2181 ? ?主題topic信息是存儲在zk中,需要指定zk服務(wù)的地址
?# --replication-factor 1 ? ? ? ?主題topic信息的副本數(shù),因?yàn)楝F(xiàn)在只要一個節(jié)點(diǎn),所以只能是1,有多個節(jié)點(diǎn)時候,可以指定副本數(shù)多個
?# --partitions 3 ? ? ? ? ? ? ? ?主題topic有多少個分區(qū)
?# --topic test-topic ? ? ? ? ? ?指定主題topic的名字
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
?# 查看某個具體的主題topic消息
?kafka-topics.sh --describe --zookeeper 10.0.0.80:2181 --topic hello ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
?
?# 修改主題topic信息,增加到5個分區(qū)
?kafka-topics.sh --alter --zookeeper 10.0.0.80:2181 --topic hello --partitions 5 ? ?
?
?# 刪除主題topic hello
?kafka-topics.sh --delete --zookeeper 10.0.0.80:2181 --topic hello ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
生產(chǎn)和消費(fèi)命令
生產(chǎn)消息:
kafka-console-producer.sh
消費(fèi)消息:
kafka-console-consumer.sh
1)生產(chǎn)消息
使用kafka自帶的生產(chǎn)者命令生產(chǎn)消息 (可開一個窗口,模擬生產(chǎn)者)
?# 生產(chǎn)者生產(chǎn)消息,是往topic里發(fā)送消息的,需要指明kafka地址和topic的名字
?kafka-console-producer.sh --broker-list 10.0.0.80:9092 --topic test-topic
?>hello
?>test1
?>test2
?>
2)消費(fèi)消息
使用kafka自帶的消費(fèi)者命令消費(fèi)消息 (可開多個窗口,模擬消費(fèi)者)
?# 消費(fèi)者消費(fèi)消息,也是從指定topic里取出的,需要指明kafka地址和topic的名字,加--from-beginning是從頭開始收,不加就從當(dāng)前狀態(tài)開始收
?kafka-console-consumer.sh --bootstrap-server 10.0.0.80:9092 --topic test-topic --from-beginning
查看消息本體及相關(guān)數(shù)據(jù)
查看kafka存放的消息
?# 來到kafka的數(shù)據(jù)目錄,查看kafka存放的消息
?cd /data/kafka/data/ ?
?ls -d ./test-topic* ? ? ? ? # kafka存放的消息會被分布存儲在各個分區(qū),這里目錄名test-topic就表示對應(yīng)的topic名稱,后綴-0就表示對應(yīng)的分區(qū)
?./test-topic-0 ? ? ? ? ? ? ?# 有幾個分區(qū)就會有幾個這樣的目錄,消息被分布存儲在各個目錄(目錄名稱格式: ?topic名稱-分區(qū)編號)
?
?# 查看對應(yīng)分區(qū)下的文件(每個分區(qū)中存放的消息內(nèi)容都不一樣)
?ls ./test-topic-0/
?00000000000000000000.index ?00000000000000000000.log ?00000000000000000000.timeindex ?leader-epoch-checkpoint
?
?# 查看消息本體
?cat ./test-topic-0/00000000000000000000.log
?=C???????????????????
?hello=M?5??????????????????
?test1<{y輁?????????????????exit<.??????????????????quit=徐±??????????????????
?hello=H???????????????????
?test1=z
????????????????????
?test2BΘ??艁???????????????? hahahahaha
查看kafka存放在ZooKeeper中的元數(shù)據(jù)
?# 客戶端連接zk
?zkCli.sh ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 如果什么都不指定,則默認(rèn)連接本機(jī)的2181端口
?# zkCli.sh -server 10.0.0.80:2181 ? ? ?# 指定IP和端口,可以連接集群中任何一個節(jié)點(diǎn)
?
?# 查看/根節(jié)點(diǎn)下有哪些數(shù)據(jù)
?[zk: localhost:2181(CONNECTED) 0] ls /
?[mytest, cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, testNode, consumers, latest_producer_id_block, config]
?
?# 查看/brokers下的數(shù)據(jù)
?[zk: localhost:2181(CONNECTED) 1] ls /brokers
?[ids, topics, seqid]
?
?# 查看當(dāng)前brokers的節(jié)點(diǎn)編號
?[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
?[0]
?
?# 查看主題topic
?[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
?[test_conf, test-topic, xxxxxx, web_log, xxx_conf, __consumer_offsets, hg_test, aaa_conf]
?
?# 查看test-topic這個主題的分區(qū)
?[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/test-topic
?[partitions]
?
?# 查看test-topic這個主題的分區(qū)情況
?[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/test-topic/partitions
?[0]
?
?# 使用get命令查看test-topic這個主題的相關(guān)信息
?[zk: localhost:2181(CONNECTED) 6] get /brokers/topics/test-topic
?{"version":1,"partitions":{"0":[0]}}
?cZxid = 0x200000147
?ctime = Sat Mar 18 10:18:27 CST 2023
?mZxid = 0x200000147
?mtime = Sat Mar 18 10:18:27 CST 2023
?pZxid = 0x200000148
?cversion = 1
?dataVersion = 0
?aclVersion = 0
?ephemeralOwner = 0x0
?dataLength = 36
?numChildren = 1