最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

動(dòng)力節(jié)點(diǎn)RabbitMQ教程|12小時(shí)學(xué)會(huì)rabbitmq消息中間件

2023-04-26 17:11 作者:山藥當(dāng)當(dāng)  | 我要投稿

1.?What is RabbitMQ?

1.1簡(jiǎn)介

RabbitMQ是一個(gè)廣泛使用的消息服務(wù)器,采用Erlang語(yǔ)言編寫,是一種開源的實(shí)現(xiàn) AMQP(高級(jí)消息隊(duì)列協(xié)議)的消息中間件;

RabbitMQ最初起源于金融系統(tǒng),它的性能及穩(wěn)定性都非常出色;

AMQP協(xié)議(http://www.amqp.org),即 Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì);

我們學(xué)的協(xié)議有哪些:(http、ftp)

1.2 相關(guān)網(wǎng)址

官網(wǎng):https://www.rabbitmq.com?

Github:https://github.com/rabbitmq

1.3 消息中間件(MQ=Message Queue)

簡(jiǎn)單來(lái)說(shuō),消息中間件就是指保存數(shù)據(jù)的一個(gè)容器(服務(wù)器),可以用于兩個(gè)系統(tǒng)之間的數(shù)據(jù)傳遞。

消息中間件一般有三個(gè)主要角色:生產(chǎn)者、消費(fèi)者、消息代理(消息隊(duì)列、消息服務(wù)器);

rabbitmq-java-client??????rabbitmq-server?????????rabbitmq-java-client

生產(chǎn)者發(fā)送消息到消息服務(wù)器,然后消費(fèi)者從消息代理(消息隊(duì)列)中獲取數(shù)據(jù)并進(jìn)行處理;

1.4 常用的消息中間件

目前比較主流的幾個(gè)消息中間件:

??RabbitMQ

??kafka(大數(shù)據(jù)領(lǐng)域)

??RocketMQ(阿里巴巴開源)獻(xiàn)給Apache組織

??pulsar(最近一兩年流行起來(lái)的)

1.?MQ(Message Queue)的應(yīng)用場(chǎng)景

1.1?異步處理

下訂單:下訂單--》加積分--》發(fā)紅包--》發(fā)手機(jī)短信

下訂單---向MQ 發(fā)消息--》積分系統(tǒng),紅包系統(tǒng),手機(jī)短信系統(tǒng)接收消息

同步是阻塞的(會(huì)造成等待),異步是非阻塞的(不會(huì)等待);

大流量高并發(fā)請(qǐng)求、批量數(shù)據(jù)傳遞,就可以采用異步處理,提升系統(tǒng)吞吐量;

2.2系統(tǒng)解耦

多個(gè)系統(tǒng)之間,不需要直接交互,通過(guò)消息進(jìn)行業(yè)務(wù)流轉(zhuǎn);

2.3 流量削峰

高負(fù)載請(qǐng)求/任務(wù)的緩沖處理;

2.4日志處理

主要是用kafka這個(gè)服務(wù)器來(lái)做;

日志處理是指將消息隊(duì)列用于在日志處理中,比如Kafka解決大量日志傳輸?shù)膯?wèn)題;

loger.info(.....)

ELK 日志處理解決方案:

loger.error(.....) -->logstash收集消息--> 發(fā)送消息的kafka --> elastic search (es) -->Kibana ELK日志處理平臺(tái)

1.?RabbitMQ運(yùn)行環(huán)境搭建

RabbitMQ是使用Erlang語(yǔ)言開發(fā)的,所以要先下載安裝Erlang

3.1 Erlang及RabbitMQ安裝版本的選擇

下載時(shí)一定要注意版本兼容性

版本兼容說(shuō)明地址:https://www.rabbitmq.com/which-erlang.html

我們選擇的版本

3.2下載Erlang

Erlang官網(wǎng)

https://www.erlang.org/

Linux下載:

wget https://github.com/erlang/otp/releases/download/OTP-25.1.1/otp_src_25.1.1.tar.gz

說(shuō)明:wget 是linux命令,可以用來(lái)下載軟件

3.3 安裝Erlang

3.3.1 安裝erlang前先安裝Linux依賴庫(kù)

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

說(shuō)明:yum -y install?安裝linux的一些依賴庫(kù)的命令 ,-y表示自動(dòng)確認(rèn);

3.3.2 解壓erlang壓縮包文件

tar -zxvf otp_src_25.1.1.tar.gz

3.3.3 配置

切換到解壓的目錄下,運(yùn)行相應(yīng)命令

cd otp_src_25.1.1

./configure

3.3.4 編譯

make

3.3.5安裝

make install

安裝好了erlang后可以將解壓的文件夾刪除:

rm -rf otp_src_25.1.1

3.3.6 驗(yàn)證erlang是否安裝成功

在命令行輸入: erl?如果進(jìn)入了編程命令行則表示安裝成功,然后按ctrl + z 退出編程命令行;

3.4 下載RabbitMQ

從RabbitMQ官網(wǎng)https://www.rabbitmq.com找到下載鏈接

Linux:下載3.10.11

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.11/rabbitmq-server-generic-unix-3.10.11.tar.xz

generic 是通用的意思,這個(gè)版本也就是通用的unix版本

3.5 安裝RabbitMQ

解壓RabbitMQ的壓縮包,即安裝完成,無(wú)需再編譯

tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz??-C ?/usr/local/

說(shuō)明 -C 選項(xiàng)是指定解壓目錄,如果不指定會(huì)解壓到當(dāng)前目錄

此時(shí)rabbitmq就安裝好了;

4.?啟動(dòng)及停止RabbitMQ

4.1啟動(dòng)RabbitMQ

切換到安裝目錄的sbin目錄下:

#啟動(dòng)

./rabbitmq-server ?-detached

說(shuō)明:

-detached 將表示在后臺(tái)啟動(dòng)運(yùn)行rabbitmq;不加該參數(shù)表示前臺(tái)啟動(dòng);

rabbitmq的運(yùn)行日志存放在安裝目錄的var目錄下;

現(xiàn)在的目錄是:/usr/local/rabbitmq_server-3.10.11/var/log/rabbitmq

4.2 查看RabbitMQ的狀態(tài)

切換到sbin目錄下執(zhí)行:

./rabbitmqctl -n rabbit status

?說(shuō)明:-n rabbit 是指定節(jié)點(diǎn)名稱為rabbit,目前只有一個(gè)節(jié)點(diǎn),節(jié)點(diǎn)名默認(rèn)為rabbit

?此處-n rabbit 也可以省略

4.3 停止RabbitMQ

切換到sbin目錄下執(zhí)行:

./rabbitmqctl shutdown

4.4 配置path環(huán)境變量

vi /etc/profile

RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11

PATH=$PATH:$RABBIT_HOME/sbin

export RABBIT_HOME PATH

刷新環(huán)境變量,命令如下

source /etc/profile

5.?RabbitMQ管理命令

./rabbitmqctl?是一個(gè)管理命令,可以管理rabbitmq的很多操作。

./rabbitmqctl help可以查看一下有哪些操作

查看具體子命令 可以使用 ./rabbitmqctl help 子命令名稱

5.1 用戶管理

用戶管理包括增加用戶,刪除用戶,查看用戶列表,修改用戶密碼。

這些操作都是通過(guò)rabbitmqctl管理命令來(lái)實(shí)現(xiàn)完成。

查看幫助:

rabbitmqctl add_user --help

相應(yīng)的命令

(1) 查看當(dāng)前用戶列表


rabbitmqctl?list_users


(2) 新增一個(gè)用戶

語(yǔ)法:rabbitmqctl?add_user?Username?Password

示例: rabbitmqctl add_user admin 123456

5.2 設(shè)置用戶角色

rabbitmqctl?set_user_tags?User?Tag

示例:rabbitmqctl?set_user_tags?admin administrator

說(shuō)明:此處設(shè)置用戶的角色為管理員角色

5.3 設(shè)置用戶權(quán)限

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

說(shuō)明:此操作是設(shè)置admin用戶擁有操作虛擬主機(jī)/下的所有權(quán)限

查看用戶權(quán)限

./rabbitmqctl list_permissions

2.?web管理后臺(tái)

Rabbitmq有一個(gè)web管理后臺(tái),這個(gè)管理后臺(tái)是以插件的方式提供的,啟動(dòng)后臺(tái)web管理功能,切換到sbin目錄下執(zhí)行:

6.1 啟用管理后臺(tái)

# 查看rabbitmq 的插件列表

./rabbitmq-plugins list

#啟用

./rabbitmq-plugins enable rabbitmq_management?

#禁用

./rabbitmq-plugins disable rabbitmq_management

6.2防火墻操作

systemctl status firewalld --檢查防火墻狀態(tài)

systemctl stop firewalld --關(guān)閉防火墻,Linux重啟之后會(huì)失效

systemctl disable firewalld --防火墻置為不可用,Linux重啟后,防火墻服務(wù)不自動(dòng)啟動(dòng),依然是不可用

6.3 訪問(wèn)

http://192.168.131.131:15672?

用戶名/密碼為我們上面創(chuàng)建的admin/123456

注意上面改成你的虛擬主機(jī)的ip地址

備注:如果使用默認(rèn)用戶guest、密碼guest登錄,會(huì)提示User can only log in via localhost

說(shuō)明guest用戶只能從localhost本機(jī)登錄,所以不要使用該用戶。

6.4 通過(guò)web頁(yè)面新建虛擬主機(jī)

建完后如下

7.?RabbitMQ工作模型

broker 相當(dāng)于mysql服務(wù)器,virtual host相當(dāng)于數(shù)據(jù)庫(kù)(可以有多個(gè)數(shù)據(jù)庫(kù))

queue相當(dāng)于表,消息相當(dāng)于記錄。

消息隊(duì)列有三個(gè)核心要素: 消息生產(chǎn)者、消息隊(duì)列、消息消費(fèi)者;

生產(chǎn)者(Producer):發(fā)送消息的應(yīng)用;(java程序,也可能是別的語(yǔ)言寫的程序)

消費(fèi)者(Consumer):接收消息的應(yīng)用;(java程序,也可能是別的語(yǔ)言寫的程序)

代理(Broker):就是消息服務(wù)器,RabbitMQ Server就是Message Broker;

連接(Connection):連接RabbitMQ服務(wù)器的TCP長(zhǎng)連接;

信道(Channel):連接中的一個(gè)虛擬通道,消息隊(duì)列發(fā)送或者接收消息時(shí),都是通過(guò)信道進(jìn)行的;

虛擬主機(jī)(Virtual host):一個(gè)虛擬分組,在代碼中就是一個(gè)字符串,當(dāng)多個(gè)不同的用戶使用同一個(gè)RabbitMQ服務(wù)時(shí),可以劃分出多個(gè)Virtual host,每個(gè)用戶在自己的Virtual host創(chuàng)建exchange/queue等;(分類比較清晰、相互隔離)

交換機(jī)(Exchange):交換機(jī)負(fù)責(zé)從生產(chǎn)者接收消息,并根據(jù)交換機(jī)類型分發(fā)到對(duì)應(yīng)的消息隊(duì)列中,起到一個(gè)路由的作用;

路由鍵(Routing Key):交換機(jī)根據(jù)路由鍵來(lái)決定消息分發(fā)到哪個(gè)隊(duì)列,路由鍵是消息的目的地址;

綁定(Binding):綁定是隊(duì)列和交換機(jī)的一個(gè)關(guān)聯(lián)連接(關(guān)聯(lián)關(guān)系);

隊(duì)列(Queue):存儲(chǔ)消息的緩存;

消息(Message):由生產(chǎn)者通過(guò)RabbitMQ發(fā)送給消費(fèi)者的信息;(消息可以任何數(shù)據(jù),字符串、user對(duì)象,json串等等)

8.?RabbitMQ交換機(jī)類型

Exchange(X) 可翻譯成交換機(jī)/交換器/路由器

8.1 RabbitMQ交換器 (Exchange)類型

1、Fanout Exchange(扇形)

2、Direct Exchange(直連)

3、Topic Exchange(主題)

4、Headers Exchange(頭部)

8.2 Fanout Exchange

8.2.1 介紹

Fanout 扇形的,散開的; 扇形交換機(jī)

投遞到所有綁定的隊(duì)列,不需要路由鍵,不需要進(jìn)行路由鍵的匹配,相當(dāng)于廣播、群發(fā);

8.2.2 示例

8.3 Direct Exchange

8.3.1 介紹

根據(jù)路由鍵精確匹配(一模一樣)進(jìn)行路由消息隊(duì)列;

8.3.2 示例

8.4 Topic Exchange

8.4.1 介紹

通配符匹配,相當(dāng)于模糊匹配;

#匹配多個(gè)單詞,用來(lái)表示任意數(shù)量(零個(gè)或多個(gè))單詞

*匹配一個(gè)單詞(必須有一個(gè),而且只有一個(gè)),用.隔開的為一個(gè)單詞:

beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx

beijing.* == beijing.queue, beijing.xyz

發(fā)送時(shí)指定的路由鍵:lazy.orange.rabbit

8.4.2 示例

8.5 Headers Exchange (用的比較少)

8.5.1 介紹

基于消息內(nèi)容中的headers屬性進(jìn)行匹配;

8.5.2 示例

綁定參考代碼:

Map<String, Object> headerValues = new HashMap<>();

headerValues.put("type", "m");

headerValues.put("status", 1);

return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();

發(fā)送參考代碼

??MessageProperties messageProperties = new MessageProperties();

????????messageProperties.setHeader("type", "m");

????????messageProperties.setHeader("status", 1);

????????Message message = new Message(msg.getBytes(), messageProperties);

// ???????void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

????????amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE, null, message);

學(xué)習(xí)它的目的是:發(fā)消息時(shí)可以指定消息屬性(MessageProperties)

9.?RabbitMQ過(guò)期消息

過(guò)期消息也叫TTL消息,TTL:Time To Live?

消息的過(guò)期時(shí)間有兩種設(shè)置方式:(過(guò)期消息)

9.1?設(shè)置單條消息的過(guò)期時(shí)間

參考代碼

MessageProperties messageProperties = new MessageProperties();

messageProperties.setExpiration("15000"); // 設(shè)置過(guò)期時(shí)間,單位:毫秒

Message message = new Message(json.getBytes(), messageProperties);

//發(fā)送消息

amqpTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_ROUTINGKEY, message);

System.out.println("發(fā)送完畢:" + new Date());

單條消息的過(guò)期時(shí)間決定了在沒(méi)有任何消費(fèi)者消費(fèi)時(shí),消息可以存活多久;

9.2?通過(guò)隊(duì)列屬性設(shè)置消息過(guò)期時(shí)間

@Bean

public Queue directQueue() {

????Map<String, Object> arguments = new HashMap<>();

????arguments.put("x-message-ttl", 10000);

????return new Queue(DIRECT_QUEUE, true, false, false, arguments);

}

隊(duì)列的過(guò)期時(shí)間決定了在沒(méi)有任何消費(fèi)者的情況下,隊(duì)列中的消息可以存活多久;

注意事項(xiàng):

如果消息和對(duì)列都設(shè)置過(guò)期時(shí)間,則消息的TTL以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。

10.?RabbitMQ死信隊(duì)列

也有叫 死信交換機(jī)、死信郵箱等說(shuō)法;

DLX: Dead-Letter-Exchange 死信交換器,死信郵箱

如下情況下一個(gè)消息會(huì)進(jìn)入DLX(Dead Letter Exchange)死信交換機(jī)。

10.1?消息過(guò)期

參考代碼

MessageProperties messageProperties=new MessageProperties();

//設(shè)置此條消息的過(guò)期時(shí)間為10秒

messageProperties.setExpiration("10000");


1.2?隊(duì)列過(guò)期

參考代碼

?Map<String, Object> arguments =new HashMap<>();

?//指定死信交換機(jī),通過(guò)x-dead-letter-exchange 來(lái)設(shè)置

?arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);

?//設(shè)置死信路由key,value 為死信交換機(jī)和死信隊(duì)列綁定的key,要一模一樣,因?yàn)樗佬沤粨Q機(jī)是直連交換機(jī)

?arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);

?//隊(duì)列的過(guò)期時(shí)間

?arguments.put("x-message-ttl",10000);

return ?new Queue(QUEUE_NORMAL,true,false,false,arguments);

TTL: Time to Live的簡(jiǎn)稱,過(guò)期時(shí)間


1.3?隊(duì)列達(dá)到最大長(zhǎng)度(先入隊(duì)的消息會(huì)被發(fā)送到DLX)

Map<String, Object> arguments = new HashMap<String, Object>();

//設(shè)置隊(duì)列的最大長(zhǎng)度?,對(duì)頭的消息會(huì)被擠出變成死信

arguments.put("x-max-length", 5);


1.4?消費(fèi)者拒絕消息不進(jìn)行重新投遞

從正常的隊(duì)列接收消息,但是對(duì)消息不進(jìn)行確認(rèn),并且不對(duì)消息進(jìn)行重新投遞,此時(shí)消息就進(jìn)入死信隊(duì)列。

application.yml 啟動(dòng)手動(dòng)確認(rèn)

spring:

??rabbitmq:

????listener:

??????simple:

????????acknowledge-mode: manual

參考代碼

?/**

?????* 監(jiān)聽(tīng)正常的那個(gè)隊(duì)列的名字,不是監(jiān)聽(tīng)那個(gè)死信隊(duì)列

?????* 我們從正常的隊(duì)列接收消息,但是對(duì)消息不進(jìn)行確認(rèn),并且不對(duì)消息進(jìn)行重新投遞,此時(shí)消息就進(jìn)入死信隊(duì)列

?????*

?????* channel 消息信道(是連接下的一個(gè)消息信道,一個(gè)連接下有多個(gè)消息信息,發(fā)消息/接消息都是通過(guò)信道完成的)

?????*/

????@RabbitListener(queues = {RabbitConfig.QUEUE})

????public void process(Message message, Channel channel) {

????????System.out.println("接收到的消息:" + message);

????????//對(duì)消息不確認(rèn), ack單詞是 確認(rèn) 的意思

????????// void basicNack(long deliveryTag, boolean multiple, boolean requeue)

????????// deliveryTag:消息的一個(gè)數(shù)字標(biāo)簽

????????// multiple:翻譯成中文是多個(gè)的意思,如果是true表示對(duì)小于deliveryTag標(biāo)簽下的消息都進(jìn)行Nack不確認(rèn),false表示只對(duì)當(dāng)前deliveryTag標(biāo)簽的消息Nack

????????// requeue:如果是true表示消息被Nack后,重新發(fā)送到隊(duì)列,如果是false,消息被Nack后,不會(huì)重新發(fā)送到隊(duì)列

????????try {

????????????System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());

????????????//要開啟rabbitm消息消費(fèi)的手動(dòng)確認(rèn)模式,然后才這么寫代碼;

????????????channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

????????} catch (IOException e) {

????????????e.printStackTrace();

????????}

????}


1.5?消費(fèi)者拒絕消息

開啟手動(dòng)確認(rèn)模式,并拒絕消息,不重新投遞,則進(jìn)入死信隊(duì)列

參考代碼:

????/**

?????* 監(jiān)聽(tīng)正常的那個(gè)隊(duì)列的名字,不是監(jiān)聽(tīng)那個(gè)死信隊(duì)列

?????* 我們從正常的隊(duì)列接收消息,但是對(duì)消息不進(jìn)行確認(rèn),并且不對(duì)消息進(jìn)行重新投遞,此時(shí)消息就進(jìn)入死信隊(duì)列

?????*

?????* channel 消息信道(是連接下的一個(gè)消息信道,一個(gè)連接下有多個(gè)消息信息,發(fā)消息/接消息都是通過(guò)信道完成的)

?????*/

????@RabbitListener(queues = {RabbitConfig.QUEUE})

????public void process(Message message, Channel channel) {

????????System.out.println("接收到的消息:" + message);

????????//對(duì)消息不確認(rèn), ack單詞是 確認(rèn) 的意思

????????// void basicNack(long deliveryTag, boolean multiple, boolean requeue)

????????// deliveryTag:消息的一個(gè)數(shù)字標(biāo)簽

????????// multiple:翻譯成中文是多個(gè)的意思,如果是true表示對(duì)小于deliveryTag標(biāo)簽下的消息都進(jìn)行Nack不確認(rèn),false表示只對(duì)當(dāng)前deliveryTag標(biāo)簽的消息Nack

????????// requeue:如果是true表示消息被Nack后,重新發(fā)送到隊(duì)列,如果是false,消息被Nack后,不會(huì)重新發(fā)送到隊(duì)列

????????try {

????????????System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());

????????????//要開啟rabbitm消息消費(fèi)的手動(dòng)確認(rèn)模式,然后才這么寫代碼;

????????????channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

????????} catch (IOException e) {

????????????e.printStackTrace();

????????}

????}

}


11.?RabbitMQ延遲隊(duì)列

場(chǎng)景:有一個(gè)訂單,15分鐘內(nèi)如果不支付,就把該訂單設(shè)置為交易關(guān)閉,那么就不能支付了,這類實(shí)現(xiàn)延遲任務(wù)的場(chǎng)景就可以采用延遲隊(duì)列來(lái)實(shí)現(xiàn),當(dāng)然除了延遲隊(duì)列來(lái)實(shí)現(xiàn),也可以有一些其他辦法實(shí)現(xiàn);

11.1?定時(shí)任務(wù)方式

每隔3秒掃描一次數(shù)據(jù)庫(kù),查詢過(guò)期的訂單然后進(jìn)行處理;

優(yōu)點(diǎn):

簡(jiǎn)單,容易實(shí)現(xiàn);

缺點(diǎn):

1、存在延遲(延遲時(shí)間不準(zhǔn)確),如果你每隔1分鐘掃一次,那么就有可能延遲1分鐘;

2、性能較差,每次掃描數(shù)據(jù)庫(kù),如果訂單量很大

1.?被動(dòng)取消

當(dāng)用戶查詢訂單的時(shí)候,判斷訂單是否超時(shí),超時(shí)了就取消(交易關(guān)閉);

優(yōu)點(diǎn):

對(duì)服務(wù)器而言,壓力??;

缺點(diǎn):

1、用戶不查詢訂單,將永遠(yuǎn)處于待支付狀態(tài),會(huì)對(duì)數(shù)據(jù)統(tǒng)計(jì)等功能造成影響;

2、用戶打開訂單頁(yè)面,有可能比較慢,因?yàn)橐幚泶罅坑唵?,用戶體驗(yàn)少稍差;

11.2?JDK延遲隊(duì)列(單體應(yīng)用,不能分布式下)

DelayedQueue

無(wú)界阻塞隊(duì)列,該隊(duì)列只有在延遲期滿的時(shí)候才能從中獲取元素

優(yōu)點(diǎn):

實(shí)現(xiàn)簡(jiǎn)單,任務(wù)延遲低;

缺點(diǎn):

服務(wù)重啟、宕機(jī),數(shù)據(jù)丟失;

只適合單機(jī)版,不適合集群;

訂單量大,可能內(nèi)存不足而發(fā)生異常; oom

11.3?采用消息中間件(rabbitmq)

1、RabbitMQ本身不支持延遲隊(duì)列,可以使用TTL結(jié)合DLX的方式來(lái)實(shí)現(xiàn)消息的延遲投遞,即把DLX跟某個(gè)隊(duì)列綁定,到了指定時(shí)間,消息過(guò)期后,就會(huì)從DLX路由到這個(gè)隊(duì)列,消費(fèi)者可以從這個(gè)隊(duì)列取走消息。

代碼:正常延遲

//問(wèn)題? 如果先發(fā)送的消息,消息延遲時(shí)間長(zhǎng),會(huì)影響后面的 延遲時(shí)間段的消息的消費(fèi);

//解決:不同延遲時(shí)間的消息要發(fā)到不同的隊(duì)列上,同一個(gè)隊(duì)列的消息,它的延遲時(shí)間應(yīng)該一樣

代碼 延遲問(wèn)題

14.4?使用rabbitmq-delayed-message-exchange 延遲插件

11.4.1?下載

選擇對(duì)應(yīng)的版本下載 rabbitmq-delayed-message-exchange 插件,下載地址:http://www.rabbitmq.com/community-plugins.html?

2、插件拷貝到 RabbitMQ 服務(wù)器plugins目錄下

11.4.2?解壓

unzip?rabbitmq_delayed_message_exchange-3.10.2.ez

如果unzip 沒(méi)有安裝,先安裝一下

yum install unzip -y

11.4.3?啟用插件

./rabbitmq-plugins enable?rabbitmq_delayed_message_exchange?開啟插件;

11.4.4?查詢安裝情況

./rabbitmq-plugins list?查詢安裝的所有插件;

重啟rabbitmq使其生效;(此處也可以不重啟)

消息發(fā)送后不會(huì)直接投遞到隊(duì)列,而是存儲(chǔ)到 Mnesia(嵌入式數(shù)據(jù)庫(kù)),檢查 x-delay 時(shí)間(消息頭部);

延遲插件在 RabbitMQ 3.5.7 及以上的版本才支持,依賴 Erlang/OPT 18.0 及以上運(yùn)行環(huán)境;

Mnesia?是一個(gè)小型數(shù)據(jù)庫(kù),不適合于大量延遲消息的實(shí)現(xiàn)

解決了消息過(guò)期時(shí)間不一致出現(xiàn)的問(wèn)題。

參考代碼:

@Component

@Slf4j

public class RabbitConfig {

????public static final String EXCHANGE = "exchange:plugin";

????public static final String QUEUE = "queue.plugin";

????public static final String KEY = "plugin";

????

????@Bean

????public CustomExchange customExchange() {

????????Map<String, Object> arguments = new HashMap<>();

????????arguments.put("x-delayed-type", "direct");

????????// CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)

????????return new CustomExchange(EXCHANGE, "x-delayed-message", true, false, arguments);

????}


????@Bean

????public Queue queue() {

????????return QueueBuilder.durable(QUEUE).build();

????}


????@Bean

????public Binding binding(CustomExchange customExchange, Queue queue) {

????????return BindingBuilder.bind(queue).to(customExchange).with(KEY).noargs();

????}

}

發(fā)消息參考代碼

MessageProperties messageProperties=new MessageProperties();

messageProperties.setHeader("x-delay",16000);

String msg = "hello world";

Message message=new Message(msg.getBytes(),messageProperties);

rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "plugin", message);

log.info("發(fā)送完畢,發(fā)送時(shí)間為:{}",new Date());



動(dòng)力節(jié)點(diǎn)RabbitMQ教程|12小時(shí)學(xué)會(huì)rabbitmq消息中間件的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
界首市| 石屏县| 昌江| 漳平市| 华宁县| 行唐县| 柏乡县| 伊宁县| 梁平县| 江油市| 正宁县| 广州市| 邹城市| 洪湖市| 武穴市| 盘山县| 个旧市| 邛崃市| 年辖:市辖区| 正宁县| 普兰店市| 涟水县| 鄂托克前旗| 二手房| 黄石市| 图片| 洪洞县| 北海市| 涞水县| 嘉鱼县| 黄大仙区| 金寨县| 平阳县| 耿马| 泰顺县| 苍山县| 剑河县| 长丰县| 庄浪县| 元朗区| 沧源|