最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

延遲隊列實現(xiàn)訂單超時自動取消

2023-03-04 08:34 作者:Cpp程序員  | 我要投稿

實現(xiàn)效果

創(chuàng)建一個訂單,超時30分鐘未支付就取消訂單。

RabbitMQ本身是不支持延遲隊列的,但可以利用RabbitMQ存活時間 + 死信隊列來實現(xiàn)消息延遲。

TTL + DLX

存活時間 TTL

TTL全稱為:time to live,意思為存活時間,當(dāng)消息沒有配置消費者,消息就一直停留在隊列中,停留時間超過存活時間后,消息會被自動刪除

RabbitMQ支持兩種TTL設(shè)置:

  • 對消息本身設(shè)置存活時間,每條消息的存活時間可以靈活設(shè)置為不同的存活時間。

  • 對傳遞的隊列設(shè)置存活時間,每條傳到到隊列的過期時間都一致。

當(dāng)消息過期還沒有被消費,此時消息會變成死信消息dead letter這是實現(xiàn)延遲隊列的關(guān)鍵。

消息變?yōu)樗佬诺臈l件:

  • 消息被拒絕basic.reject/basic.nack,并且requeue=false。

  • 消息的過期時間到期了。

  • 隊列達(dá)到最大長度。

死信交換機(jī) DLX

當(dāng)上面的消息變成死信消息之后,它不會立即被刪除,首先它要看有沒有對應(yīng)的死信交換機(jī),如果有綁定的死信交換機(jī),消息就會從發(fā)送到對應(yīng)的死信交換機(jī)上。

DLX全程為Dead Letter Exchanges,意思為死信交換機(jī)。

死信交換機(jī)和普通交換機(jī)沒什么區(qū)別,不同的是死信交換機(jī)會綁定在其他隊列上,當(dāng)隊列的消息變成死信消息后,死信消息會發(fā)送到死信交換上。

隊列綁定死信交換機(jī)需要兩個參數(shù):

  • x-dead-letter-exchange: 綁定的死信交換機(jī)名稱。

  • x-dead-letter-routing-key: 綁定的死信交換機(jī)routingKey

死信交換機(jī)和普通交換機(jī)的區(qū)別就是死信交換機(jī)的ExchangeroutingKey作為綁定參數(shù),綁定在其他隊列上。

項目實戰(zhàn)

消息發(fā)送的流程圖:

  • 生產(chǎn)者將帶有TTL的消息發(fā)送給交換機(jī),由交換機(jī)路由到隊列中。

  • 隊列由于沒有消費,消息一直停留在隊列中,一直等到消息超時,變成死信消息。

  • 死信消息轉(zhuǎn)發(fā)到死信交換機(jī)在路由到死信隊列上,最后給消費者消費。

創(chuàng)建死信隊列

@Configurationpublic class DelayQueueRabbitConfig { ?// 下面是死信隊列 /** * 死信隊列 */ public static final String DLX_QUEUE = "queue.dlx"; /** * 死信交換機(jī) */ public static final String DLX_EXCHANGE = "exchange.dlx"; /** * 死信routing-key */ public static final String DLX_ROUTING_KEY = "routingKey.dlx"; /** * 死信隊列 * @return */ @Bean public Queue dlxQueue() { return new Queue(DLX_QUEUE,true); } /** * 死信交換機(jī) * @return */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(DLX_EXCHANGE,true,false); } /** * 死信隊列和交換機(jī)綁定 * @return */ @Bean public Binding bindingDLX() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY); } }

創(chuàng)建延遲隊列,并綁定死信隊列

?// 下面的是延遲隊列 /** * 訂單延遲隊列 */ public static final String ORDER_QUEUE = "queue.order"; /** * 訂單交換機(jī) */ public static final String ORDER_EXCHANGE = "exchange.order"; /** * 訂單routing-key */ public static final String ORDER_ROUTING_KEY = "routingkey.order"; /** * 訂單延遲隊列 * @return */ @Bean public Queue orderQueue() { Map<String,Object> params = new HashMap<>(); params.put("x-dead-letter-exchange", DLX_EXCHANGE); params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(ORDER_QUEUE, true, false, false, params); } /** * 訂單交換機(jī) * @return */ @Bean public DirectExchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE,true,false); } /** * 訂單隊列和交換機(jī)綁定 * @return */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY); }

綁定死信交換通過添加x-dead-letter-exchange、x-dead-letter-routing-key參數(shù)指定對應(yīng)的交換機(jī)和路由。

發(fā)送消息

設(shè)置五秒超時時間

@RestControllerpublic class SendController { ? ?@Autowired ? ?private RabbitTemplate rabbitTemplate; ? ? ? ?@GetMapping("/dlx") ? ?public String dlx() { ? ? ? ?String date = DateUtil.dateFormat(new Date()); ? ? ? ?String delayTime = "5000"; ? ? ? ?System.out.println("【發(fā)送消息】延遲 5 秒 發(fā)送時間 " + date); ? ? ? ?rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY, ? ? ? ? ? ? ? ?message, message1 -> { ? ? ? ? ? ? ? ? ? ?message1.getMessageProperties().setExpiration(delayTime); ? ? ? ? ? ? ? ? ? ?return message1; ? ? ? ? ? ? ? ?}); ? ? ? return "ok"; ? ? ? ? ? ?} ? ? ? ?class DateUtil{ ? ? ? public static String dateFormat(Date date) { ? ? ? ?SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); ? ? ? ?return sdf.format(date); ? ? ?} ? ?} } ? ?

消費消息

@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)public void delayPrecss(String msg,Channel channel,Message message){ ? ?System.out.println("【接收消息】" + msg + " 接收時間" + DateUtil.dateFormat(new Date())); } ? ?

控制臺輸出

【發(fā)送消息】延遲5 秒 發(fā)送時間 21:32:15 【接收消息】延遲5 秒 發(fā)送時間 21:32:15 接收時間21:32:20

發(fā)送消息,5秒之后消費者后會收到消息。說明延遲成功。

隊列都有先進(jìn)先出的特點,如果隊列前面的消息延遲比后的消息延遲更長,會出現(xiàn)什么情況。

消息時序問題

發(fā)送三條消息,延遲分別是10s、2s5s

@RestControllerpublic class SendController { ? ?@Autowired ? ?private RabbitTemplate rabbitTemplate; ? ? ? ?@GetMapping("/dlx") ? ?public String dlx() { ? ? ? ? dlxSend("延遲10秒","10000"); ? ? ? ? dlxSend("延遲2 秒","2000"); ? ? ? ? dlxSend("延遲5 秒","5000"); ? ? ? ? return "ok"; ? ?} ? ? ? ?private void dlxSend(String message,String delayTime) { ? ? ? ? System.out.println("【發(fā)送消息】" + message + ?"當(dāng)前時間" + DateUtil.dateFormat(new Date())); ? ? ? ? rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY, ? ? ? ? ? ? ? ?message, message1 -> { ? ? ? ? ? ? ? ? ? ?message1.getMessageProperties().setExpiration(delayTime); ? ? ? ? ? ? ? ? ? ?return message1; ? ? ? ? ? ? ? ?}); ? ?}

控制臺輸出:

【發(fā)送消息】延遲10秒當(dāng)前時間21:54:36【發(fā)送消息】延遲2 秒當(dāng)前時間21:54:36 【發(fā)送消息】延遲5 秒當(dāng)前時間21:54:36 【接收消息】延遲10秒 當(dāng)前時間21:54:46 【接收消息】延遲2 秒 當(dāng)前時間21:54:46 【接收消息】延遲5 秒 當(dāng)前時間21:54:46

所有的消息都要等10s的消息消費完才能消費,當(dāng)10s消息未被消費,其他消息也會阻塞,即使消息設(shè)置了更短的延遲。因為隊列有先進(jìn)先出的特征,當(dāng)隊列有多條消息,延遲時間就沒用作用了,前面的消息消費后,后的消息才能被消費,不然會被阻塞到隊列中。

插件實現(xiàn)解決消息時序問題

針對上面消息的時序問題,RabbitMQ開發(fā)一個延遲消息的插件delayed_message_exchange,延遲消息交換機(jī)。使用該插件可以解決上面時序的問題。

在Github官網(wǎng)找到對應(yīng)的版本,我選擇的是3.8.17

將文件下載下來放到服務(wù)器的/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins目錄下,執(zhí)行以下命令,啟動插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

啟動插件,交換機(jī)會有新的類型x-delayed-message:

x-delayed-message類型的交換機(jī),支持延遲投遞消息。發(fā)送消息給x-delayed-message類型的交換流程圖:

  • x-delayed-message類型的交換機(jī)接收消息投遞后,并未將直接路由到隊列中,而是存儲到mnesia(一個分布式數(shù)據(jù)系統(tǒng)),該系統(tǒng)會檢測消息延遲時間。

  • 消息達(dá)到可投遞時間,消息會被投遞到目標(biāo)隊列。

配置延遲隊列

@Configurationpublic class XDelayedMessageConfig { ?/** * 隊列 */ public static final String DIRECT_QUEUE = "queue.delayed"; /** * 延遲交換機(jī) */ public static final String DELAYED_EXCHANGE = "exchange.delayed"; /** * 綁定的routing key */ public static final String ROUTING_KEY = "routingKey.bind"; @Bean public Queue directQueue() { return new Queue(DIRECT_QUEUE,true); } /** * 定義延遲交換機(jī) * 交換機(jī)的類型為 x-delayed-message * @return */ @Bean public CustomExchange delayedExchange() { Map<String,Object> map = new HashMap<>(); map.put("x-delayed-type","direct"); return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map); } @Bean public Binding delayOrderBinding() { return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs(); } }

發(fā)送消息:

? ?@GetMapping("/delay") ? ?public String delay() { ? ?delaySend("延遲隊列10 秒",10000); ? ?delaySend("延遲隊列5 秒",5000); ? ?delaySend("延遲隊列2 秒",2000); ? ? ? ?return "ok"; ? ?} ? ? ? ?private void delaySend(String message,Integer delayTime) { ? ? ? ?message = message + " " + DateUtil.dateFormat(new Date()); ? ? ? ?System.out.println("【發(fā)送消息】" + message); ? ? ? ?rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY, ? ? ? ? ? ? ? ?message, message1 -> { ? ? ? ? ? ? ? ? ? ?message1.getMessageProperties().setDelay(delayTime); ? ? ? ? ? ? ? ? ? ?//message1.getMessageProperties().setHeader("x-delay",delayTime); ? ? ? ? ? ? ? ? ? ?return message1; ? ? ? ? ? ? ? ?}); ? ?} ? ?

消費消息:

? ?@RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE) ? ?public void delayProcess(String msg,Channel channel, Message message) { ? ? ? ?System.out.println("【接收消息】" + msg + " 當(dāng)前時間" + DateUtil.dateFormat(new Date())); ? }

控制臺輸出:

【發(fā)送消息】延遲隊列10 秒 22:00:01 【發(fā)送消息】延遲隊列5 秒 22:00:01 【發(fā)送消息】延遲隊列2 秒 22:00:01 【接收消息】延遲隊列2 秒 22:00:01 當(dāng)前時間22:00:03 【接收消息】延遲隊列5 秒 22:00:01 當(dāng)前時間22:00:05 【接收消息】延遲隊列10 秒 22:00:01 當(dāng)前時間22:00:10

解決了消息的時序問題。

總結(jié)

  • 使用Java自帶的延遲消息,系統(tǒng)重啟或者掛了之后,消息就無法發(fā)送,不適于用在生產(chǎn)環(huán)境上。

  • RabbitMQ本身不支持延遲隊列,可以使用存活時間ttl?+ 死信隊列dlx實現(xiàn)消息延遲。

    • 發(fā)送的消息設(shè)置ttl,所在的隊列不設(shè)置消費者。

    • 隊列綁定死信隊列,消息超時之后,變成死信消息,再發(fā)送給死信隊列,最后發(fā)送給消費者。

  • 發(fā)送多條不同延遲時間消息,前面消息沒有到延遲時間,會阻塞后面延遲更低的消息,因為隊列有先進(jìn)先出的特性。

  • RabbitMQx-delay-message插件可以解決消息時序問題。

    • 帶有ttl的消息發(fā)送x-delayed-message類型的交換機(jī),消息不會直接路由到隊列中。而且存儲到分布式數(shù)據(jù)系統(tǒng)中,該系統(tǒng)會檢測消息延遲時間。

    • 消息到達(dá)延遲時間,消息才能會投遞到隊列中,最后發(fā)送給消費者。


延遲隊列實現(xiàn)訂單超時自動取消的評論 (共 條)

分享到微博請遵守國家法律
崇信县| 特克斯县| 泗阳县| 东丽区| 周口市| 蒲城县| 西宁市| 敖汉旗| 延安市| 新平| 上栗县| 锡林浩特市| 许昌县| 阳朔县| 黑河市| 绥芬河市| 石楼县| 华阴市| 五寨县| 扎赉特旗| 湟中县| 华阴市| 亳州市| 连山| 囊谦县| 盐源县| 杨浦区| 利津县| 渭源县| 东兰县| 乌兰浩特市| 东兴市| 焉耆| 元氏县| 临夏县| 玛沁县| 北海市| 天全县| 都兰县| 溆浦县| 安龙县|