RocketMQ 事務(wù)消息
一、RocketMQ事務(wù)消息概要
RocketMQ事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時(shí)成功,要么同時(shí)失敗。RocketMQ的事務(wù)消息提供類似 X/Open XA 的分布式事務(wù)功能,通過事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,采用了2PC(兩階段提交)+ 補(bǔ)償機(jī)制(事務(wù)狀態(tài)回查)的思想來實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來處理二階段超時(shí)或者失敗的消息,如下圖所示。

我們可以看到,事務(wù)消息主要分為兩個(gè)流程:
(1)、正常事務(wù)消息的發(fā)送及提交
a、生產(chǎn)者發(fā)送half消息到Broker服務(wù)端(半消息);
半消息是一種特殊的消息類型,該狀態(tài)的消息暫時(shí)不能被Consumer消費(fèi)。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發(fā)出的二次確認(rèn)時(shí),該事務(wù)消息就處于"暫時(shí)不可被消費(fèi)"狀態(tài),該狀態(tài)的事務(wù)消息被稱為半消息。
b、Broker服務(wù)端將消息持久化之后,給生產(chǎn)者響應(yīng)消息寫入結(jié)果(ACK響應(yīng));
c、生產(chǎn)者根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)邏輯(如果寫入失敗,此時(shí)half消息對(duì)業(yè)務(wù)不可見,本地邏輯不執(zhí)行);
d、生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向Broker服務(wù)端提交二次確認(rèn)(Commit 或是 Rollback),Broker服務(wù)端收到 Commit 狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;Broker服務(wù)端收到 Rollback 狀態(tài)則刪除半事務(wù)消息,訂閱方將不會(huì)接收該消息;
(2)、事務(wù)消息的補(bǔ)償流程
a、在網(wǎng)絡(luò)閃斷或者是應(yīng)用重啟的情況下,可能導(dǎo)致生產(chǎn)者發(fā)送的二次確認(rèn)消息未能到達(dá)Broker服務(wù)端,經(jīng)過固定時(shí)間后,Broker服務(wù)端將會(huì)對(duì)沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)進(jìn)行“回查”;
b、生產(chǎn)者收到回查消息后,檢查回查消息對(duì)應(yīng)的本地事務(wù)執(zhí)行的最終結(jié)果;
c、生產(chǎn)者根據(jù)本地事務(wù)狀態(tài),再次提交二次確認(rèn)給Broker,然后Broker重新對(duì)半事務(wù)消息Commit或者Rollback;
其中,補(bǔ)償階段用于解決消息Commit或者Rollback發(fā)生超時(shí)或者失敗的情況。
事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):
TransactionStatus.CommitTransaction:提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。
TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。
TransactionStatus.Unknown:中間狀態(tài),它代表需要回查本地事務(wù)狀態(tài)來決定是提交還是回滾事務(wù)。
二、延時(shí)消息使用場(chǎng)景
比如銀行轉(zhuǎn)賬,A銀行的某賬戶要轉(zhuǎn)一萬元到B銀行的某賬戶。A銀行發(fā)送“B銀行賬戶增加一萬元”這個(gè)消息,要和“從A銀行賬戶扣除一萬元”這個(gè)操作同時(shí)成功或者同時(shí)失敗。上述場(chǎng)景在各個(gè)類型的系統(tǒng)中都能找到相似影子,比如在電商系統(tǒng)中,當(dāng)有用戶下單后,除了在訂單表插入一條記錄外,對(duì)應(yīng)商品表的這個(gè)商品數(shù)量必須減1吧,怎么保證?在搜索廣告系統(tǒng)中,當(dāng)用戶點(diǎn)擊某廣告后,除了在點(diǎn)擊事件表中增加一條記錄外,還得去商家賬戶表中找到這個(gè)商家并扣除廣告費(fèi)等等
RocketMQ采用兩階段提交的方式實(shí)現(xiàn)事務(wù)消息,TransactionMQProducer處理上面情況的流程是,先發(fā)一個(gè)“準(zhǔn)備從B銀行賬戶增加一萬元”的消息,發(fā)送成功后做從A銀行賬戶扣除一萬元的操作,根據(jù)操作結(jié)果是否成功,確定之前的“準(zhǔn)備從B銀行賬戶增加一萬元”的消息是做commit還是rollback,具體流程如下:
1)發(fā)送方向RocketMQ發(fā)送“待確認(rèn)”消息。
2)RocketMQ將收到的“待確認(rèn)”消息持久化成功后,向發(fā)送方回復(fù)消息已經(jīng)發(fā)送成功,此時(shí)第一階段消息發(fā)送完成。
3)發(fā)送方開始執(zhí)行本地事件邏輯。
4)發(fā)送方根據(jù)本地事件執(zhí)行結(jié)果向RocketMQ發(fā)送二次確認(rèn)(Commit或是Rollback)消息,RocketMQ收到Commit狀態(tài)則將第一階段消息標(biāo)記為可投遞,訂閱方將能夠收到該消息;收到Rollback狀態(tài)則刪除第一階段的消息,訂閱方接收不到該消息。
5)如果出現(xiàn)異常情況,步驟4)提交的二次確認(rèn)最終未到達(dá)RocketMQ,服務(wù)器在經(jīng)過固定時(shí)間段后將對(duì)“待確認(rèn)”消息發(fā)起回查請(qǐng)求。
6)發(fā)送方收到消息回查請(qǐng)求后(如果發(fā)送一階段消息的Producer不能工作,回查請(qǐng)求將被發(fā)送到和Producer在同一個(gè)Group里的其他Producer),通過檢查對(duì)應(yīng)消息的本地事件執(zhí)行結(jié)果返回Commit或Roolback狀態(tài)。
7)RocketMQ收到回查請(qǐng)求后,按照步驟4)的邏輯處理。

三、RocketMQ事務(wù)消息使用案例
(1)、定義消息監(jiān)聽器
消息監(jiān)聽器主要是實(shí)現(xiàn)TransactionListener接口,然后需要重寫下面兩個(gè)方法:
executeLocalTransaction:執(zhí)行本地事務(wù);
checkLocalTransaction:回查本地事務(wù)狀態(tài),根據(jù)這次回查的結(jié)果來決定此次事務(wù)是提交還是回滾;
/** * 事務(wù)監(jiān)聽器,重寫執(zhí)行本地事務(wù)方法以及事務(wù)回查方法 */public class TransactionListenerImpl implements TransactionListener { ? ?@Override ? ?public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { ? ? ? ?String msgKey = msg.getKeys(); ? ? ? ?switch (msgKey) { ? ? ? ? ? ?case "Num0": ? ? ? ? ? ?case "Num1": ? ? ? ? ? ? ? ?// 明確回復(fù)回滾操作,消息將會(huì)被刪除,不允許被消費(fèi)。 ? ? ? ? ? ? ? ?return LocalTransactionState.ROLLBACK_MESSAGE; ? ? ? ? ? ?case "Num8": ? ? ? ? ? ?case "Num9": ? ? ? ? ? ? ? ?// 消息無響應(yīng),代表需要回查本地事務(wù)狀態(tài)來決定是提交還是回滾事務(wù) ? ? ? ? ? ? ? ?return LocalTransactionState.UNKNOW; ? ? ? ? ? ?default: ? ? ? ? ? ? ? ?// 消息通過,允許消費(fèi)者消費(fèi)消息 ? ? ? ? ? ? ? ?return LocalTransactionState.COMMIT_MESSAGE; ? ? ? ?} ? ?} ? ?@Override ? ?public LocalTransactionState checkLocalTransaction(MessageExt msg) { ? ? ? ?System.out.println("回查本地事務(wù)狀態(tài),消息Key: " + msg.getKeys() + ",消息內(nèi)容: " + new String(msg.getBody())); ? ? ? ? ? ? ? ?// 需要根據(jù)業(yè)務(wù),查詢本地事務(wù)是否執(zhí)行成功,這里直接返回COMMIT ? ? ? ?return LocalTransactionState.COMMIT_MESSAGE; ? ?} }
(2)、定義消息生產(chǎn)者
事務(wù)消息的生產(chǎn)者跟我們之前的普通生產(chǎn)者的不同:
a、需創(chuàng)建事務(wù)類型的生產(chǎn)者TransactionMQProducer;
b、需調(diào)用setTransactionListener()方法設(shè)置事務(wù)監(jiān)聽器;
c、使用sendMessageInTransaction()以事務(wù)方式發(fā)送消息;
public class TransactionProducer { ? ?public static void main(String[] args) throws MQClientException, InterruptedException { ? ? ? ?// 創(chuàng)建事務(wù)類型的生產(chǎn)者 ? ? ? ?TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group"); ? ? ? ?// 設(shè)置NameServer的地址 ? ? ? ?producer.setNamesrvAddr("10.0.90.211:9876"); ? ? ? ?// 設(shè)置事務(wù)監(jiān)聽器 ? ? ? ?producer.setTransactionListener(new TransactionListenerImpl()); ? ? ? ?// 啟動(dòng)生產(chǎn)者 ? ? ? ?producer.start(); ? ? ? ?// 發(fā)送10條消息 ? ? ? ?for (int i = 0; i < 10; i++) { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?Message msg = new Message("TransactionTopic", "", ("Hello RocketMQ Transaction Message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); ? ? ? ? ? ? ? ?// 設(shè)置消息Key ? ? ? ? ? ? ? ?msg.setKeys("Num" + i); ? ? ? ? ? ? ? ?// 使用事務(wù)方式發(fā)送消息 ? ? ? ? ? ? ? ?SendResult sendResult = producer.sendMessageInTransaction(msg, null); ? ? ? ? ? ? ? ?System.out.println("sendResult = " + sendResult); ? ? ? ? ? ? ? ?Thread.sleep(10); ? ? ? ? ? ?} catch (MQClientException | UnsupportedEncodingException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? ?} ? ? ? ?} ? ? ? ?// 阻塞,目的是為了在消息發(fā)送完成后才關(guān)閉生產(chǎn)者 ? ? ? ?Thread.sleep(10000); ? ? ? ?producer.shutdown(); ? ?} }
(3)、定義消息消費(fèi)者
public class MQConsumer { ? ?public static void main(String[] args) throws MQClientException { ? ? ? ?// 創(chuàng)建DefaultMQPushConsumer類并設(shè)定消費(fèi)者名稱 ? ? ? ?DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test"); ? ? ? ?// 設(shè)置NameServer地址,如果是集群的話,使用分號(hào);分隔開 ? ? ? ?mqPushConsumer.setNamesrvAddr("10.0.90.211:9876"); ? ? ? ?// 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi) ? ? ? ?// 如果不是第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) ? ? ? ?mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); ? ? ? ?// 訂閱一個(gè)或者多個(gè)Topic,以及Tag來過濾需要消費(fèi)的消息,如果訂閱該主題下的所有tag,則使用* ? ? ? ?mqPushConsumer.subscribe("TransactionTopic", "*"); ? ? ? ?// 注冊(cè)回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息 ? ? ? ?mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { ? ? ? ? ? ?// 監(jiān)聽類實(shí)現(xiàn)MessageListenerConcurrently接口即可,重寫consumeMessage方法接收數(shù)據(jù) ? ? ? ? ? ?@Override ? ? ? ? ? ?public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { ? ? ? ? ? ? ? ?MessageExt messageExt = msgList.get(0); ? ? ? ? ? ? ? ?String body = new String(messageExt.getBody(), StandardCharsets.UTF_8); ? ? ? ? ? ? ? ?System.out.println("消費(fèi)者接收到消息: " + messageExt.toString() + "---消息內(nèi)容為:" + body); ? ? ? ? ? ? ? ?// 標(biāo)記該消息已經(jīng)被成功消費(fèi) ? ? ? ? ? ? ? ?return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; ? ? ? ? ? ?} ? ? ? ?}); ? ? ? ?// 啟動(dòng)消費(fèi)者實(shí)例 ? ? ? ?mqPushConsumer.start(); ? ?} }
(4)、觀察生產(chǎn)者控制臺(tái)輸出
通過控制臺(tái)可以看到,生產(chǎn)者成功發(fā)送10條消息,并且我們?cè)谑聞?wù)監(jiān)聽器中針對(duì)message key為Num8、Num9這兩條消息返回UNKNOW狀態(tài),這樣RocketMQ就會(huì)執(zhí)行本地事務(wù)回查去確認(rèn)本地事務(wù)執(zhí)行狀態(tài)【即執(zhí)行checkLocalTransaction()方法】。
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E0E0000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=9] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E300001, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=10] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E400002, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=0], queueOffset=11] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E650003, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=1], queueOffset=12] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E780004, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=13] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E880005, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=14] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40E990006, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=0], queueOffset=15] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EB20007, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=1], queueOffset=16] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EC30008, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=2], queueOffset=17] sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E00564F4018B4AAC231C40EE30009, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-a, queueId=3], queueOffset=18] 回查本地事務(wù)狀態(tài),消息Key: Num8,消息內(nèi)容: Hello RocketMQ Transaction Message8 回查本地事務(wù)狀態(tài),消息Key: Num9,消息內(nèi)容: Hello RocketMQ Transaction Message9
(5)、觀察消費(fèi)者控制臺(tái)輸出
可以看到,消費(fèi)者成功接收到8條消息,因?yàn)橛?條消息,我們?cè)趫?zhí)行本地事務(wù)的時(shí)候,明確告訴RocketMQ進(jìn)行回滾了,所以這2條消息不能被消費(fèi)者進(jìn)行消費(fèi)。
消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=313, queueOffset=1, sysFlag=8, bornTimestamp=1646898932288, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931728, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004398, commitLogOffset=17304, bodyCRC=1033347556, reconsumeTimes=0, preparedTransactionOffset=16983, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=2, KEYS=Num2, TRAN_MSG=true, CONSUME_START_TIME=1646898932329, UNIQ_KEY=AC6E00564F4018B4AAC231C40E400002, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 50], transactionId='AC6E00564F4018B4AAC231C40E400002'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message2 消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=313, queueOffset=1, sysFlag=8, bornTimestamp=1646898932325, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931741, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000469A, commitLogOffset=18074, bodyCRC=1250988402, reconsumeTimes=0, preparedTransactionOffset=17753, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=2, KEYS=Num3, TRAN_MSG=true, CONSUME_START_TIME=1646898932341, UNIQ_KEY=AC6E00564F4018B4AAC231C40E650003, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 51], transactionId='AC6E00564F4018B4AAC231C40E650003'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message3 消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=2, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932344, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931758, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000499C, commitLogOffset=18844, bodyCRC=1425278161, reconsumeTimes=0, preparedTransactionOffset=18523, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num4, TRAN_MSG=true, CONSUME_START_TIME=1646898932359, UNIQ_KEY=AC6E00564F4018B4AAC231C40E780004, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 52], transactionId='AC6E00564F4018B4AAC231C40E780004'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message4 消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=3, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932360, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931774, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004C9E, commitLogOffset=19614, bodyCRC=603141191, reconsumeTimes=0, preparedTransactionOffset=19293, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num5, TRAN_MSG=true, CONSUME_START_TIME=1646898932375, UNIQ_KEY=AC6E00564F4018B4AAC231C40E880005, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=3}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 53], transactionId='AC6E00564F4018B4AAC231C40E880005'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message5 消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932377, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931801, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000004FA0, commitLogOffset=20384, bodyCRC=989488637, reconsumeTimes=0, preparedTransactionOffset=20063, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num6, TRAN_MSG=true, CONSUME_START_TIME=1646898932402, UNIQ_KEY=AC6E00564F4018B4AAC231C40E990006, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 54], transactionId='AC6E00564F4018B4AAC231C40E990006'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message6 消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=313, queueOffset=2, sysFlag=8, bornTimestamp=1646898932402, bornHost=/10.0.90.139:57933, storeTimestamp=1646898931816, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F00000000000052A2, commitLogOffset=21154, bodyCRC=1308448107, reconsumeTimes=0, preparedTransactionOffset=20833, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, MAX_OFFSET=3, KEYS=Num7, TRAN_MSG=true, CONSUME_START_TIME=1646898932441, UNIQ_KEY=AC6E00564F4018B4AAC231C40EB20007, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 55], transactionId='AC6E00564F4018B4AAC231C40EB20007'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message7 消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=1, storeSize=339, queueOffset=3, sysFlag=8, bornTimestamp=1646898900749, bornHost=/10.0.90.139:57878, storeTimestamp=1646898935220, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F000000000000599B, commitLogOffset=22939, bodyCRC=709195884, reconsumeTimes=0, preparedTransactionOffset=22592, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=4, KEYS=Num9, TRAN_MSG=true, CONSUME_START_TIME=1646898935835, UNIQ_KEY=AC6E00563BCC18B4AAC231C3930D0009, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 57], transactionId='AC6E00563BCC18B4AAC231C3930D0009'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message9 消費(fèi)者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=339, queueOffset=3, sysFlag=8, bornTimestamp=1646898900727, bornHost=/10.0.90.139:57878, storeTimestamp=1646898935223, storeHost=/10.0.90.211:10911, msgId=0A005AD300002A9F0000000000005B76, commitLogOffset=23414, bodyCRC=1564625146, reconsumeTimes=0, preparedTransactionOffset=22245, toString()=Message{topic='TransactionTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=TransactionTopic, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=4, KEYS=Num8, TRAN_MSG=true, CONSUME_START_TIME=1646898935839, UNIQ_KEY=AC6E00563BCC18B4AAC231C392F70008, CLUSTER=DefaultCluster, PGROUP=transaction-producer-group, WAIT=true, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 32, 77, 101, 115, 115, 97, 103, 101, 56], transactionId='AC6E00563BCC18B4AAC231C392F70008'}]---消息內(nèi)容為:Hello RocketMQ Transaction Message8
四、RocketMQ事務(wù)消息原理
設(shè)計(jì)思想
在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對(duì)用戶不可見。其中,事務(wù)消息相對(duì)普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對(duì)用戶是不可見的。那么,如何做到寫入消息但是對(duì)用戶不可見呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費(fèi)隊(duì)列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費(fèi)組未訂閱該主題,故消費(fèi)端無法消費(fèi)half類型的消息。
如何實(shí)現(xiàn)事務(wù)回查?
Broker會(huì)啟動(dòng)一個(gè)消息回查的定時(shí)任務(wù),定時(shí)從事務(wù)消息queue中讀取所有待反查的消息。針對(duì)每個(gè)需要反查的半消息,Broker會(huì)給對(duì)應(yīng)的Producer發(fā)一個(gè)要求執(zhí)行事務(wù)狀態(tài)反查的RPC請(qǐng)求。然后根據(jù)RPC返回響應(yīng)中的反查結(jié)果,來決定這個(gè)半消息是需要提交還是回滾,或者后續(xù)繼續(xù)來反查。最后,提交或者回滾事務(wù),將半消息標(biāo)記為已處理狀態(tài)【將消息存儲(chǔ)在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經(jīng)被處理(提交或回滾)】。 如果是提交事務(wù),就把半消息從半消息隊(duì)列中復(fù)制到該消息真正的topic和queue中; 如果是回滾事務(wù),則什么都不做。
值得注意的是,rocketmq并不會(huì)無休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),rocketmq默認(rèn)回滾該消息。
五、RocketMQ事務(wù)消息使用限制
使用事務(wù)消息,有一些限制條件:
1 事務(wù)消息不支持延時(shí)消息和批量消息;
2 事務(wù)性消息可能不止一次被檢查或消費(fèi),所以消費(fèi)者端需要做好消費(fèi)冪等;
3 為了避免單個(gè)消息被檢查太多次而導(dǎo)致半隊(duì)列消息累積,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為 15 次(即默認(rèn)只會(huì)回查15次),我們可以通過 Broker 配置文件的 transactionCheckMax參數(shù)來修改此限制。如果已經(jīng)檢查某條消息超過 N 次的話( N = transactionCheckMax ), 則 Broker 將丟棄此消息,并在默認(rèn)情況下同時(shí)打印錯(cuò)誤日志。用戶可以通過重寫 AbstractTransactionCheckListener 類來修改這個(gè)行為;
4 事務(wù)消息將在 Broker 配置文件中的參數(shù) transactionMsgTimeout 這樣的特定時(shí)間長(zhǎng)度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí),用戶還可以通過設(shè)置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個(gè)限制,該參數(shù)優(yōu)先于 transactionMsgTimeout 參數(shù);
5 提交給用戶的目標(biāo)主題消息可能會(huì)失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機(jī)制來保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫入機(jī)制。
6 事務(wù)消息的生產(chǎn)者 ID 不能與其他類型消息的生產(chǎn)者 ID 共享。與其他類型的消息不同,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過它們的生產(chǎn)者 ID 查詢到消費(fèi)者。
不積跬步,無以至千里;不積小流,無以成江海
鏈接:https://www.dianjilingqu.com/480802.html