侃一侃Redis如何實(shí)現(xiàn)消息列隊(duì)(IT楓斗者)
引言:什么是消息列隊(duì)。
消息隊(duì)列是一種應(yīng)用間的異步協(xié)作機(jī)制,同時(shí)消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量削峰等問題。實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。市面上的 MQ應(yīng)用有很多(例如:Kafka,RabbitMQ),同時(shí)也可以基于 Redis 來實(shí)現(xiàn),常用的方式有:
(1)List結(jié)構(gòu):基于List結(jié)構(gòu)模擬消息隊(duì)列
(2)PubSub:基本的點(diǎn)對(duì)點(diǎn)消息模型
(3)Stream:比較完善的消息隊(duì)列模型
1,基于List結(jié)構(gòu)模擬消息列隊(duì)
基于List的隊(duì)列,很簡(jiǎn)單,就是使用LPUSH 結(jié)合 RPOP的組合來實(shí)現(xiàn)隊(duì)列的效果。之前學(xué)習(xí)List類型結(jié)構(gòu)時(shí)就已經(jīng)學(xué)過了。
不過要注意的是,當(dāng)隊(duì)列中沒有消息時(shí)RPOP或LPOP操作會(huì)返回null,并不像JVM的阻塞隊(duì)列那樣會(huì)阻塞并等待消息。因此這里應(yīng)該使用BRPOP或者BLPOP來實(shí)現(xiàn)阻塞效果。
2,基于PubSub的消息隊(duì)列
PubSub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè)channel(頻道),生產(chǎn)者向?qū)?yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。
它的特點(diǎn)是,不同的消費(fèi)者可以訂閱不同的頻道,支持多個(gè)消費(fèi)者進(jìn)行消費(fèi)。

語法:
SUBSCRIBE channel [channel] :訂閱一個(gè)或多個(gè)頻道PUBLISH channel msg :向一個(gè)頻道發(fā)送消息
PSUBSCRIBE pattern[pattern] :訂閱與pattern格式匹配的所有頻道
如圖所示,上面是生產(chǎn)者,下面是消費(fèi)者:

這種方式的優(yōu)點(diǎn)在于可以實(shí)現(xiàn)多消費(fèi),但同樣也有很多缺陷。
包括:不支持?jǐn)?shù)據(jù)持久化 、無法避免消息丟失(只能讀取到最新消息)、消息堆積有上限,超出時(shí)數(shù)據(jù)丟失(因?yàn)槭谴鎯?chǔ)在內(nèi)存里)
3,基于Stream的消息隊(duì)列
我們前面說了,Streams是一種redis專門為消息隊(duì)列定義的一種數(shù)據(jù)結(jié)構(gòu),所以自然的我們是先要看如何定義這種數(shù)據(jù)結(jié)構(gòu)了,和其它的數(shù)據(jù)結(jié)構(gòu)一樣,我們不需要顯式的創(chuàng)建,在執(zhí)行第一次數(shù)據(jù)添加的時(shí)候自動(dòng)創(chuàng)建,添加數(shù)據(jù)的命令是XADD,語法格式是XADD key ID field value [field value ...],參數(shù)說明如下:

如下生產(chǎn)(創(chuàng)建)若干條消息:

其中XLEN用來查看消息的個(gè)數(shù),XRANGE用來通過范圍查詢基于遞增ID獲取消息,-相當(dāng)于是負(fù)無窮,+相當(dāng)于是正無窮,即獲取所有消息。我們接著再來看下其他一些命令。
3.1:XDEL
根據(jù)ID刪除消息,測(cè)試如下:

3.2:XLEN
獲取消息的數(shù)量,語法格式xlen key,如下:

3.3:XRANGE
查詢指定范圍的消息,語法格式XRANGE key start end [COUNT count],解釋如下:

測(cè)試如下:

3.4:XREVRANGE
從后往前獲取消息,語法格式XREVRANGE key end start [COUNT count],解釋如下:

實(shí)例如下:

3.5:XREAD
以阻塞或者是非阻塞的方式獲取消息,即消費(fèi)消息的命令,語法格式XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],解釋如下:

測(cè)試如下:

3.6:XGROUP CREATE
創(chuàng)建消費(fèi)者組,使用消費(fèi)者可以對(duì)消息進(jìn)行并發(fā)的消費(fèi),解決消費(fèi)者消費(fèi)能力不足的問題,語法格式為XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername],解釋如下:

在實(shí)際的場(chǎng)景中我們可以通過設(shè)置多個(gè)消費(fèi)者組的不同開始消費(fèi)的位置來實(shí)現(xiàn)并發(fā)消費(fèi)的效果,此時(shí)可能如下圖:

3.7:XREADGROUP GROUP
讀取消費(fèi)者組中的消息,語法格式如下:

最后將三種方式進(jìn)行個(gè)對(duì)比:
