動(dòng)力節(jié)點(diǎn)RabbitMQ教程|12小時(shí)學(xué)會(huì)rabbitmq消息中間件

1.?What is RabbitMQ?
1.1簡(jiǎn)介
RabbitMQ是一個(gè)廣泛使用的消息服務(wù)器,采用Erlang語(yǔ)言編寫,是一種開源的實(shí)現(xiàn) AMQP(高級(jí)消息隊(duì)列協(xié)議)的消息中間件;
RabbitMQ最初起源于金融系統(tǒng),它的性能及穩(wěn)定性都非常出色;
AMQP協(xié)議(http://www.amqp.org),即 Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì);
我們學(xué)的協(xié)議有哪些:(http、ftp)
1.2 相關(guān)網(wǎng)址
官網(wǎng):https://www.rabbitmq.com?
Github:https://github.com/rabbitmq
1.3 消息中間件(MQ=Message Queue)
簡(jiǎn)單來(lái)說(shuō),消息中間件就是指保存數(shù)據(jù)的一個(gè)容器(服務(wù)器),可以用于兩個(gè)系統(tǒng)之間的數(shù)據(jù)傳遞。
消息中間件一般有三個(gè)主要角色:生產(chǎn)者、消費(fèi)者、消息代理(消息隊(duì)列、消息服務(wù)器);

rabbitmq-java-client??????rabbitmq-server?????????rabbitmq-java-client
生產(chǎn)者發(fā)送消息到消息服務(wù)器,然后消費(fèi)者從消息代理(消息隊(duì)列)中獲取數(shù)據(jù)并進(jìn)行處理;
1.4 常用的消息中間件
目前比較主流的幾個(gè)消息中間件:
??RabbitMQ
??kafka(大數(shù)據(jù)領(lǐng)域)
??RocketMQ(阿里巴巴開源)獻(xiàn)給Apache組織
??pulsar(最近一兩年流行起來(lái)的)
1.?MQ(Message Queue)的應(yīng)用場(chǎng)景
1.1?異步處理
下訂單:下訂單--》加積分--》發(fā)紅包--》發(fā)手機(jī)短信
下訂單---向MQ 發(fā)消息--》積分系統(tǒng),紅包系統(tǒng),手機(jī)短信系統(tǒng)接收消息

同步是阻塞的(會(huì)造成等待),異步是非阻塞的(不會(huì)等待);
大流量高并發(fā)請(qǐng)求、批量數(shù)據(jù)傳遞,就可以采用異步處理,提升系統(tǒng)吞吐量;
2.2系統(tǒng)解耦
多個(gè)系統(tǒng)之間,不需要直接交互,通過(guò)消息進(jìn)行業(yè)務(wù)流轉(zhuǎn);

2.3 流量削峰
高負(fù)載請(qǐng)求/任務(wù)的緩沖處理;

2.4日志處理
主要是用kafka這個(gè)服務(wù)器來(lái)做;
日志處理是指將消息隊(duì)列用于在日志處理中,比如Kafka解決大量日志傳輸?shù)膯?wèn)題;
loger.info(.....)
ELK 日志處理解決方案:
loger.error(.....) -->logstash收集消息--> 發(fā)送消息的kafka --> elastic search (es) -->Kibana ELK日志處理平臺(tái)
1.?RabbitMQ運(yùn)行環(huán)境搭建
RabbitMQ是使用Erlang語(yǔ)言開發(fā)的,所以要先下載安裝Erlang
3.1 Erlang及RabbitMQ安裝版本的選擇
下載時(shí)一定要注意版本兼容性
版本兼容說(shuō)明地址:https://www.rabbitmq.com/which-erlang.html
我們選擇的版本

3.2下載Erlang
Erlang官網(wǎng)
https://www.erlang.org/
Linux下載:
wget https://github.com/erlang/otp/releases/download/OTP-25.1.1/otp_src_25.1.1.tar.gz
說(shuō)明:wget 是linux命令,可以用來(lái)下載軟件
3.3 安裝Erlang
3.3.1 安裝erlang前先安裝Linux依賴庫(kù)
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
說(shuō)明:yum -y install?安裝linux的一些依賴庫(kù)的命令 ,-y表示自動(dòng)確認(rèn);
3.3.2 解壓erlang壓縮包文件
tar -zxvf otp_src_25.1.1.tar.gz
3.3.3 配置
切換到解壓的目錄下,運(yùn)行相應(yīng)命令
cd otp_src_25.1.1
./configure
3.3.4 編譯
make
3.3.5安裝
make install
安裝好了erlang后可以將解壓的文件夾刪除:
rm -rf otp_src_25.1.1
3.3.6 驗(yàn)證erlang是否安裝成功
在命令行輸入: erl?如果進(jìn)入了編程命令行則表示安裝成功,然后按ctrl + z 退出編程命令行;
3.4 下載RabbitMQ
從RabbitMQ官網(wǎng)https://www.rabbitmq.com找到下載鏈接
Linux:下載3.10.11
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.11/rabbitmq-server-generic-unix-3.10.11.tar.xz
generic 是通用的意思,這個(gè)版本也就是通用的unix版本
3.5 安裝RabbitMQ
解壓RabbitMQ的壓縮包,即安裝完成,無(wú)需再編譯
tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz??-C ?/usr/local/
說(shuō)明 -C 選項(xiàng)是指定解壓目錄,如果不指定會(huì)解壓到當(dāng)前目錄
此時(shí)rabbitmq就安裝好了;
4.?啟動(dòng)及停止RabbitMQ
4.1啟動(dòng)RabbitMQ
切換到安裝目錄的sbin目錄下:
#啟動(dòng)
./rabbitmq-server ?-detached
說(shuō)明:
-detached 將表示在后臺(tái)啟動(dòng)運(yùn)行rabbitmq;不加該參數(shù)表示前臺(tái)啟動(dòng);
rabbitmq的運(yùn)行日志存放在安裝目錄的var目錄下;
現(xiàn)在的目錄是:/usr/local/rabbitmq_server-3.10.11/var/log/rabbitmq
4.2 查看RabbitMQ的狀態(tài)
切換到sbin目錄下執(zhí)行:
./rabbitmqctl -n rabbit status
?說(shuō)明:-n rabbit 是指定節(jié)點(diǎn)名稱為rabbit,目前只有一個(gè)節(jié)點(diǎn),節(jié)點(diǎn)名默認(rèn)為rabbit
?此處-n rabbit 也可以省略
4.3 停止RabbitMQ
切換到sbin目錄下執(zhí)行:
./rabbitmqctl shutdown
4.4 配置path環(huán)境變量
vi /etc/profile
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11
PATH=$PATH:$RABBIT_HOME/sbin
export RABBIT_HOME PATH
刷新環(huán)境變量,命令如下
source /etc/profile
5.?RabbitMQ管理命令
./rabbitmqctl?是一個(gè)管理命令,可以管理rabbitmq的很多操作。
./rabbitmqctl help可以查看一下有哪些操作
查看具體子命令 可以使用 ./rabbitmqctl help 子命令名稱
5.1 用戶管理
用戶管理包括增加用戶,刪除用戶,查看用戶列表,修改用戶密碼。
這些操作都是通過(guò)rabbitmqctl管理命令來(lái)實(shí)現(xiàn)完成。
查看幫助:
rabbitmqctl add_user --help
相應(yīng)的命令
(1) 查看當(dāng)前用戶列表
rabbitmqctl?list_users
(2) 新增一個(gè)用戶
語(yǔ)法:rabbitmqctl?add_user?Username?Password
示例: rabbitmqctl add_user admin 123456
5.2 設(shè)置用戶角色
rabbitmqctl?set_user_tags?User?Tag
示例:rabbitmqctl?set_user_tags?admin administrator
說(shuō)明:此處設(shè)置用戶的角色為管理員角色
5.3 設(shè)置用戶權(quán)限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
說(shuō)明:此操作是設(shè)置admin用戶擁有操作虛擬主機(jī)/下的所有權(quán)限
查看用戶權(quán)限
./rabbitmqctl list_permissions
2.?web管理后臺(tái)
Rabbitmq有一個(gè)web管理后臺(tái),這個(gè)管理后臺(tái)是以插件的方式提供的,啟動(dòng)后臺(tái)web管理功能,切換到sbin目錄下執(zhí)行:
6.1 啟用管理后臺(tái)
# 查看rabbitmq 的插件列表
./rabbitmq-plugins list
#啟用
./rabbitmq-plugins enable rabbitmq_management?
#禁用
./rabbitmq-plugins disable rabbitmq_management
6.2防火墻操作
systemctl status firewalld --檢查防火墻狀態(tài)
systemctl stop firewalld --關(guān)閉防火墻,Linux重啟之后會(huì)失效
systemctl disable firewalld --防火墻置為不可用,Linux重啟后,防火墻服務(wù)不自動(dòng)啟動(dòng),依然是不可用
6.3 訪問(wèn)
http://192.168.131.131:15672?
用戶名/密碼為我們上面創(chuàng)建的admin/123456
注意上面改成你的虛擬主機(jī)的ip地址
備注:如果使用默認(rèn)用戶guest、密碼guest登錄,會(huì)提示User can only log in via localhost
說(shuō)明guest用戶只能從localhost本機(jī)登錄,所以不要使用該用戶。
6.4 通過(guò)web頁(yè)面新建虛擬主機(jī)

建完后如下

7.?RabbitMQ工作模型

broker 相當(dāng)于mysql服務(wù)器,virtual host相當(dāng)于數(shù)據(jù)庫(kù)(可以有多個(gè)數(shù)據(jù)庫(kù))
queue相當(dāng)于表,消息相當(dāng)于記錄。
消息隊(duì)列有三個(gè)核心要素: 消息生產(chǎn)者、消息隊(duì)列、消息消費(fèi)者;
生產(chǎn)者(Producer):發(fā)送消息的應(yīng)用;(java程序,也可能是別的語(yǔ)言寫的程序)
消費(fèi)者(Consumer):接收消息的應(yīng)用;(java程序,也可能是別的語(yǔ)言寫的程序)
代理(Broker):就是消息服務(wù)器,RabbitMQ Server就是Message Broker;
連接(Connection):連接RabbitMQ服務(wù)器的TCP長(zhǎng)連接;
信道(Channel):連接中的一個(gè)虛擬通道,消息隊(duì)列發(fā)送或者接收消息時(shí),都是通過(guò)信道進(jìn)行的;
虛擬主機(jī)(Virtual host):一個(gè)虛擬分組,在代碼中就是一個(gè)字符串,當(dāng)多個(gè)不同的用戶使用同一個(gè)RabbitMQ服務(wù)時(shí),可以劃分出多個(gè)Virtual host,每個(gè)用戶在自己的Virtual host創(chuàng)建exchange/queue等;(分類比較清晰、相互隔離)
交換機(jī)(Exchange):交換機(jī)負(fù)責(zé)從生產(chǎn)者接收消息,并根據(jù)交換機(jī)類型分發(fā)到對(duì)應(yīng)的消息隊(duì)列中,起到一個(gè)路由的作用;
路由鍵(Routing Key):交換機(jī)根據(jù)路由鍵來(lái)決定消息分發(fā)到哪個(gè)隊(duì)列,路由鍵是消息的目的地址;
綁定(Binding):綁定是隊(duì)列和交換機(jī)的一個(gè)關(guān)聯(lián)連接(關(guān)聯(lián)關(guān)系);
隊(duì)列(Queue):存儲(chǔ)消息的緩存;
消息(Message):由生產(chǎn)者通過(guò)RabbitMQ發(fā)送給消費(fèi)者的信息;(消息可以任何數(shù)據(jù),字符串、user對(duì)象,json串等等)
8.?RabbitMQ交換機(jī)類型
Exchange(X) 可翻譯成交換機(jī)/交換器/路由器
8.1 RabbitMQ交換器 (Exchange)類型
1、Fanout Exchange(扇形)
2、Direct Exchange(直連)
3、Topic Exchange(主題)
4、Headers Exchange(頭部)
8.2 Fanout Exchange
8.2.1 介紹
Fanout 扇形的,散開的; 扇形交換機(jī)
投遞到所有綁定的隊(duì)列,不需要路由鍵,不需要進(jìn)行路由鍵的匹配,相當(dāng)于廣播、群發(fā);

8.2.2 示例
8.3 Direct Exchange
8.3.1 介紹
根據(jù)路由鍵精確匹配(一模一樣)進(jìn)行路由消息隊(duì)列;

8.3.2 示例
8.4 Topic Exchange
8.4.1 介紹
通配符匹配,相當(dāng)于模糊匹配;
#匹配多個(gè)單詞,用來(lái)表示任意數(shù)量(零個(gè)或多個(gè))單詞
*匹配一個(gè)單詞(必須有一個(gè),而且只有一個(gè)),用.隔開的為一個(gè)單詞:
beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx
beijing.* == beijing.queue, beijing.xyz

發(fā)送時(shí)指定的路由鍵:lazy.orange.rabbit
8.4.2 示例
8.5 Headers Exchange (用的比較少)
8.5.1 介紹
基于消息內(nèi)容中的headers屬性進(jìn)行匹配;

8.5.2 示例
綁定參考代碼:
Map<String, Object> headerValues = new HashMap<>();
headerValues.put("type", "m");
headerValues.put("status", 1);
return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();
發(fā)送參考代碼
??MessageProperties messageProperties = new MessageProperties();
????????messageProperties.setHeader("type", "m");
????????messageProperties.setHeader("status", 1);
????????Message message = new Message(msg.getBytes(), messageProperties);
// ???????void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
????????amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE, null, message);
學(xué)習(xí)它的目的是:發(fā)消息時(shí)可以指定消息屬性(MessageProperties)
9.?RabbitMQ過(guò)期消息
過(guò)期消息也叫TTL消息,TTL:Time To Live?
消息的過(guò)期時(shí)間有兩種設(shè)置方式:(過(guò)期消息)
9.1?設(shè)置單條消息的過(guò)期時(shí)間
參考代碼
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000"); // 設(shè)置過(guò)期時(shí)間,單位:毫秒
Message message = new Message(json.getBytes(), messageProperties);
//發(fā)送消息
amqpTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_ROUTINGKEY, message);
System.out.println("發(fā)送完畢:" + new Date());
單條消息的過(guò)期時(shí)間決定了在沒(méi)有任何消費(fèi)者消費(fèi)時(shí),消息可以存活多久;
9.2?通過(guò)隊(duì)列屬性設(shè)置消息過(guò)期時(shí)間
@Bean
public Queue directQueue() {
????Map<String, Object> arguments = new HashMap<>();
????arguments.put("x-message-ttl", 10000);
????return new Queue(DIRECT_QUEUE, true, false, false, arguments);
}
隊(duì)列的過(guò)期時(shí)間決定了在沒(méi)有任何消費(fèi)者的情況下,隊(duì)列中的消息可以存活多久;
注意事項(xiàng):
如果消息和對(duì)列都設(shè)置過(guò)期時(shí)間,則消息的TTL以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。
10.?RabbitMQ死信隊(duì)列
也有叫 死信交換機(jī)、死信郵箱等說(shuō)法;
DLX: Dead-Letter-Exchange 死信交換器,死信郵箱

如下情況下一個(gè)消息會(huì)進(jìn)入DLX(Dead Letter Exchange)死信交換機(jī)。
10.1?消息過(guò)期
參考代碼
MessageProperties messageProperties=new MessageProperties();
//設(shè)置此條消息的過(guò)期時(shí)間為10秒
messageProperties.setExpiration("10000");
1.2?隊(duì)列過(guò)期
參考代碼
?Map<String, Object> arguments =new HashMap<>();
?//指定死信交換機(jī),通過(guò)x-dead-letter-exchange 來(lái)設(shè)置
?arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);
?//設(shè)置死信路由key,value 為死信交換機(jī)和死信隊(duì)列綁定的key,要一模一樣,因?yàn)樗佬沤粨Q機(jī)是直連交換機(jī)
?arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);
?//隊(duì)列的過(guò)期時(shí)間
?arguments.put("x-message-ttl",10000);
return ?new Queue(QUEUE_NORMAL,true,false,false,arguments);
TTL: Time to Live的簡(jiǎn)稱,過(guò)期時(shí)間
1.3?隊(duì)列達(dá)到最大長(zhǎng)度(先入隊(duì)的消息會(huì)被發(fā)送到DLX)
Map<String, Object> arguments = new HashMap<String, Object>();
//設(shè)置隊(duì)列的最大長(zhǎng)度?,對(duì)頭的消息會(huì)被擠出變成死信
arguments.put("x-max-length", 5);
1.4?消費(fèi)者拒絕消息不進(jìn)行重新投遞
從正常的隊(duì)列接收消息,但是對(duì)消息不進(jìn)行確認(rèn),并且不對(duì)消息進(jìn)行重新投遞,此時(shí)消息就進(jìn)入死信隊(duì)列。
application.yml 啟動(dòng)手動(dòng)確認(rèn)
spring:
??rabbitmq:
????listener:
??????simple:
????????acknowledge-mode: manual
參考代碼
?/**
?????* 監(jiān)聽(tīng)正常的那個(gè)隊(duì)列的名字,不是監(jiān)聽(tīng)那個(gè)死信隊(duì)列
?????* 我們從正常的隊(duì)列接收消息,但是對(duì)消息不進(jìn)行確認(rèn),并且不對(duì)消息進(jìn)行重新投遞,此時(shí)消息就進(jìn)入死信隊(duì)列
?????*
?????* channel 消息信道(是連接下的一個(gè)消息信道,一個(gè)連接下有多個(gè)消息信息,發(fā)消息/接消息都是通過(guò)信道完成的)
?????*/
????@RabbitListener(queues = {RabbitConfig.QUEUE})
????public void process(Message message, Channel channel) {
????????System.out.println("接收到的消息:" + message);
????????//對(duì)消息不確認(rèn), ack單詞是 確認(rèn) 的意思
????????// void basicNack(long deliveryTag, boolean multiple, boolean requeue)
????????// deliveryTag:消息的一個(gè)數(shù)字標(biāo)簽
????????// multiple:翻譯成中文是多個(gè)的意思,如果是true表示對(duì)小于deliveryTag標(biāo)簽下的消息都進(jìn)行Nack不確認(rèn),false表示只對(duì)當(dāng)前deliveryTag標(biāo)簽的消息Nack
????????// requeue:如果是true表示消息被Nack后,重新發(fā)送到隊(duì)列,如果是false,消息被Nack后,不會(huì)重新發(fā)送到隊(duì)列
????????try {
????????????System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
????????????//要開啟rabbitm消息消費(fèi)的手動(dòng)確認(rèn)模式,然后才這么寫代碼;
????????????channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
????????} catch (IOException e) {
????????????e.printStackTrace();
????????}
????}
1.5?消費(fèi)者拒絕消息
開啟手動(dòng)確認(rèn)模式,并拒絕消息,不重新投遞,則進(jìn)入死信隊(duì)列
參考代碼:
????/**
?????* 監(jiān)聽(tīng)正常的那個(gè)隊(duì)列的名字,不是監(jiān)聽(tīng)那個(gè)死信隊(duì)列
?????* 我們從正常的隊(duì)列接收消息,但是對(duì)消息不進(jìn)行確認(rèn),并且不對(duì)消息進(jìn)行重新投遞,此時(shí)消息就進(jìn)入死信隊(duì)列
?????*
?????* channel 消息信道(是連接下的一個(gè)消息信道,一個(gè)連接下有多個(gè)消息信息,發(fā)消息/接消息都是通過(guò)信道完成的)
?????*/
????@RabbitListener(queues = {RabbitConfig.QUEUE})
????public void process(Message message, Channel channel) {
????????System.out.println("接收到的消息:" + message);
????????//對(duì)消息不確認(rèn), ack單詞是 確認(rèn) 的意思
????????// void basicNack(long deliveryTag, boolean multiple, boolean requeue)
????????// deliveryTag:消息的一個(gè)數(shù)字標(biāo)簽
????????// multiple:翻譯成中文是多個(gè)的意思,如果是true表示對(duì)小于deliveryTag標(biāo)簽下的消息都進(jìn)行Nack不確認(rèn),false表示只對(duì)當(dāng)前deliveryTag標(biāo)簽的消息Nack
????????// requeue:如果是true表示消息被Nack后,重新發(fā)送到隊(duì)列,如果是false,消息被Nack后,不會(huì)重新發(fā)送到隊(duì)列
????????try {
????????????System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
????????????//要開啟rabbitm消息消費(fèi)的手動(dòng)確認(rèn)模式,然后才這么寫代碼;
????????????channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
????????} catch (IOException e) {
????????????e.printStackTrace();
????????}
????}
}
11.?RabbitMQ延遲隊(duì)列
場(chǎng)景:有一個(gè)訂單,15分鐘內(nèi)如果不支付,就把該訂單設(shè)置為交易關(guān)閉,那么就不能支付了,這類實(shí)現(xiàn)延遲任務(wù)的場(chǎng)景就可以采用延遲隊(duì)列來(lái)實(shí)現(xiàn),當(dāng)然除了延遲隊(duì)列來(lái)實(shí)現(xiàn),也可以有一些其他辦法實(shí)現(xiàn);
11.1?定時(shí)任務(wù)方式
每隔3秒掃描一次數(shù)據(jù)庫(kù),查詢過(guò)期的訂單然后進(jìn)行處理;
優(yōu)點(diǎn):
簡(jiǎn)單,容易實(shí)現(xiàn);
缺點(diǎn):
1、存在延遲(延遲時(shí)間不準(zhǔn)確),如果你每隔1分鐘掃一次,那么就有可能延遲1分鐘;
2、性能較差,每次掃描數(shù)據(jù)庫(kù),如果訂單量很大
1.?被動(dòng)取消
當(dāng)用戶查詢訂單的時(shí)候,判斷訂單是否超時(shí),超時(shí)了就取消(交易關(guān)閉);
優(yōu)點(diǎn):
對(duì)服務(wù)器而言,壓力??;
缺點(diǎn):
1、用戶不查詢訂單,將永遠(yuǎn)處于待支付狀態(tài),會(huì)對(duì)數(shù)據(jù)統(tǒng)計(jì)等功能造成影響;
2、用戶打開訂單頁(yè)面,有可能比較慢,因?yàn)橐幚泶罅坑唵?,用戶體驗(yàn)少稍差;
11.2?JDK延遲隊(duì)列(單體應(yīng)用,不能分布式下)
DelayedQueue
無(wú)界阻塞隊(duì)列,該隊(duì)列只有在延遲期滿的時(shí)候才能從中獲取元素
優(yōu)點(diǎn):
實(shí)現(xiàn)簡(jiǎn)單,任務(wù)延遲低;
缺點(diǎn):
服務(wù)重啟、宕機(jī),數(shù)據(jù)丟失;
只適合單機(jī)版,不適合集群;
訂單量大,可能內(nèi)存不足而發(fā)生異常; oom
11.3?采用消息中間件(rabbitmq)
1、RabbitMQ本身不支持延遲隊(duì)列,可以使用TTL結(jié)合DLX的方式來(lái)實(shí)現(xiàn)消息的延遲投遞,即把DLX跟某個(gè)隊(duì)列綁定,到了指定時(shí)間,消息過(guò)期后,就會(huì)從DLX路由到這個(gè)隊(duì)列,消費(fèi)者可以從這個(gè)隊(duì)列取走消息。

代碼:正常延遲
//問(wèn)題? 如果先發(fā)送的消息,消息延遲時(shí)間長(zhǎng),會(huì)影響后面的 延遲時(shí)間段的消息的消費(fèi);
//解決:不同延遲時(shí)間的消息要發(fā)到不同的隊(duì)列上,同一個(gè)隊(duì)列的消息,它的延遲時(shí)間應(yīng)該一樣

代碼 延遲問(wèn)題
14.4?使用rabbitmq-delayed-message-exchange 延遲插件
11.4.1?下載
選擇對(duì)應(yīng)的版本下載 rabbitmq-delayed-message-exchange 插件,下載地址:http://www.rabbitmq.com/community-plugins.html?


2、插件拷貝到 RabbitMQ 服務(wù)器plugins目錄下
11.4.2?解壓
unzip?rabbitmq_delayed_message_exchange-3.10.2.ez
如果unzip 沒(méi)有安裝,先安裝一下
yum install unzip -y
11.4.3?啟用插件
./rabbitmq-plugins enable?rabbitmq_delayed_message_exchange?開啟插件;
11.4.4?查詢安裝情況
./rabbitmq-plugins list?查詢安裝的所有插件;
重啟rabbitmq使其生效;(此處也可以不重啟)

消息發(fā)送后不會(huì)直接投遞到隊(duì)列,而是存儲(chǔ)到 Mnesia(嵌入式數(shù)據(jù)庫(kù)),檢查 x-delay 時(shí)間(消息頭部);
延遲插件在 RabbitMQ 3.5.7 及以上的版本才支持,依賴 Erlang/OPT 18.0 及以上運(yùn)行環(huán)境;
Mnesia?是一個(gè)小型數(shù)據(jù)庫(kù),不適合于大量延遲消息的實(shí)現(xiàn)
解決了消息過(guò)期時(shí)間不一致出現(xiàn)的問(wèn)題。
參考代碼:
@Component
@Slf4j
public class RabbitConfig {
????public static final String EXCHANGE = "exchange:plugin";
????public static final String QUEUE = "queue.plugin";
????public static final String KEY = "plugin";
????
????@Bean
????public CustomExchange customExchange() {
????????Map<String, Object> arguments = new HashMap<>();
????????arguments.put("x-delayed-type", "direct");
????????// CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
????????return new CustomExchange(EXCHANGE, "x-delayed-message", true, false, arguments);
????}
????@Bean
????public Queue queue() {
????????return QueueBuilder.durable(QUEUE).build();
????}
????@Bean
????public Binding binding(CustomExchange customExchange, Queue queue) {
????????return BindingBuilder.bind(queue).to(customExchange).with(KEY).noargs();
????}
}
發(fā)消息參考代碼
MessageProperties messageProperties=new MessageProperties();
messageProperties.setHeader("x-delay",16000);
String msg = "hello world";
Message message=new Message(msg.getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "plugin", message);
log.info("發(fā)送完畢,發(fā)送時(shí)間為:{}",new Date());