延遲隊列實現(xiàn)訂單超時自動取消
實現(xiàn)效果
創(chuàng)建一個訂單,超時30分鐘未支付就取消訂單。
RabbitMQ
本身是不支持延遲隊列的,但可以利用RabbitMQ
的存活時間 + 死信隊列
來實現(xiàn)消息延遲。
TTL + DLX
存活時間 TTL
TTL
全稱為:time to live
,意思為存活時間,當(dāng)消息沒有配置消費者,消息就一直停留在隊列中,停留時間超過存活時間后,消息會被自動刪除
RabbitMQ
支持兩種TTL
設(shè)置:
對消息本身設(shè)置存活時間,每條消息的存活時間可以靈活設(shè)置為不同的存活時間。
對傳遞的隊列設(shè)置存活時間,每條傳到到隊列的過期時間都一致。
當(dāng)消息過期還沒有被消費,此時消息會變成死信消息dead letter
,這是實現(xiàn)延遲隊列的關(guān)鍵。
消息變?yōu)樗佬诺臈l件:
消息被拒絕
basic.reject/basic.nack
,并且requeue=false
。消息的過期時間到期了。
隊列達(dá)到最大長度。
死信交換機(jī) DLX
當(dāng)上面的消息變成死信消息之后,它不會立即被刪除,首先它要看有沒有對應(yīng)的死信交換機(jī),如果有綁定的死信交換機(jī),消息就會從發(fā)送到對應(yīng)的死信交換機(jī)上。
DLX
全程為Dead Letter Exchanges
,意思為死信交換機(jī)。
死信交換機(jī)和普通交換機(jī)沒什么區(qū)別,不同的是死信交換機(jī)會綁定在其他隊列上,當(dāng)隊列的消息變成死信消息后,死信消息會發(fā)送到死信交換上。
隊列綁定死信交換機(jī)需要兩個參數(shù):
x-dead-letter-exchange
: 綁定的死信交換機(jī)名稱。x-dead-letter-routing-key
: 綁定的死信交換機(jī)routingKey
。
死信交換機(jī)和普通交換機(jī)的區(qū)別就是死信交換機(jī)的
Exchange
和routingKey
作為綁定參數(shù),綁定在其他隊列上。
項目實戰(zhàn)
消息發(fā)送的流程圖:
生產(chǎn)者將帶有
TTL
的消息發(fā)送給交換機(jī),由交換機(jī)路由到隊列中。隊列由于沒有消費,消息一直停留在隊列中,一直等到消息超時,變成死信消息。
死信消息轉(zhuǎn)發(fā)到死信交換機(jī)在路由到死信隊列上,最后給消費者消費。
創(chuàng)建死信隊列
public class DelayQueueRabbitConfig { ?// 下面是死信隊列
/**
* 死信隊列
*/
public static final String DLX_QUEUE = "queue.dlx"; /**
* 死信交換機(jī)
*/
public static final String DLX_EXCHANGE = "exchange.dlx"; /**
* 死信routing-key
*/
public static final String DLX_ROUTING_KEY = "routingKey.dlx"; /**
* 死信隊列
* @return
*/
public Queue dlxQueue() { return new Queue(DLX_QUEUE,true);
} /**
* 死信交換機(jī)
* @return
*/
public DirectExchange dlxExchange() { return new DirectExchange(DLX_EXCHANGE,true,false);
} /**
* 死信隊列和交換機(jī)綁定
* @return
*/
public Binding bindingDLX() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
}
}
創(chuàng)建延遲隊列,并綁定死信隊列
?// 下面的是延遲隊列
/**
* 訂單延遲隊列
*/
public static final String ORDER_QUEUE = "queue.order"; /**
* 訂單交換機(jī)
*/
public static final String ORDER_EXCHANGE = "exchange.order"; /**
* 訂單routing-key
*/
public static final String ORDER_ROUTING_KEY = "routingkey.order"; /**
* 訂單延遲隊列
* @return
*/
public Queue orderQueue() { Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DLX_EXCHANGE);
params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(ORDER_QUEUE, true, false, false, params);
} /**
* 訂單交換機(jī)
* @return
*/
public DirectExchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE,true,false);
} /**
* 訂單隊列和交換機(jī)綁定
* @return
*/
public Binding orderBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
}
綁定死信交換通過添加
x-dead-letter-exchange
、x-dead-letter-routing-key
參數(shù)指定對應(yīng)的交換機(jī)和路由。
發(fā)送消息
設(shè)置五秒超時時間
public class SendController { ? ?
? ?private RabbitTemplate rabbitTemplate; ? ?
? ? ("/dlx") ? ?public String dlx() { ? ? ? ?String date = DateUtil.dateFormat(new Date()); ? ? ? ?String delayTime = "5000"; ? ? ? ?System.out.println("【發(fā)送消息】延遲 5 秒 發(fā)送時間 " + date);
? ? ? ?rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
? ? ? ? ? ? ? ?message, message1 -> {
? ? ? ? ? ? ? ? ? ?message1.getMessageProperties().setExpiration(delayTime); ? ? ? ? ? ? ? ? ? ?return message1;
? ? ? ? ? ? ? ?}); ? ? ? return "ok"; ? ? ? ?
? ?} ? ?
? ?class DateUtil{ ? ? ? public static String dateFormat(Date date) { ? ? ? ?SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); ? ? ? ?return sdf.format(date);
? ? ?}
? ?}
} ? ?
消費消息
DelayQueueRabbitConfig.DLX_QUEUE)public void delayPrecss(String msg,Channel channel,Message message){ ? ?System.out.println("【接收消息】" + msg + " 接收時間" + DateUtil.dateFormat(new Date()));
} ? ?
(queues =
控制臺輸出:
【發(fā)送消息】延遲5 秒 發(fā)送時間 21:32:15
【接收消息】延遲5 秒 發(fā)送時間 21:32:15 接收時間21:32:20
發(fā)送消息,5秒之后消費者后會收到消息。說明延遲成功。
隊列都有先進(jìn)先出
的特點,如果隊列前面的消息延遲比后的消息延遲更長,會出現(xiàn)什么情況。
消息時序問題
發(fā)送三條消息,延遲分別是10s
、2s
、5s
:
public class SendController { ? ?
? ?private RabbitTemplate rabbitTemplate; ? ?
? ? ("/dlx") ? ?public String dlx() { ? ? ? ? dlxSend("延遲10秒","10000"); ? ? ? ? dlxSend("延遲2 秒","2000"); ? ? ? ? dlxSend("延遲5 秒","5000"); ? ? ? ? return "ok";
? ?} ? ?
? ?private void dlxSend(String message,String delayTime) { ? ? ? ? System.out.println("【發(fā)送消息】" + message + ?"當(dāng)前時間" + DateUtil.dateFormat(new Date()));
? ? ? ? rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
? ? ? ? ? ? ? ?message, message1 -> {
? ? ? ? ? ? ? ? ? ?message1.getMessageProperties().setExpiration(delayTime); ? ? ? ? ? ? ? ? ? ?return message1;
? ? ? ? ? ? ? ?});
? ?}
控制臺輸出:
【發(fā)送消息】延遲10秒當(dāng)前時間21:54:36【發(fā)送消息】延遲2 秒當(dāng)前時間21:54:36
【發(fā)送消息】延遲5 秒當(dāng)前時間21:54:36
【接收消息】延遲10秒 當(dāng)前時間21:54:46
【接收消息】延遲2 秒 當(dāng)前時間21:54:46
【接收消息】延遲5 秒 當(dāng)前時間21:54:46
所有的消息都要等10s
的消息消費完才能消費,當(dāng)10s
消息未被消費,其他消息也會阻塞,即使消息設(shè)置了更短的延遲。因為隊列有先進(jìn)先出
的特征,當(dāng)隊列有多條消息,延遲時間就沒用作用了,前面的消息消費后,后的消息才能被消費,不然會被阻塞到隊列中。
插件實現(xiàn)解決消息時序問題
針對上面消息的時序問題,RabbitMQ
開發(fā)一個延遲消息的插件delayed_message_exchange
,延遲消息交換機(jī)。使用該插件可以解決上面時序的問題。
在Github官網(wǎng)找到對應(yīng)的版本,我選擇的是3.8.17
:
將文件下載下來放到服務(wù)器的/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins
目錄下,執(zhí)行以下命令,啟動插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
啟動插件,交換機(jī)會有新的類型x-delayed-message
:
x-delayed-message
類型的交換機(jī),支持延遲投遞消息。發(fā)送消息給x-delayed-message
類型的交換流程圖:
x-delayed-message
類型的交換機(jī)接收消息投遞后,并未將直接路由到隊列中,而是存儲到mnesia
(一個分布式數(shù)據(jù)系統(tǒng)),該系統(tǒng)會檢測消息延遲時間。消息達(dá)到可投遞時間,消息會被投遞到目標(biāo)隊列。
配置延遲隊列
public class XDelayedMessageConfig { ?/**
* 隊列
*/
public static final String DIRECT_QUEUE = "queue.delayed"; /**
* 延遲交換機(jī)
*/
public static final String DELAYED_EXCHANGE = "exchange.delayed"; /**
* 綁定的routing key
*/
public static final String ROUTING_KEY = "routingKey.bind";
public Queue directQueue() { return new Queue(DIRECT_QUEUE,true);
} /**
* 定義延遲交換機(jī)
* 交換機(jī)的類型為 x-delayed-message
* @return
*/
public CustomExchange delayedExchange() { Map<String,Object> map = new HashMap<>();
map.put("x-delayed-type","direct"); return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
}
public Binding delayOrderBinding() { return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
}
}
發(fā)送消息:
? ?"/delay") ? ?public String delay() { ? ?delaySend("延遲隊列10 秒",10000); ? ?delaySend("延遲隊列5 秒",5000); ? ?delaySend("延遲隊列2 秒",2000); ? ? ? ?return "ok";
? ?} ? ?
? ?private void delaySend(String message,Integer delayTime) {
? ? ? ?message = message + " " + DateUtil.dateFormat(new Date()); ? ? ? ?System.out.println("【發(fā)送消息】" + message);
? ? ? ?rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,
? ? ? ? ? ? ? ?message, message1 -> {
? ? ? ? ? ? ? ? ? ?message1.getMessageProperties().setDelay(delayTime); ? ? ? ? ? ? ? ? ? ?//message1.getMessageProperties().setHeader("x-delay",delayTime);
? ? ? ? ? ? ? ? ? ?return message1;
? ? ? ? ? ? ? ?});
? ?} ? ?
(
消費消息:
? ?XDelayedMessageConfig.DIRECT_QUEUE) ? ?public void delayProcess(String msg,Channel channel, Message message) { ? ? ? ?System.out.println("【接收消息】" + msg + " 當(dāng)前時間" + DateUtil.dateFormat(new Date()));
? }
(queues =
控制臺輸出:
【發(fā)送消息】延遲隊列10 秒 22:00:01
【發(fā)送消息】延遲隊列5 秒 22:00:01
【發(fā)送消息】延遲隊列2 秒 22:00:01
【接收消息】延遲隊列2 秒 22:00:01 當(dāng)前時間22:00:03
【接收消息】延遲隊列5 秒 22:00:01 當(dāng)前時間22:00:05
【接收消息】延遲隊列10 秒 22:00:01 當(dāng)前時間22:00:10
解決了消息的時序問題。
總結(jié)
使用
Java
自帶的延遲消息,系統(tǒng)重啟或者掛了之后,消息就無法發(fā)送,不適于用在生產(chǎn)環(huán)境上。RabbitMQ
本身不支持延遲隊列,可以使用存活時間ttl
?+ 死信隊列dlx
實現(xiàn)消息延遲。發(fā)送的消息設(shè)置
ttl
,所在的隊列不設(shè)置消費者。隊列綁定死信隊列,消息超時之后,變成死信消息,再發(fā)送給死信隊列,最后發(fā)送給消費者。
發(fā)送多條不同延遲時間消息,前面消息沒有到延遲時間,會阻塞后面延遲更低的消息,因為隊列有
先進(jìn)先出
的特性。RabbitMQ
的x-delay-message
插件可以解決消息時序問題。帶有
ttl
的消息發(fā)送x-delayed-message
類型的交換機(jī),消息不會直接路由到隊列中。而且存儲到分布式數(shù)據(jù)系統(tǒng)中,該系統(tǒng)會檢測消息延遲時間。消息到達(dá)延遲時間,消息才能會投遞到隊列中,最后發(fā)送給消費者。