[學習筆記][kafka] Python使用kafka消息隊列
之前做過的項目使用的都是rabbitmq,最近項目中需要使用kafka,現(xiàn)將安裝和簡單使用python代碼做一個小結。
在本地開發(fā)時,使用docker安裝kafka及其依賴。(以下操作基于Ubuntu操作系統(tǒng),以及默認已經(jīng)安裝docker。)
實際本地開發(fā)時,在windows上使用wsl linux子系統(tǒng),比較方便調(diào)試,且將容器設置為自動重啟之后,每次在命令中鍵入bash回車,即可進入子系統(tǒng),如果為Ubuntu, 在命令行中輸入以下命令,啟動docker。

service docker start

如果容器創(chuàng)建時使用了參數(shù)--restart=always,docker容器將隨著docker啟動而自動啟動。
一、Docker安裝kafka
1. 創(chuàng)建網(wǎng)絡
使用以下命令創(chuàng)建一個名叫app-tier網(wǎng)絡,該網(wǎng)絡作用為將zookeeper(kafka依賴zookeeper運行)和Kafka置于同一網(wǎng)絡中。

docker network create app-tier --driver bridge

參數(shù)解釋:
--driver: 網(wǎng)絡驅(qū)動方式,這里選擇bridge橋接模式。具體橋接模式是啥,這里不再展開。
2. 安裝zookeeper
docker安裝命令:

docker run --name zookeeper-server --restart=always --privileged=true -d \
? ? --network app-tier \
? ? -e ALLOW_ANONYMOUS_LOGIN=yes \
? ? bitnami/zookeeper:latest

以上命令,將創(chuàng)建一個名為zookeeper-server的服務。
參數(shù)解釋:
1) --name?zookeeper-server: 容器名稱配置,這里配置為zookeeper-server。
2) --restart=always:跟隨docker自動重啟。
3) --privileged=true: 容器內(nèi)部權限,可選true和false。
這里用我自己的話理解就是,在容器內(nèi)部有一個root用戶,但是這個用戶權限在true和false下有區(qū)別。
若為true,則容器內(nèi)部的root擁有真正的root權限,比如docker中安裝了一個linux系統(tǒng),則其可以執(zhí)行mount將分區(qū)掛載到其中的一個文件夾上。同時還可以在容器內(nèi)部再啟動docker,也就是可以繼續(xù)套娃。
4)-d: 容器在后臺運行。
5)--network?app-tier:這里配置的是容器使用的網(wǎng)絡,可以配置host, 此時使用的網(wǎng)絡與宿主機的網(wǎng)絡相同,好處是性能比較高,壞處是不安全。這里使用的是橋接模式創(chuàng)建的網(wǎng)絡,因此安全性較好。
6)?-e: 該參數(shù)是用來向容器傳遞環(huán)境變量的。ALLOW_ANONYMOUS_LOGIN=yes 表明zookeeper會讀取該環(huán)境變量,從字面意思也可知,該參數(shù)配置的為是否可以讓匿名用戶訪問服務,說白了也就是游客也可以訪問。
7)bitnami/zookeeper:latest 這里配置該zookeeper容器依據(jù)什么版本的鏡像創(chuàng)建,bitnami是鏡像源頭,zookeeper:latest 表示使用最新版本的zookeeper,該配置會自動拉取鏡像,而不必在創(chuàng)建容器之前手動拉取。如果手動拉取,則使用docker pull命令即可。
3. 安裝kafka

docker run -d --name kafka-server --network app-tier -p 9092:9092 \
? ? -e ALLOW_PLAINTEXT_LISTENER=yes \
? ? -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
? ? -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
? ? bitnami/kafka:latest

通過以上命令即可安裝一個kafka服務。
具體參數(shù)解釋,可以參看安裝zookeeper時的,-p是端口映射,這里9092:9092,前一個9092為物理機的端口,后一個9092為容器暴露的端口。組合起來則是,容器內(nèi)部的9092端口,可以通過物理的機的9092訪問,言下之意就是,也可以配置為9093:9092,即通過物理機的9093端口訪問kafka。
這里有幾個通過環(huán)境變量的配置解釋一下:
1)?ALLOW_PLAINTEXT_LISTENER=yes表示允許使用 PLAINTEXT 監(jiān)聽器。
2)KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181這里配置連接zookeeper服務,2181是zookeeper的默認端口。這里順便說一下,kafka之所以依賴zookeeper,是因為zooKeeper是一個給分布式系統(tǒng)提供協(xié)調(diào)服務的工具, 借助zookeeper,kafka就能夠在生產(chǎn)者和消費者互不知道狀態(tài)的情況下,建立起訂閱關系,做到負載均衡。
通過以下命令來驗證kafka的安裝。

docker logs -f kafka

4. 圖形界面
消息隊列的圖形界面,能夠使很多方面的操作變得便捷,比如rabbitmq自帶圖形界面,可以很方便的看到消息生產(chǎn)和消費的積壓情況,也可以對消息做直接的增刪等操作。
kafka沒有自帶的圖形界面,可以使用一些三方的,可選的比較多,這里選擇kafka-map。docker安裝命令如下:

docker run -d --name kafka-map \
? ? --network app-tier \
? ? -p 9001:8080 \
? ? -v /opt/kafka-map/data:/usr/local/kafka-map/data \
? ? -e DEFAULT_USERNAME=admin \
? ? -e DEFAULT_PASSWORD=admin \
? ? --restart always dushixiang/kafka-map:latest

簡單介紹參數(shù),DEFAULT_USERNAME和DEFAULT_PASSWORD配置默認的用戶和密碼。
使用9001作為在物理機上的暴露端口。通過以下地址訪問(本地訪問)。

http://localhost:9001/

使用admin:admin賬號密碼登錄之后是這樣的。

二、Python中使用kafka
至于在python中的使用,我在gitee上放了example,地址如下:


倉庫地址:


其中也有參考文檔。
使用步驟:
1) 安裝項目依賴。

pip install -r requirements.txt

2)啟動consumer

python consumer.py

3) 生產(chǎn)消息

python producer.py

如果需要更改消息,可以在producer.py文件中自定義,在消費者consumer.py啟動進程控制臺查看消息。
三、結語
本篇主要目的是記錄kafka服務搭建過程和在python中的基礎使用,一些技術細節(jié)和問題,在以后的工作中,還將繼續(xù)補充完善。
本篇關鍵點:
kafka這個消息中間件的安裝依賴zookeeper;
配置kafka-map作為其圖形界面;
在python中簡單驗證,啟動消費者監(jiān)聽消息,生產(chǎn)者發(fā)送消息;
模式為發(fā)布訂閱模式。
本篇完。
