最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

SpringBoot整合RocketMQ,嘗嘗幾大高級特性!

2023-03-27 16:52 作者:董嘉dongjia  | 我要投稿

作為一名程序員,您一定熟悉RocketMQ的功能,包括支持事務(wù)、順序和延遲消息等。在程序員界有一句名言,“Talk is cheap. Show me the code” 。本文將通過實際案例來引出解決方案,并通過代碼實現(xiàn),讓您在學(xué)習(xí)本節(jié)的過程中能夠確切地掌握實際編碼技能。


1,事務(wù)消息代碼實現(xiàn)

之前我們已經(jīng)在討論訂單業(yè)務(wù)消息丟失問題中引出了事務(wù)消息,本內(nèi)容我們就實際用代碼來實現(xiàn)一下事務(wù)消息吧。

首先我們用原生代碼來實現(xiàn)一下事務(wù)消息,下面是事務(wù)消息生產(chǎn)者TransactionProducer類的代碼,具體代碼解釋已經(jīng)用注釋標(biāo)明。

package com.huc.rocketmq.transaction;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.apache.rocketmq.client.producer.TransactionSendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;import java.util.concurrent.*;/** * @author liumeng */public class TransactionProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { ? ? ? ?// 這里是一個自定義的接收RocketMQ回調(diào)的監(jiān)聽接口 ? ? ? ?TransactionListener transactionListener = new TransactionListenerImpl(); ? ? ? ?// 創(chuàng)建支持事務(wù)消息的Producer,并指定生產(chǎn)者組 ? ? ? ?TransactionMQProducer producer = ? ? ? ? ? ? ? ?new TransactionMQProducer("testTransactionGroup"); ? ? ? ?// 指定一個線程池,用于處理RocketMQ回調(diào)請求的 ? ? ? ?ExecutorService executorService = new ThreadPoolExecutor( ? ? ? ? ? ? ? ?2, ? ? ? ? ? ? ? ?5, ? ? ? ? ? ? ? ?100, ? ? ? ? ? ? ? ?TimeUnit.SECONDS, ? ? ? ? ? ? ? ?new ArrayBlockingQueue<Runnable>(2000), ? ? ? ? ? ? ? ?new ThreadFactory() { ? ? ? ? ? ? ? ? ? ?@Override ? ? ? ? ? ? ? ? ? ?public Thread newThread(Runnable r) { ? ? ? ? ? ? ? ? ? ? ? ?Thread thread = new Thread(r); ? ? ? ? ? ? ? ? ? ? ? ?thread.setName("testThread"); ? ? ? ? ? ? ? ? ? ? ? ?return thread; ? ? ? ? ? ? ? ? ? ?} ? ? ? ? ? ? ? ?} ? ? ? ?); ? ? ? ?// 給事務(wù)消息生產(chǎn)者設(shè)置線程池 ? ? ? ?producer.setExecutorService(executorService); ? ? ? ?// 給事務(wù)消息生產(chǎn)者設(shè)置回調(diào)接口 ? ? ? ?producer.setTransactionListener(transactionListener); ? ? ? ?// 啟動生產(chǎn)者 ? ? ? ?producer.start(); ? ? ? ?// 構(gòu)造一條訂單支付成功的消息 ? ? ? ?Message message = new Message( ? ? ? ? ? ? ? ?"PayOrderSuccessTopic", ? ? ? ? ? ? ? ?"testTag", ? ? ? ? ? ? ? ?"testKey", ? ? ? ? ? ? ? ?"訂單支付消息".getBytes(RemotingHelper.DEFAULT_CHARSET) ? ? ? ?); ? ? ? ?// 將消息作為half消息發(fā)送出去 ? ? ? ?try { ? ? ? ? ? ?TransactionSendResult result = producer.sendMessageInTransaction(message, null); ? ? ? ?} catch (Exception e) { ? ? ? ? ? // half消息發(fā)送失敗 ? ? ? ? ? ?// 訂單系統(tǒng)執(zhí)行回滾邏輯,比如退款、關(guān)閉訂單 ? ? ? ?} ? ?} }

針對于half消息發(fā)送失敗的情況,是有可能一直接收不到消息發(fā)送失敗的異常的,所以我們可以在發(fā)送half消息的時候,同時保存一份half消息到內(nèi)存中,或者寫入磁盤里,后臺開啟線程去檢查half消息,如果超過10分鐘都沒有接到響應(yīng),就自動執(zhí)行回滾邏輯。那么如果half消息成功了,如何執(zhí)行本地事務(wù)邏輯呢?

這就要說到代碼中自定義的回調(diào)監(jiān)聽接口TransactionListenerImpl類了,代碼如下:

package com.huc.rocketmq.transaction;import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;public class TransactionListenerImpl implements TransactionListener { ? ?/** ? ? * 如果half消息發(fā)送成功了,就會回調(diào)這個方法,執(zhí)行本地事務(wù) ? ? * @param message ? ? * @param o ? ? * @return ? ? */ ? ?@Override ? ?public LocalTransactionState executeLocalTransaction(Message message, Object o) { ? ? ? ?// 執(zhí)行訂單本地業(yè)務(wù),并根據(jù)結(jié)構(gòu)返回commit/rollback ? ? ? ?try { ? ? ? ? ? ?// 本地事務(wù)執(zhí)行成功,返回commit ? ? ? ? ? ?return LocalTransactionState.COMMIT_MESSAGE; }catch (Exception e){ ? ? ? ? ? ?// 本地事務(wù)執(zhí)行失敗,返回rollback,作廢half消息 ? ? ? ? ? ?return LocalTransactionState.ROLLBACK_MESSAGE; ? ? ? ?} ? ?} ? ?/** ? ? * 如果沒有正確返回commit或rollback,會執(zhí)行此方法 ? ? * @param messageExt ? ? * @return ? ? */ ? ?@Override ? ?public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { ? ? ? ?// 查詢本地事務(wù)是否已經(jīng)成功執(zhí)行了,再次根據(jù)結(jié)果返回commit/rollback ? ? ? ?try { ? ? ? ? ? ?// 本地事務(wù)執(zhí)行成功,返回commit ? ? ? ? ? ?return LocalTransactionState.COMMIT_MESSAGE; ? ? ? ?}catch (Exception e){ ? ? ? ? ? ?// 本地事務(wù)執(zhí)行失敗,返回rollback,作廢half消息 ? ? ? ? ? ?return LocalTransactionState.ROLLBACK_MESSAGE; ? ? ? ?} ? ?} }

到這里事務(wù)消息的代碼我們就完成了,但是我相信小伙伴們不會滿足于僅僅使用原生代碼實現(xiàn),那接下來我們就用Spring Boot重寫編寫一次相同的邏輯。

使用Spring Boot項目后,我們還是先準(zhǔn)備一個消息的實體類TranMessage,代碼如下:

package com.huc.rocketmq.transaction.spring;/** * 事務(wù)消息實體 */public class TranMessage { ? ?public static final String TOPIC = "Tran"; ? ?/** ? ? * 編號 ? ? */ ? ?private Integer id; ? ?public TranMessage setId(Integer id) { ? ? ? ?this.id = id; ? ? ? ?return this; ? ?} ? ?public Integer getId() { ? ? ? ?return id; ? ?} ? ?@Overridepublic String toString() { ? ? ? ?return "TranMessage{" + ? ? ? ? ? ? ? ?"id=" + id + ? ? ? ? ? ? ? ?'}'; ? ?} } 然后我們編寫事務(wù)消息的生產(chǎn)者TranProducer:package com.huc.rocketmq.transaction.spring;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class TranProducer { ? ?@Autowired ? ?private RocketMQTemplate rocketMQTemplate; ? ?public SendResult sendMessageInTransaction(Integer id) { ? ? ? ?// 創(chuàng)建TranMessage消息 ? ? ? ?Message<TranMessage> message = MessageBuilder ? ? ? ? ? ? ? ?.withPayload(new TranMessage().setId(id)).build(); ? ? ? ?// 發(fā)送事務(wù)消息 ? ? ? ?return rocketMQTemplate.sendMessageInTransaction(TranMessage.TOPIC, ? ? ? ? ? ? ? ?message,id); ? ?} }

同樣的,我們需要編寫一個回調(diào)監(jiān)聽的實現(xiàn)類,用于自定義處理本地事務(wù),返回commit或者rollback消息。代碼如下:

package com.huc.rocketmq.transaction.spring;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.springframework.messaging.Message;// 注解中可以指定線程池參數(shù)@RocketMQTransactionListener(corePoolSize=2,maximumPoolSize=5)public class TransactionListenerImpl implements RocketMQLocalTransactionListener { ? ?@Override ? ?public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { ? ? ? ?// 執(zhí)行訂單本地業(yè)務(wù),并根據(jù)結(jié)構(gòu)返回commit/rollback ? ? ? ?try { ? ? ? ? ? ?// 本地事務(wù)執(zhí)行成功,返回commit ? ? ? ? ? ?return RocketMQLocalTransactionState.COMMIT; ? ? ? ?}catch (Exception e){ // 本地事務(wù)執(zhí)行失敗,返回rollback,作廢half消息 ? ? ? ? ? ?return RocketMQLocalTransactionState.ROLLBACK; ? ? ? ?} ? ?} ? ?@Override ? ?public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { ? ? ? ?// 查詢本地事務(wù)是否已經(jīng)成功執(zhí)行了,再次根據(jù)結(jié)果返回commit/rollback ? ? ? ?try { ? ? ? ? ? ?// 本地事務(wù)執(zhí)行成功,返回commit ? ? ? ? ? ?return RocketMQLocalTransactionState.COMMIT; ? ? ? ?}catch (Exception e){ ? ? ? ? ? ?// 本地事務(wù)執(zhí)行失敗,返回rollback,作廢half消息 ? ? ? ? ? ?return RocketMQLocalTransactionState.ROLLBACK; ? ? ? ?} ? ?} }

有了原生代碼的實現(xiàn)經(jīng)驗,相信小伙伴們對于使用Spring Boot集成后的代碼同樣可以輕松看得懂。好了,至此事務(wù)消息的代碼我們就已經(jīng)實現(xiàn)了。

2,順序消息代碼實現(xiàn)

有關(guān)消息亂序的出現(xiàn)原因以及解決方案我們已經(jīng)在8.4.3小節(jié)中講解過了,小伙伴們可以去復(fù)習(xí)一下,本節(jié)我們將直接討論代碼的實現(xiàn),首先還是使用原生代碼實現(xiàn)。

經(jīng)過之前的學(xué)習(xí)我們知道,解決消息亂序的方案就是把需要保證順序的消息發(fā)送到同一個MessageQueue中,所以我們一定是需要編寫一個MessageQueue的選擇器的,RocketMQ的API中確實是有這部分內(nèi)容的,就是MessageQueueSelector,下面就以原生代碼異步的發(fā)送為例,在發(fā)送消息的時候指定隊列選擇器,主要代碼如下,注釋已經(jīng)說明代碼的含義:

producer.send( ? ? ? ?msg, ? ? ? ?new MessageQueueSelector() { ? ? ? ? ? ?@Override ? ? ? ? ? ?public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { ? ? ? ? ? ? ? ?Long orderId = (Long) arg; // 根據(jù)訂單id選擇發(fā)送的queue ? ? ? ? ? ? ? ?long index = orderId % mqs.size();// 用訂單id于MessageQueue的數(shù)量取模 ? ? ? ? ? ? ? ?return mqs.get((int) index); // 返回一個運算后固定的MessageQueue ? ? ? ? ? ?} ? ? ? ?}, ? ? ? ?orderId, // 傳入訂單id ? ? ? ?new SendCallback() { ? ? ? ? ? ?@Override ? ? ? ? ? ?public void onSuccess(SendResult sendResult) { ? ? ? ? ? ? ? ?System.out.println(sendResult); ? ? ? ? ? ?} ? ? ? ? ? ?@Overridepublic void onException(Throwable throwable) { ? ? ? ? ? ? ? ?System.out.println(throwable); ? ? ? ? ? ?} ? ? ? ? } );

在發(fā)送消息時增加一個MessageQueueSelector,就可以實現(xiàn)統(tǒng)一訂單id的消息一直會發(fā)送到同一個MessageQueue之中,可以解決消息亂序問題。

接著我們來看消費者部分的代碼實現(xiàn),主要代碼如下:

consumer.registerMessageListener(new MessageListenerOrderly() { ? ? ? ? ? ?@Override ? ? ? ? ? ?public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ConsumeOrderlyContext context) { ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?// 對有序的消息進行順序處理 ? ? ? ? ? ? ? ? ? ?for (MessageExt t : msgs) { ? ? ? ? ? ? ? ? ? ?} ? ? ? ? ? ? ? ? ? ?return ConsumeOrderlyStatus.SUCCESS; ? ? ? ? ? ? ? ?} catch (Exception e) { ? ? ? ? ? ? ? ? ? ?// 如果消息處理出錯,返回一個狀態(tài),暫停一會兒再來處理這批消息。 ? ? ? ? ? ? ? ? ? ?return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; ? ? ? ? ? ? ? ?} ? ? ? ? ? ?} ? ? ? ?});

這里面要注意的是我們注冊的監(jiān)聽器是MessageListenerOrderly,這個監(jiān)聽器為了保證順序消費,Consumer會對每一個ConsumerQueue只使用一個線程來處理消息,如果使用了多線程,是無法避免消息亂序的。

至此原生代碼的實現(xiàn)已經(jīng)完成了,Spring Boot的代碼原理也是一樣的。

消息實體的代碼我們就省略了,直接看生產(chǎn)者的代碼,如下:

package com.huc.rocketmq.order.spring;import com.huc.rocketmq.spring.DemoMessage;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class OrderProducer { ? ?@Autowired ? ?private RocketMQTemplate rocketMQTemplate; ? ?public SendResult syncSend(Integer id) { ? ? ? ?// 創(chuàng)建 DemoMessage 消息 ? ? ? ?DemoMessage message = new DemoMessage(); ? ? ? ?message.setId(id); ? ? ? ?// 同步發(fā)送消息 ? ? ? ?return rocketMQTemplate.syncSendOrderly(DemoMessage.TOPIC, ? ? ? ? ? ? ? ?message,String.valueOf(id)); } ? ?public void asyncSend(Integer id, SendCallback callback) { ? ? ? ?// 創(chuàng)建 DemoMessage 消息 ? ? ? ?DemoMessage message = new DemoMessage(); ? ? ? ?message.setId(id); ? ? ? ?// 異步發(fā)送消息 ? ? ? ?rocketMQTemplate.asyncSendOrderly(DemoMessage.TOPIC, ? ? ? ? ? ? ? ?message,String.valueOf(id),callback); ? ?} ? ?public void onewaySend(Integer id) { ? ? ? ?// 創(chuàng)建 DemoMessage 消息 ? ? ? ?DemoMessage message = new DemoMessage(); ? ? ? ?message.setId(id); ? ? ? ?// oneway 發(fā)送消息 ? ? ? ?rocketMQTemplate.sendOneWayOrderly(DemoMessage.TOPIC, ? ? ? ? ? ? ? ?message,String.valueOf(id)); ? ?} }

以上代碼中可以看出,每個發(fā)送方法中都調(diào)用了對應(yīng)的Orderly方法,并傳入了一個id值,默認(rèn)根據(jù)id值采用SelectMessageQueueByHash策略來選擇MessageQueue。

接下來我們繼續(xù)看消費者代碼的實現(xiàn)。

package com.huc.rocketmq.order.spring;import com.huc.rocketmq.spring.DemoMessage;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener( ? ? ? ?topic = DemoMessage.TOPIC, ? ? ? ?consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC, ? ? ? ?consumeMode = ConsumeMode.ORDERLY // 設(shè)置為順序消費)public class OrderConsumer implements RocketMQListener<DemoMessage> { ? ?private Logger logger = LoggerFactory.getLogger(getClass()); ? ?@Override ? ?public void onMessage(DemoMessage message) { ? ? ? ?logger.info("[onMessage][線程編號:{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message); ? ?} }

可以看到消費者代碼改動很小,只需要在@RocketMQMessageListener注解中新增consumeMode = ConsumeMode.ORDERLY,就可以指定順序消費了,小伙伴們可以大膽的猜測它的實現(xiàn)原理,和我們的原生代碼實現(xiàn)的方式是相同的。

3,消息過濾代碼實現(xiàn)

RocketMQ是包含消息過濾功能的,現(xiàn)在假如我們不使用消息過濾功能,獲取到一個Topic中的消息可能包含了相關(guān)主題的多個表的信息。

如果我們的需求是根據(jù)獲取的消息同步某張表A的數(shù)據(jù),那么就需要在獲取消息后自行判斷消息是否屬于表A,如果屬于表A才去處理,如果不是表A就直接丟棄。

這種做法多了一層邏輯判斷,自然會對系統(tǒng)的性能產(chǎn)生影響。這個時候RocketMQ的過濾機制就可以展示它的作用了,我們在發(fā)送消息的時候可以直接給消息指定tag和屬性,主要代碼如下:

// 構(gòu)建消息對象 ? ? ? ?Message msg = new Message( ? ? ? ? ? ? ? ?topic, //這里指定的是topic ? ? ? ? ? ? ? ?"A",//這里存放的Tag 消費者會根據(jù)tag進行消息過濾 ? ? ? ? ? ? ? ?message.getBytes(RemotingHelper.DEFAULT_CHARSET)); ? ? ? ?// 我們還可以設(shè)置一些用戶自定義的屬性 ? ? ? ?msg.putUserProperty("name","value");

消費者在消費數(shù)據(jù)時就可以根據(jù)tag和屬性進行過濾了,比如下邊的寫法:

// 訂閱test Topic , 第二個參數(shù)是通過tag過濾,意思是過濾出tag為A或B的消息 ? ? ? ?consumer.subscribe("test", "A||B");

對應(yīng)到spring boot中的實現(xiàn)也很簡單,生產(chǎn)者部分關(guān)鍵代碼如下:

// 創(chuàng)建 DemoMessage 消息 ? ? ? ?Message<DemoMessage> message = MessageBuilder ? ? ? ? ? ? ? ?.withPayload(new DemoMessage().setId(id)) ? ? ? ? ? ? ? ?.setHeader(MessageConst.PROPERTY_TAGS,"A")// 設(shè)置消息的tag ? ? ? ? ? ? ? ?.build();

消費者過濾的主要代碼如下:

@RocketMQMessageListener( ? ? ? ?topic = DemoMessage.TOPIC, ? ? ? ?consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC, ? ? ? ?selectorExpression = "A||B" // 通過tag過濾)

消費者部分只要在@RocketMQMessageListener注解中增加selectorExpression屬性就可以了。

4,延時消息代碼實現(xiàn)

在討論延時消息的代碼實現(xiàn)之前,先討論一下電商系統(tǒng)的超時未支付業(yè)務(wù)流程。如圖1所示:


圖1放棄支付流程

這個流程的關(guān)鍵問題就是超時未支付的訂單處于“待支付”狀態(tài),并鎖定了庫存,當(dāng)時我們提出的解決方案就是提供一個后臺線程,來掃描待支付訂單,如果超過30分鐘還未支付,就把訂單關(guān)閉,解鎖庫存。

小伙伴們可以思考一下,這樣的解決方案真的可以在生產(chǎn)環(huán)境落地嗎?

首先,后臺線程不停的掃描訂單數(shù)據(jù),如果訂單數(shù)據(jù)量很大,就會導(dǎo)致嚴(yán)重的系統(tǒng)性能問題。

其次,如果我們的訂單系統(tǒng)是一個分布式系統(tǒng),你的后臺線程要如何部署?多久掃描一次?

所以,使用后臺線程掃描訂單數(shù)據(jù)并不是一個優(yōu)雅的解決方案,這個時候本小節(jié)的主人公延時消息就該出場了。

RocketMQ的延時消息可以做到這樣的效果,訂單系統(tǒng)發(fā)送一條消息,等30分鐘后,這條消息才可以被消費者消費。所以我們引入延時消息后,就可以單獨準(zhǔn)備一個訂單掃描服務(wù),來消費延時消息,當(dāng)它獲得消息的時候再去驗證訂單是否已經(jīng)支付,如果已經(jīng)支付什么都不用做,如果還未支付就去進行關(guān)閉訂單,解鎖庫存的操作。如圖2所示:


圖2延時消息放棄支付流程

使用延時消息后,就可以避免掃描大量訂單數(shù)據(jù)的操作了,而且訂單掃描服務(wù)也可以分布式部署多個,只要同時訂閱一個Topic就可以了。應(yīng)用場景我們已經(jīng)了解了,現(xiàn)在我們來看一下代碼應(yīng)該如何實現(xiàn)。延時消息使用原生代碼實現(xiàn)特別容易,主要代碼如下:

// 構(gòu)建消息對象 ? ? ? ?Message msg = new Message( ? ? ? ? ? ? ? ?topic, //這里指定延時消息的topic ? ? ? ? ? ? ? ?message.getBytes(RemotingHelper.DEFAULT_CHARSET)); ? ? ? ?// 指定延時級別為3 ? ? ? ?msg.setDelayTimeLevel(3); ? ? ? ?producer.send(msg);

可以看到最核心的內(nèi)容就是msg.setDelayTimeLevel(3),設(shè)置了延遲級別。

RocketMQ支持的延遲級別有18個,這個我們之前已經(jīng)介紹過了,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

所以設(shè)置為3代表10s后消息可以被消費者消費。

消費者的代碼這里就不演示了,沒有什么特殊的寫法。

下面我們來看一下Spring Boot的生產(chǎn)者代碼實現(xiàn):

// 創(chuàng)建 DemoMessage 消息 ? ? ? ?Message<DemoMessage> message = MessageBuilder ? ? ? ? ? ? ? ?.withPayload(new DemoMessage().setId(id)) ? ? ? ? ? ? ? ?.build(); ? ? ? ?// 同步發(fā)送消息 ? ? ? ?return rocketMQTemplate.syncSend(DemoMessage.TOPIC, ? ? ? ? ? ? ? ?message, ? ? ? ? ? ? ? ?30*1000, ? ? ? ? ? ? ? ?3);// 此處設(shè)置的就是延時級別


SpringBoot整合RocketMQ,嘗嘗幾大高級特性!的評論 (共 條)

分享到微博請遵守國家法律
平安县| 台山市| 洛南县| 禄丰县| 稷山县| 江源县| 靖宇县| 元谋县| 鹿邑县| 南部县| 宜川县| 迁安市| 唐山市| 美姑县| 永济市| 瓮安县| 庆阳市| 中牟县| 斗六市| 团风县| 石林| 广水市| 临汾市| 本溪| 叶城县| 许昌市| 珠海市| 宣化县| 翁源县| 荥阳市| 邯郸县| 望奎县| 武隆县| 雅安市| 湘西| 绵阳市| 丁青县| 崇信县| 海兴县| 香河县| 张家川|