SpringBoot整合RocketMQ,嘗嘗幾大高級特性!
作為一名程序員,您一定熟悉RocketMQ的功能,包括支持事務(wù)、順序和延遲消息等。在程序員界有一句名言,“Talk is cheap. Show me the code” 。本文將通過實際案例來引出解決方案,并通過代碼實現(xiàn),讓您在學(xué)習(xí)本節(jié)的過程中能夠確切地掌握實際編碼技能。
1,事務(wù)消息代碼實現(xiàn)
之前我們已經(jīng)在討論訂單業(yè)務(wù)消息丟失問題中引出了事務(wù)消息,本內(nèi)容我們就實際用代碼來實現(xiàn)一下事務(wù)消息吧。
首先我們用原生代碼來實現(xiàn)一下事務(wù)消息,下面是事務(wù)消息生產(chǎn)者TransactionProducer類的代碼,具體代碼解釋已經(jīng)用注釋標(biāo)明。
package com.huc.rocketmq.transaction;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.apache.rocketmq.client.producer.TransactionSendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;import java.util.concurrent.*;/**
* @author liumeng
*/public class TransactionProducer {public static void main(String[] args)
throws MQClientException, UnsupportedEncodingException { ? ? ? ?// 這里是一個自定義的接收RocketMQ回調(diào)的監(jiān)聽接口
? ? ? ?TransactionListener transactionListener = new TransactionListenerImpl(); ? ? ? ?// 創(chuàng)建支持事務(wù)消息的Producer,并指定生產(chǎn)者組
? ? ? ?TransactionMQProducer producer = ? ? ? ? ? ? ? ?new TransactionMQProducer("testTransactionGroup"); ? ? ? ?// 指定一個線程池,用于處理RocketMQ回調(diào)請求的
? ? ? ?ExecutorService executorService = new ThreadPoolExecutor( ? ? ? ? ? ? ? ?2, ? ? ? ? ? ? ? ?5, ? ? ? ? ? ? ? ?100,
? ? ? ? ? ? ? ?TimeUnit.SECONDS, ? ? ? ? ? ? ? ?new ArrayBlockingQueue<Runnable>(2000), ? ? ? ? ? ? ? ?new ThreadFactory() { ? ? ? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? ? ?public Thread newThread(Runnable r) {
? ? ? ? ? ? ? ? ? ? ? ?Thread thread = new Thread(r);
? ? ? ? ? ? ? ? ? ? ? ?thread.setName("testThread"); ? ? ? ? ? ? ? ? ? ? ? ?return thread;
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?}
? ? ? ?); ? ? ? ?// 給事務(wù)消息生產(chǎn)者設(shè)置線程池
? ? ? ?producer.setExecutorService(executorService); ? ? ? ?// 給事務(wù)消息生產(chǎn)者設(shè)置回調(diào)接口
? ? ? ?producer.setTransactionListener(transactionListener); ? ? ? ?// 啟動生產(chǎn)者
? ? ? ?producer.start(); ? ? ? ?// 構(gòu)造一條訂單支付成功的消息
? ? ? ?Message message = new Message( ? ? ? ? ? ? ? ?"PayOrderSuccessTopic", ? ? ? ? ? ? ? ?"testTag", ? ? ? ? ? ? ? ?"testKey", ? ? ? ? ? ? ? ?"訂單支付消息".getBytes(RemotingHelper.DEFAULT_CHARSET)
? ? ? ?); ? ? ? ?// 將消息作為half消息發(fā)送出去
? ? ? ?try {
? ? ? ? ? ?TransactionSendResult result = producer.sendMessageInTransaction(message, null);
? ? ? ?} catch (Exception e) { ? ? ? ? ? // half消息發(fā)送失敗
? ? ? ? ? ?// 訂單系統(tǒng)執(zhí)行回滾邏輯,比如退款、關(guān)閉訂單
? ? ? ?}
? ?}
}
針對于half消息發(fā)送失敗的情況,是有可能一直接收不到消息發(fā)送失敗的異常的,所以我們可以在發(fā)送half消息的時候,同時保存一份half消息到內(nèi)存中,或者寫入磁盤里,后臺開啟線程去檢查half消息,如果超過10分鐘都沒有接到響應(yīng),就自動執(zhí)行回滾邏輯。那么如果half消息成功了,如何執(zhí)行本地事務(wù)邏輯呢?
這就要說到代碼中自定義的回調(diào)監(jiān)聽接口TransactionListenerImpl類了,代碼如下:
package com.huc.rocketmq.transaction;import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;public class TransactionListenerImpl implements TransactionListener { ? ?/**
? ? * 如果half消息發(fā)送成功了,就會回調(diào)這個方法,執(zhí)行本地事務(wù)
? ? * @param message
? ? * @param o
? ? * @return
? ? */
? ?
? ?public LocalTransactionState executeLocalTransaction(Message message, Object o) { ? ? ? ?// 執(zhí)行訂單本地業(yè)務(wù),并根據(jù)結(jié)構(gòu)返回commit/rollback
? ? ? ?try { ? ? ? ? ? ?// 本地事務(wù)執(zhí)行成功,返回commit
? ? ? ? ? ?return LocalTransactionState.COMMIT_MESSAGE;
}catch (Exception e){ ? ? ? ? ? ?// 本地事務(wù)執(zhí)行失敗,返回rollback,作廢half消息
? ? ? ? ? ?return LocalTransactionState.ROLLBACK_MESSAGE;
? ? ? ?}
? ?} ? ?/**
? ? * 如果沒有正確返回commit或rollback,會執(zhí)行此方法
? ? * @param messageExt
? ? * @return
? ? */
? ?
? ?public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { ? ? ? ?// 查詢本地事務(wù)是否已經(jīng)成功執(zhí)行了,再次根據(jù)結(jié)果返回commit/rollback
? ? ? ?try { ? ? ? ? ? ?// 本地事務(wù)執(zhí)行成功,返回commit
? ? ? ? ? ?return LocalTransactionState.COMMIT_MESSAGE;
? ? ? ?}catch (Exception e){ ? ? ? ? ? ?// 本地事務(wù)執(zhí)行失敗,返回rollback,作廢half消息
? ? ? ? ? ?return LocalTransactionState.ROLLBACK_MESSAGE;
? ? ? ?}
? ?}
}
到這里事務(wù)消息的代碼我們就完成了,但是我相信小伙伴們不會滿足于僅僅使用原生代碼實現(xiàn),那接下來我們就用Spring Boot重寫編寫一次相同的邏輯。
使用Spring Boot項目后,我們還是先準(zhǔn)備一個消息的實體類TranMessage,代碼如下:
package com.huc.rocketmq.transaction.spring;/**
* 事務(wù)消息實體
*/public class TranMessage { ? ?public static final String TOPIC = "Tran"; ? ?/**
? ? * 編號
? ? */
? ?private Integer id; ? ?public TranMessage setId(Integer id) { ? ? ? ?this.id = id; ? ? ? ?return this;
? ?} ? ?public Integer getId() { ? ? ? ?return id;
? ?} ? ?public String toString() { ? ? ? ?return "TranMessage{" + ? ? ? ? ? ? ? ?"id=" + id + ? ? ? ? ? ? ? ?'}';
? ?}
}
然后我們編寫事務(wù)消息的生產(chǎn)者TranProducer:package com.huc.rocketmq.transaction.spring;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;public class TranProducer { ? ?
? ?private RocketMQTemplate rocketMQTemplate; ? ?public SendResult sendMessageInTransaction(Integer id) { ? ? ? ?// 創(chuàng)建TranMessage消息
? ? ? ?Message<TranMessage> message = MessageBuilder
? ? ? ? ? ? ? ?.withPayload(new TranMessage().setId(id)).build(); ? ? ? ?// 發(fā)送事務(wù)消息
? ? ? ?return rocketMQTemplate.sendMessageInTransaction(TranMessage.TOPIC,
? ? ? ? ? ? ? ?message,id);
? ?}
}
同樣的,我們需要編寫一個回調(diào)監(jiān)聽的實現(xiàn)類,用于自定義處理本地事務(wù),返回commit或者rollback消息。代碼如下:
package com.huc.rocketmq.transaction.spring;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.springframework.messaging.Message;// 注解中可以指定線程池參數(shù)public class TransactionListenerImpl implements RocketMQLocalTransactionListener { ? ?
? ?public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { ? ? ? ?// 執(zhí)行訂單本地業(yè)務(wù),并根據(jù)結(jié)構(gòu)返回commit/rollback
? ? ? ?try { ? ? ? ? ? ?// 本地事務(wù)執(zhí)行成功,返回commit
? ? ? ? ? ?return RocketMQLocalTransactionState.COMMIT;
? ? ? ?}catch (Exception e){ // 本地事務(wù)執(zhí)行失敗,返回rollback,作廢half消息
? ? ? ? ? ?return RocketMQLocalTransactionState.ROLLBACK;
? ? ? ?}
? ?} ? ?
? ?public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { ? ? ? ?// 查詢本地事務(wù)是否已經(jīng)成功執(zhí)行了,再次根據(jù)結(jié)果返回commit/rollback
? ? ? ?try { ? ? ? ? ? ?// 本地事務(wù)執(zhí)行成功,返回commit
? ? ? ? ? ?return RocketMQLocalTransactionState.COMMIT;
? ? ? ?}catch (Exception e){ ? ? ? ? ? ?// 本地事務(wù)執(zhí)行失敗,返回rollback,作廢half消息
? ? ? ? ? ?return RocketMQLocalTransactionState.ROLLBACK;
? ? ? ?}
? ?}
}
有了原生代碼的實現(xiàn)經(jīng)驗,相信小伙伴們對于使用Spring Boot集成后的代碼同樣可以輕松看得懂。好了,至此事務(wù)消息的代碼我們就已經(jīng)實現(xiàn)了。
2,順序消息代碼實現(xiàn)
有關(guān)消息亂序的出現(xiàn)原因以及解決方案我們已經(jīng)在8.4.3小節(jié)中講解過了,小伙伴們可以去復(fù)習(xí)一下,本節(jié)我們將直接討論代碼的實現(xiàn),首先還是使用原生代碼實現(xiàn)。
經(jīng)過之前的學(xué)習(xí)我們知道,解決消息亂序的方案就是把需要保證順序的消息發(fā)送到同一個MessageQueue中,所以我們一定是需要編寫一個MessageQueue的選擇器的,RocketMQ的API中確實是有這部分內(nèi)容的,就是MessageQueueSelector,下面就以原生代碼異步的發(fā)送為例,在發(fā)送消息的時候指定隊列選擇器,主要代碼如下,注釋已經(jīng)說明代碼的含義:
producer.send(
? ? ? ?msg, ? ? ? ?new MessageQueueSelector() {
? ? ? ? ? ?@Override ? ? ? ? ? ?public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
? ? ? ? ? ? ? ?Long orderId = (Long) arg; // 根據(jù)訂單id選擇發(fā)送的queue
? ? ? ? ? ? ? ?long index = orderId % mqs.size();// 用訂單id于MessageQueue的數(shù)量取模
? ? ? ? ? ? ? ?return mqs.get((int) index); // 返回一個運算后固定的MessageQueue
? ? ? ? ? ?}
? ? ? ?},
? ? ? ?orderId, // 傳入訂單id
? ? ? ?new SendCallback() {
? ? ? ? ? ?@Override ? ? ? ? ? ?public void onSuccess(SendResult sendResult) {
? ? ? ? ? ? ? ?System.out.println(sendResult);
? ? ? ? ? ?}
? ? ? ? ? ?@Overridepublic void onException(Throwable throwable) {
? ? ? ? ? ? ? ?System.out.println(throwable);
? ? ? ? ? ?}
? ? ? ? }
);
在發(fā)送消息時增加一個MessageQueueSelector,就可以實現(xiàn)統(tǒng)一訂單id的消息一直會發(fā)送到同一個MessageQueue之中,可以解決消息亂序問題。
接著我們來看消費者部分的代碼實現(xiàn),主要代碼如下:
consumer.registerMessageListener(new MessageListenerOrderly() { ? ? ? ? ? ?
? ? ? ? ? ?public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ConsumeOrderlyContext context) { ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?// 對有序的消息進行順序處理
? ? ? ? ? ? ? ? ? ?for (MessageExt t : msgs) {
? ? ? ? ? ? ? ? ? ?} ? ? ? ? ? ? ? ? ? ?return ConsumeOrderlyStatus.SUCCESS;
? ? ? ? ? ? ? ?} catch (Exception e) { ? ? ? ? ? ? ? ? ? ?// 如果消息處理出錯,返回一個狀態(tài),暫停一會兒再來處理這批消息。
? ? ? ? ? ? ? ? ? ?return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
? ? ? ? ? ? ? ?}
? ? ? ? ? ?}
? ? ? ?});
這里面要注意的是我們注冊的監(jiān)聽器是MessageListenerOrderly,這個監(jiān)聽器為了保證順序消費,Consumer會對每一個ConsumerQueue只使用一個線程來處理消息,如果使用了多線程,是無法避免消息亂序的。
至此原生代碼的實現(xiàn)已經(jīng)完成了,Spring Boot的代碼原理也是一樣的。
消息實體的代碼我們就省略了,直接看生產(chǎn)者的代碼,如下:
package com.huc.rocketmq.order.spring;import com.huc.rocketmq.spring.DemoMessage;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;public class OrderProducer { ? ?
? ?private RocketMQTemplate rocketMQTemplate; ? ?public SendResult syncSend(Integer id) { ? ? ? ?// 創(chuàng)建 DemoMessage 消息
? ? ? ?DemoMessage message = new DemoMessage();
? ? ? ?message.setId(id); ? ? ? ?// 同步發(fā)送消息
? ? ? ?return rocketMQTemplate.syncSendOrderly(DemoMessage.TOPIC,
? ? ? ? ? ? ? ?message,String.valueOf(id));
} ? ?public void asyncSend(Integer id, SendCallback callback) { ? ? ? ?// 創(chuàng)建 DemoMessage 消息
? ? ? ?DemoMessage message = new DemoMessage();
? ? ? ?message.setId(id); ? ? ? ?// 異步發(fā)送消息
? ? ? ?rocketMQTemplate.asyncSendOrderly(DemoMessage.TOPIC,
? ? ? ? ? ? ? ?message,String.valueOf(id),callback);
? ?} ? ?public void onewaySend(Integer id) { ? ? ? ?// 創(chuàng)建 DemoMessage 消息
? ? ? ?DemoMessage message = new DemoMessage();
? ? ? ?message.setId(id); ? ? ? ?// oneway 發(fā)送消息
? ? ? ?rocketMQTemplate.sendOneWayOrderly(DemoMessage.TOPIC,
? ? ? ? ? ? ? ?message,String.valueOf(id));
? ?}
}
以上代碼中可以看出,每個發(fā)送方法中都調(diào)用了對應(yīng)的Orderly方法,并傳入了一個id值,默認(rèn)根據(jù)id值采用SelectMessageQueueByHash策略來選擇MessageQueue。
接下來我們繼續(xù)看消費者代碼的實現(xiàn)。
package com.huc.rocketmq.order.spring;import com.huc.rocketmq.spring.DemoMessage;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component; (
? ? ? ?topic = DemoMessage.TOPIC,
? ? ? ?consumerGroup = "demo-consumer-group-" + DemoMessage.TOPIC,
? ? ? ?consumeMode = ConsumeMode.ORDERLY // 設(shè)置為順序消費)public class OrderConsumer implements RocketMQListener<DemoMessage> { ? ?private Logger logger = LoggerFactory.getLogger(getClass()); ? ?
? ?public void onMessage(DemoMessage message) {
? ? ? ?logger.info("[onMessage][線程編號:{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
? ?}
}
可以看到消費者代碼改動很小,只需要在@RocketMQMessageListener注解中新增consumeMode = ConsumeMode.ORDERLY,就可以指定順序消費了,小伙伴們可以大膽的猜測它的實現(xiàn)原理,和我們的原生代碼實現(xiàn)的方式是相同的。
3,消息過濾代碼實現(xiàn)
RocketMQ是包含消息過濾功能的,現(xiàn)在假如我們不使用消息過濾功能,獲取到一個Topic中的消息可能包含了相關(guān)主題的多個表的信息。
如果我們的需求是根據(jù)獲取的消息同步某張表A的數(shù)據(jù),那么就需要在獲取消息后自行判斷消息是否屬于表A,如果屬于表A才去處理,如果不是表A就直接丟棄。
這種做法多了一層邏輯判斷,自然會對系統(tǒng)的性能產(chǎn)生影響。這個時候RocketMQ的過濾機制就可以展示它的作用了,我們在發(fā)送消息的時候可以直接給消息指定tag和屬性,主要代碼如下:
// 構(gòu)建消息對象
? ? ? ?Message msg = new Message(
? ? ? ? ? ? ? ?topic, //這里指定的是topic
? ? ? ? ? ? ? ?"A",//這里存放的Tag 消費者會根據(jù)tag進行消息過濾
? ? ? ? ? ? ? ?message.getBytes(RemotingHelper.DEFAULT_CHARSET)); ? ? ? ?// 我們還可以設(shè)置一些用戶自定義的屬性
? ? ? ?msg.putUserProperty("name","value");
消費者在消費數(shù)據(jù)時就可以根據(jù)tag和屬性進行過濾了,比如下邊的寫法:
// 訂閱test Topic , 第二個參數(shù)是通過tag過濾,意思是過濾出tag為A或B的消息
? ? ? ?consumer.subscribe("test", "A||B");
對應(yīng)到spring boot中的實現(xiàn)也很簡單,生產(chǎn)者部分關(guān)鍵代碼如下:
// 創(chuàng)建 DemoMessage 消息
? ? ? ?Message<DemoMessage> message = MessageBuilder
? ? ? ? ? ? ? ?.withPayload(new DemoMessage().setId(id))
? ? ? ? ? ? ? ?.setHeader(MessageConst.PROPERTY_TAGS,"A")// 設(shè)置消息的tag
? ? ? ? ? ? ? ?.build();
消費者過濾的主要代碼如下:
"demo-consumer-group-" + DemoMessage.TOPIC,
? ? ? ?selectorExpression = "A||B" // 通過tag過濾)
(
? ? ? ?topic = DemoMessage.TOPIC,
? ? ? ?consumerGroup =
消費者部分只要在@RocketMQMessageListener注解中增加selectorExpression屬性就可以了。
4,延時消息代碼實現(xiàn)
在討論延時消息的代碼實現(xiàn)之前,先討論一下電商系統(tǒng)的超時未支付業(yè)務(wù)流程。如圖1所示:

圖1放棄支付流程
這個流程的關(guān)鍵問題就是超時未支付的訂單處于“待支付”狀態(tài),并鎖定了庫存,當(dāng)時我們提出的解決方案就是提供一個后臺線程,來掃描待支付訂單,如果超過30分鐘還未支付,就把訂單關(guān)閉,解鎖庫存。
小伙伴們可以思考一下,這樣的解決方案真的可以在生產(chǎn)環(huán)境落地嗎?
首先,后臺線程不停的掃描訂單數(shù)據(jù),如果訂單數(shù)據(jù)量很大,就會導(dǎo)致嚴(yán)重的系統(tǒng)性能問題。
其次,如果我們的訂單系統(tǒng)是一個分布式系統(tǒng),你的后臺線程要如何部署?多久掃描一次?
所以,使用后臺線程掃描訂單數(shù)據(jù)并不是一個優(yōu)雅的解決方案,這個時候本小節(jié)的主人公延時消息就該出場了。
RocketMQ的延時消息可以做到這樣的效果,訂單系統(tǒng)發(fā)送一條消息,等30分鐘后,這條消息才可以被消費者消費。所以我們引入延時消息后,就可以單獨準(zhǔn)備一個訂單掃描服務(wù),來消費延時消息,當(dāng)它獲得消息的時候再去驗證訂單是否已經(jīng)支付,如果已經(jīng)支付什么都不用做,如果還未支付就去進行關(guān)閉訂單,解鎖庫存的操作。如圖2所示:

圖2延時消息放棄支付流程
使用延時消息后,就可以避免掃描大量訂單數(shù)據(jù)的操作了,而且訂單掃描服務(wù)也可以分布式部署多個,只要同時訂閱一個Topic就可以了。應(yīng)用場景我們已經(jīng)了解了,現(xiàn)在我們來看一下代碼應(yīng)該如何實現(xiàn)。延時消息使用原生代碼實現(xiàn)特別容易,主要代碼如下:
// 構(gòu)建消息對象
? ? ? ?Message msg = new Message(
? ? ? ? ? ? ? ?topic, //這里指定延時消息的topic
? ? ? ? ? ? ? ?message.getBytes(RemotingHelper.DEFAULT_CHARSET)); ? ? ? ?// 指定延時級別為3
? ? ? ?msg.setDelayTimeLevel(3);
? ? ? ?producer.send(msg);
可以看到最核心的內(nèi)容就是msg.setDelayTimeLevel(3),設(shè)置了延遲級別。
RocketMQ支持的延遲級別有18個,這個我們之前已經(jīng)介紹過了,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
所以設(shè)置為3代表10s后消息可以被消費者消費。
消費者的代碼這里就不演示了,沒有什么特殊的寫法。
下面我們來看一下Spring Boot的生產(chǎn)者代碼實現(xiàn):
// 創(chuàng)建 DemoMessage 消息
? ? ? ?Message<DemoMessage> message = MessageBuilder
? ? ? ? ? ? ? ?.withPayload(new DemoMessage().setId(id))
? ? ? ? ? ? ? ?.build(); ? ? ? ?// 同步發(fā)送消息
? ? ? ?return rocketMQTemplate.syncSend(DemoMessage.TOPIC,
? ? ? ? ? ? ? ?message, ? ? ? ? ? ? ? ?30*1000, ? ? ? ? ? ? ? ?3);// 此處設(shè)置的就是延時級別