動力節(jié)點(diǎn)RocketMQ全套視頻教程-5小時學(xué)會rocketmq消息隊(duì)列

rocketmq學(xué)習(xí)筆記分享
P5-P8
15.?RocketMQ發(fā)送事務(wù)消息
15.1?事務(wù)消息的發(fā)送流程
它可以被認(rèn)為是一個兩階段的提交消息實(shí)現(xiàn),以確保分布式系統(tǒng)的最終一致性。事務(wù)性消息確保本地事務(wù)的執(zhí)行和消息的發(fā)送可以原子地執(zhí)行。


上圖說明了事務(wù)消息的大致方案,其中分為兩個流程:正常事務(wù)消息的發(fā)送及提交、事務(wù)消息的補(bǔ)償流程。
事務(wù)消息發(fā)送及提交
1.?發(fā)送消息(half消息)。
2.?服務(wù)端響應(yīng)消息寫入結(jié)果。
3.?根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗,此時half消息對業(yè)務(wù)不可見,本地邏輯不執(zhí)行)。
4.?根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或Rollback(Commit操作生成消息索引,消息對消費(fèi)者可見)
事務(wù)補(bǔ)償
1.?對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”
2.?Producer收到回查消息,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài)
3.?根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback
其中,補(bǔ)償階段用于解決消息UNKNOW或者Rollback發(fā)生超時或者失敗的情況。
事務(wù)消息狀態(tài)
事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):
l?TransactionStatus.CommitTransaction: 提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。
l?TransactionStatus.RollbackTransaction: 回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。
l?TransactionStatus.Unknown: 中間狀態(tài),它代表需要檢查消息隊(duì)列來確定狀態(tài)。
15.2?事務(wù)消息生產(chǎn)者
/**
?* TransactionalMessageCheckService的檢測頻率默認(rèn)1分鐘,可通過在broker.conf文件中設(shè)置transactionCheckInterval的值來改變默認(rèn)值,單位為毫秒。
?* 從broker配置文件中獲取transactionTimeOut參數(shù)值。
?* 從broker配置文件中獲取transactionCheckMax參數(shù)值,表示事務(wù)的最大檢測次數(shù),如果超過檢測次數(shù),消息會默認(rèn)為丟棄,即回滾消息。
?*
?* @throws Exception
?*/
@Test
public void testTransactionProducer() throws Exception {
????// 創(chuàng)建一個事務(wù)消息生產(chǎn)者
????TransactionMQProducer producer = new TransactionMQProducer("test-group");
????producer.setNamesrvAddr("localhost:9876");
????// 設(shè)置事務(wù)消息監(jiān)聽器
????producer.setTransactionListener(new TransactionListener() {
????????// 這個是執(zhí)行本地業(yè)務(wù)方法
????????@Override
????????public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
????????????System.out.println(new Date());
????????????System.out.println(new String(msg.getBody()));
????????????// 這個可以使用try catch對業(yè)務(wù)代碼進(jìn)行性包裹
????????????// COMMIT_MESSAGE 表示允許消費(fèi)者消費(fèi)該消息
????????????// ROLLBACK_MESSAGE 表示該消息將被刪除,不允許消費(fèi)
????????????// UNKNOW表示需要MQ回查才能確定狀態(tài) 那么過一會 代碼會走下面的checkLocalTransaction(msg)方法
????????????return LocalTransactionState.UNKNOW;
????????}
????????// 這里是回查方法 回查不是再次執(zhí)行業(yè)務(wù)操作,而是確認(rèn)上面的操作是否有結(jié)果
????????// 默認(rèn)是1min回查 默認(rèn)回查15次 超過次數(shù)則丟棄打印日志 可以通過參數(shù)設(shè)置
????????// transactionTimeOut 超時時間
????????// transactionCheckMax 最大回查次數(shù)
????????// transactionCheckInterval 回查間隔時間單位毫秒
????????// 觸發(fā)條件
????????// 1.當(dāng)上面執(zhí)行本地事務(wù)返回結(jié)果UNKNOW時,或者下面的回查方法也返回UNKNOW時 會觸發(fā)回查
????????// 2.當(dāng)上面操作超過20s沒有做出一個結(jié)果,也就是超時或者卡主了,也會進(jìn)行回查
????????@Override
????????public LocalTransactionState checkLocalTransaction(MessageExt msg) {
????????????System.err.println(new Date());
????????????System.err.println(new String(msg.getBody()));
????????????// 這里
????????????return LocalTransactionState.UNKNOW;
????????}
????});
????producer.start();
????Message message = new Message("TopicTest2", "我是一個事務(wù)消息".getBytes());
????// 發(fā)送消息
????producer.sendMessageInTransaction(message, null);
????System.out.println(new Date());
????System.in.read();
}
15.3?事務(wù)消息消費(fèi)者
@Test
public void testTransactionConsumer() throws Exception {
????// 創(chuàng)建默認(rèn)消費(fèi)者組
????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
????// 設(shè)置nameServer地址
????consumer.setNamesrvAddr("localhost:9876");
????// 訂閱一個主題來消費(fèi)???*表示沒有過濾參數(shù) 表示這個主題的任何消息
????consumer.subscribe("TopicTest2", "*");
????// 注冊一個消費(fèi)監(jiān)聽?MessageListenerConcurrently是并發(fā)消費(fèi)
????// 默認(rèn)是20個線程一起消費(fèi),可以參看?consumer.setConsumeThreadMax()
????consumer.registerMessageListener(new MessageListenerConcurrently() {
????????@Override
????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {
????????????// 這里執(zhí)行消費(fèi)的代碼 默認(rèn)是多線程消費(fèi)
????????????System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????});
????consumer.start();
????System.in.read();
}
15.4?測試結(jié)果

16.?RocketMQ發(fā)送帶標(biāo)簽的消息,消息過濾
Rocketmq提供消息過濾功能,通過tag或者key進(jìn)行區(qū)分
我們往一個主題里面發(fā)送消息的時候,根據(jù)業(yè)務(wù)邏輯,可能需要區(qū)分,比如帶有tagA標(biāo)簽的被A消費(fèi),帶有tagB標(biāo)簽的被B消費(fèi),還有在事務(wù)監(jiān)聽的類里面,只要是事務(wù)消息都要走同一個監(jiān)聽,我們也需要通過過濾才區(qū)別對待
16.1?標(biāo)簽消息生產(chǎn)者
@Test
public void testTagProducer() throws Exception {
????// 創(chuàng)建默認(rèn)的生產(chǎn)者
????DefaultMQProducer producer = new DefaultMQProducer("test-group");
????// 設(shè)置nameServer地址
????producer.setNamesrvAddr("localhost:9876");
????// 啟動實(shí)例
????producer.start();
????Message msg = new Message("TopicTest","tagA", "我是一個帶標(biāo)記的消息".getBytes());
????SendResult send = producer.send(msg);
????System.out.println(send);
????// 關(guān)閉實(shí)例
????producer.shutdown();
}
?
16.2?標(biāo)簽消息消費(fèi)者
@Test
public void testTagConsumer() throws Exception {
????// 創(chuàng)建默認(rèn)消費(fèi)者組
????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
????// 設(shè)置nameServer地址
????consumer.setNamesrvAddr("localhost:9876");
????// 訂閱一個主題來消費(fèi) ??表達(dá)式,默認(rèn)是*,支持"tagA || tagB || tagC" 這樣或者的寫法 只要是符合任何一個標(biāo)簽都可以消費(fèi)
????consumer.subscribe("TopicTest", "tagA || tagB || tagC");
????// 注冊一個消費(fèi)監(jiān)聽?MessageListenerConcurrently是并發(fā)消費(fèi)
????// 默認(rèn)是20個線程一起消費(fèi),可以參看?consumer.setConsumeThreadMax()
????consumer.registerMessageListener(new MessageListenerConcurrently() {
????????@Override
????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {
????????????// 這里執(zhí)行消費(fèi)的代碼 默認(rèn)是多線程消費(fèi)
????????????System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
????????????System.out.println(msgs.get(0).getTags());
????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????});
????consumer.start();
????System.in.read();
}
?
17.?RocketMQ中消息的Key(業(yè)務(wù)相關(guān))
在rocketmq中的消息,默認(rèn)會有一個messageId當(dāng)做消息的唯一標(biāo)識,我們也可以給消息攜帶一個key,用作唯一標(biāo)識或者業(yè)務(wù)標(biāo)識,包括在控制面板查詢的時候也可以使用messageId或者key來進(jìn)行查詢

17.1?帶key消息生產(chǎn)者
@Test
public void testKeyProducer() throws Exception {
????// 創(chuàng)建默認(rèn)的生產(chǎn)者
????DefaultMQProducer producer = new DefaultMQProducer("test-group");
????// 設(shè)置nameServer地址
????producer.setNamesrvAddr("localhost:9876");
????// 啟動實(shí)例
????producer.start();
????Message msg = new Message("TopicTest","tagA","key", "我是一個帶標(biāo)記和key的消息".getBytes());
????SendResult send = producer.send(msg);
????System.out.println(send);
????// 關(guān)閉實(shí)例
????producer.shutdown();
}
17.2?帶key消息消費(fèi)者
@Test
public void testKeyConsumer() throws Exception {
????// 創(chuàng)建默認(rèn)消費(fèi)者組
????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
????// 設(shè)置nameServer地址
????consumer.setNamesrvAddr("localhost:9876");
????// 訂閱一個主題來消費(fèi) ??表達(dá)式,默認(rèn)是*,支持"tagA || tagB || tagC" 這樣或者的寫法 只要是符合任何一個標(biāo)簽都可以消費(fèi)
????consumer.subscribe("TopicTest", "tagA || tagB || tagC");
????// 注冊一個消費(fèi)監(jiān)聽?MessageListenerConcurrently是并發(fā)消費(fèi)
????// 默認(rèn)是20個線程一起消費(fèi),可以參看?consumer.setConsumeThreadMax()
????consumer.registerMessageListener(new MessageListenerConcurrently() {
????????@Override
????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {
????????????// 這里執(zhí)行消費(fèi)的代碼 默認(rèn)是多線程消費(fèi)
????????????System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
????????????System.out.println(msgs.get(0).getTags());
????????????System.out.println(msgs.get(0).getKeys());
????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????});
????consumer.start();
????System.in.read();
}

18.?RocketMQ重試機(jī)制
18.1?生產(chǎn)者重試
// 失敗的情況重發(fā)3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S內(nèi)沒有發(fā)送成功,就會重試
producer.send(msg, 1000);
18.2?消費(fèi)者重試
在消費(fèi)者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就會執(zhí)行重試
上圖代碼中說明了,我們再實(shí)際生產(chǎn)過程中,一般重試5-7次,如果還沒有消費(fèi)成功,則可以把消息簽收了,通知人工等處理
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
/**
?* 測試消費(fèi)者
?*
?* @throws Exception
?*/
@Test
public void testConsumer() throws Exception {
????// 創(chuàng)建默認(rèn)消費(fèi)者組
????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
????// 設(shè)置nameServer地址
????consumer.setNamesrvAddr("localhost:9876");
????// 訂閱一個主題來消費(fèi)???*表示沒有過濾參數(shù) 表示這個主題的任何消息
????consumer.subscribe("TopicTest", "*");
????// 注冊一個消費(fèi)監(jiān)聽
????consumer.registerMessageListener(new MessageListenerConcurrently() {
????????@Override
????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {
????????????try {
????????????????// 這里執(zhí)行消費(fèi)的代碼
????????????????System.out.println(Thread.currentThread().getName() + "----" + msgs);
????????????????// 這里制造一個錯誤
????????????????int i = 10 / 0;
????????????} catch (Exception e) {
????????????????// 出現(xiàn)問題 判斷重試的次數(shù)
????????????????MessageExt messageExt = msgs.get(0);
????????????????// 獲取重試的次數(shù)?失敗一次消息中的失敗次數(shù)會累加一次
????????????????int reconsumeTimes = messageExt.getReconsumeTimes();
????????????????if (reconsumeTimes >= 3) {
????????????????????// 則把消息確認(rèn)了,可以將這條消息記錄到日志或者數(shù)據(jù)庫 通知人工處理
????????????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????????????} else {
????????????????????return ConsumeConcurrentlyStatus.RECONSUME_LATER;
????????????????}
????????????}
????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????});
????consumer.start();
????System.in.read();
}
19.?RocketMQ死信消息
當(dāng)消費(fèi)重試到達(dá)閾值以后,消息不會被投遞給消費(fèi)者了,而是進(jìn)入了死信隊(duì)列
19.1?消息生產(chǎn)者
@Test
public void testDeadMsgProducer() throws Exception {
????DefaultMQProducer producer = new DefaultMQProducer("dead-group");
????producer.setNamesrvAddr("localhost:9876");
????producer.start();
????Message message = new Message("dead-topic", "我是一個死信消息".getBytes());
????producer.send(message);
????producer.shutdown();
}
19.2?消息消費(fèi)者
@Test
public void testDeadMsgConsumer() throws Exception {
????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
????consumer.setNamesrvAddr("localhost:9876");
????consumer.subscribe("dead-topic", "*");
????// 設(shè)置最大消費(fèi)重試次數(shù)?2 次
????consumer.setMaxReconsumeTimes(2);
????consumer.registerMessageListener(new MessageListenerConcurrently() {
????????@Override
????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
????????????System.out.println(msgs);
????????????// 測試消費(fèi)失敗
????????????return ConsumeConcurrentlyStatus.RECONSUME_LATER;
????????}
????});
????consumer.start();
????System.in.read();
}
19.3?死信消費(fèi)者
注意權(quán)限問題 perm 2讀??4寫??6讀寫

@Test
public void testDeadMq() throws ?Exception{
????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
????consumer.setNamesrvAddr("localhost:9876");
????// 消費(fèi)重試到達(dá)閾值以后,消息不會被投遞給消費(fèi)者了,而是進(jìn)入了死信隊(duì)列
????// 隊(duì)列名稱 默認(rèn)是?%DLQ% + 消費(fèi)者組名
????consumer.subscribe("%DLQ%dead-group", "*");
????consumer.registerMessageListener(new MessageListenerConcurrently() {
????????@Override
????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
????????????System.out.println(msgs);
????????????// 處理消息 簽收了
????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????});
????consumer.start();
????System.in.read();
}
19.4?控制臺顯示

