最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

動力節(jié)點(diǎn)RocketMQ全套視頻教程-5小時學(xué)會rocketmq消息隊(duì)列

2023-05-16 11:03 作者:山藥當(dāng)當(dāng)  | 我要投稿

rocketmq學(xué)習(xí)筆記分享

P5-P8

15.?RocketMQ發(fā)送事務(wù)消息

15.1?事務(wù)消息的發(fā)送流程

它可以被認(rèn)為是一個兩階段的提交消息實(shí)現(xiàn),以確保分布式系統(tǒng)的最終一致性。事務(wù)性消息確保本地事務(wù)的執(zhí)行和消息的發(fā)送可以原子地執(zhí)行。


上圖說明了事務(wù)消息的大致方案,其中分為兩個流程:正常事務(wù)消息的發(fā)送及提交、事務(wù)消息的補(bǔ)償流程。

事務(wù)消息發(fā)送及提交

1.?發(fā)送消息(half消息)。

2.?服務(wù)端響應(yīng)消息寫入結(jié)果。

3.?根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗,此時half消息對業(yè)務(wù)不可見,本地邏輯不執(zhí)行)。

4.?根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或Rollback(Commit操作生成消息索引,消息對消費(fèi)者可見)

事務(wù)補(bǔ)償

1.?對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”

2.?Producer收到回查消息,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài)

3.?根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback

其中,補(bǔ)償階段用于解決消息UNKNOW或者Rollback發(fā)生超時或者失敗的情況。

事務(wù)消息狀態(tài)

事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):

l?TransactionStatus.CommitTransaction: 提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。

l?TransactionStatus.RollbackTransaction: 回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。

l?TransactionStatus.Unknown: 中間狀態(tài),它代表需要檢查消息隊(duì)列來確定狀態(tài)。

15.2?事務(wù)消息生產(chǎn)者

/**

?* TransactionalMessageCheckService的檢測頻率默認(rèn)1分鐘,可通過在broker.conf文件中設(shè)置transactionCheckInterval的值來改變默認(rèn)值,單位為毫秒。

?* 從broker配置文件中獲取transactionTimeOut參數(shù)值。

?* 從broker配置文件中獲取transactionCheckMax參數(shù)值,表示事務(wù)的最大檢測次數(shù),如果超過檢測次數(shù),消息會默認(rèn)為丟棄,即回滾消息。

?*

?* @throws Exception

?*/

@Test

public void testTransactionProducer() throws Exception {

????// 創(chuàng)建一個事務(wù)消息生產(chǎn)者

????TransactionMQProducer producer = new TransactionMQProducer("test-group");

????producer.setNamesrvAddr("localhost:9876");

????// 設(shè)置事務(wù)消息監(jiān)聽器

????producer.setTransactionListener(new TransactionListener() {

????????// 這個是執(zhí)行本地業(yè)務(wù)方法

????????@Override

????????public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

????????????System.out.println(new Date());

????????????System.out.println(new String(msg.getBody()));

????????????// 這個可以使用try catch對業(yè)務(wù)代碼進(jìn)行性包裹

????????????// COMMIT_MESSAGE 表示允許消費(fèi)者消費(fèi)該消息

????????????// ROLLBACK_MESSAGE 表示該消息將被刪除,不允許消費(fèi)

????????????// UNKNOW表示需要MQ回查才能確定狀態(tài) 那么過一會 代碼會走下面的checkLocalTransaction(msg)方法

????????????return LocalTransactionState.UNKNOW;

????????}


????????// 這里是回查方法 回查不是再次執(zhí)行業(yè)務(wù)操作,而是確認(rèn)上面的操作是否有結(jié)果

????????// 默認(rèn)是1min回查 默認(rèn)回查15次 超過次數(shù)則丟棄打印日志 可以通過參數(shù)設(shè)置

????????// transactionTimeOut 超時時間

????????// transactionCheckMax 最大回查次數(shù)

????????// transactionCheckInterval 回查間隔時間單位毫秒

????????// 觸發(fā)條件

????????// 1.當(dāng)上面執(zhí)行本地事務(wù)返回結(jié)果UNKNOW時,或者下面的回查方法也返回UNKNOW時 會觸發(fā)回查

????????// 2.當(dāng)上面操作超過20s沒有做出一個結(jié)果,也就是超時或者卡主了,也會進(jìn)行回查

????????@Override

????????public LocalTransactionState checkLocalTransaction(MessageExt msg) {

????????????System.err.println(new Date());

????????????System.err.println(new String(msg.getBody()));

????????????// 這里

????????????return LocalTransactionState.UNKNOW;

????????}

????});

????producer.start();

????Message message = new Message("TopicTest2", "我是一個事務(wù)消息".getBytes());

????// 發(fā)送消息

????producer.sendMessageInTransaction(message, null);

????System.out.println(new Date());

????System.in.read();

}

15.3?事務(wù)消息消費(fèi)者

@Test

public void testTransactionConsumer() throws Exception {

????// 創(chuàng)建默認(rèn)消費(fèi)者組

????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

????// 設(shè)置nameServer地址

????consumer.setNamesrvAddr("localhost:9876");

????// 訂閱一個主題來消費(fèi)???*表示沒有過濾參數(shù) 表示這個主題的任何消息

????consumer.subscribe("TopicTest2", "*");

????// 注冊一個消費(fèi)監(jiān)聽?MessageListenerConcurrently是并發(fā)消費(fèi)

????// 默認(rèn)是20個線程一起消費(fèi),可以參看?consumer.setConsumeThreadMax()

????consumer.registerMessageListener(new MessageListenerConcurrently() {

????????@Override

????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {

????????????// 這里執(zhí)行消費(fèi)的代碼 默認(rèn)是多線程消費(fèi)

????????????System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));

????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????}

????});

????consumer.start();

????System.in.read();

}

15.4?測試結(jié)果

16.?RocketMQ發(fā)送帶標(biāo)簽的消息,消息過濾

Rocketmq提供消息過濾功能,通過tag或者key進(jìn)行區(qū)分

我們往一個主題里面發(fā)送消息的時候,根據(jù)業(yè)務(wù)邏輯,可能需要區(qū)分,比如帶有tagA標(biāo)簽的被A消費(fèi),帶有tagB標(biāo)簽的被B消費(fèi),還有在事務(wù)監(jiān)聽的類里面,只要是事務(wù)消息都要走同一個監(jiān)聽,我們也需要通過過濾才區(qū)別對待

16.1?標(biāo)簽消息生產(chǎn)者

@Test

public void testTagProducer() throws Exception {

????// 創(chuàng)建默認(rèn)的生產(chǎn)者

????DefaultMQProducer producer = new DefaultMQProducer("test-group");

????// 設(shè)置nameServer地址

????producer.setNamesrvAddr("localhost:9876");

????// 啟動實(shí)例

????producer.start();

????Message msg = new Message("TopicTest","tagA", "我是一個帶標(biāo)記的消息".getBytes());

????SendResult send = producer.send(msg);

????System.out.println(send);

????// 關(guān)閉實(shí)例

????producer.shutdown();

}

?

16.2?標(biāo)簽消息消費(fèi)者

@Test

public void testTagConsumer() throws Exception {

????// 創(chuàng)建默認(rèn)消費(fèi)者組

????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

????// 設(shè)置nameServer地址

????consumer.setNamesrvAddr("localhost:9876");

????// 訂閱一個主題來消費(fèi) ??表達(dá)式,默認(rèn)是*,支持"tagA || tagB || tagC" 這樣或者的寫法 只要是符合任何一個標(biāo)簽都可以消費(fèi)

????consumer.subscribe("TopicTest", "tagA || tagB || tagC");

????// 注冊一個消費(fèi)監(jiān)聽?MessageListenerConcurrently是并發(fā)消費(fèi)

????// 默認(rèn)是20個線程一起消費(fèi),可以參看?consumer.setConsumeThreadMax()

????consumer.registerMessageListener(new MessageListenerConcurrently() {

????????@Override

????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {

????????????// 這里執(zhí)行消費(fèi)的代碼 默認(rèn)是多線程消費(fèi)

????????????System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));

????????????System.out.println(msgs.get(0).getTags());

????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????}

????});

????consumer.start();

????System.in.read();

}

?

17.?RocketMQ中消息的Key(業(yè)務(wù)相關(guān))

在rocketmq中的消息,默認(rèn)會有一個messageId當(dāng)做消息的唯一標(biāo)識,我們也可以給消息攜帶一個key,用作唯一標(biāo)識或者業(yè)務(wù)標(biāo)識,包括在控制面板查詢的時候也可以使用messageId或者key來進(jìn)行查詢

17.1?帶key消息生產(chǎn)者

@Test

public void testKeyProducer() throws Exception {

????// 創(chuàng)建默認(rèn)的生產(chǎn)者

????DefaultMQProducer producer = new DefaultMQProducer("test-group");

????// 設(shè)置nameServer地址

????producer.setNamesrvAddr("localhost:9876");

????// 啟動實(shí)例

????producer.start();

????Message msg = new Message("TopicTest","tagA","key", "我是一個帶標(biāo)記和key的消息".getBytes());

????SendResult send = producer.send(msg);

????System.out.println(send);

????// 關(guān)閉實(shí)例

????producer.shutdown();

}

17.2?帶key消息消費(fèi)者

@Test

public void testKeyConsumer() throws Exception {

????// 創(chuàng)建默認(rèn)消費(fèi)者組

????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

????// 設(shè)置nameServer地址

????consumer.setNamesrvAddr("localhost:9876");

????// 訂閱一個主題來消費(fèi) ??表達(dá)式,默認(rèn)是*,支持"tagA || tagB || tagC" 這樣或者的寫法 只要是符合任何一個標(biāo)簽都可以消費(fèi)

????consumer.subscribe("TopicTest", "tagA || tagB || tagC");

????// 注冊一個消費(fèi)監(jiān)聽?MessageListenerConcurrently是并發(fā)消費(fèi)

????// 默認(rèn)是20個線程一起消費(fèi),可以參看?consumer.setConsumeThreadMax()

????consumer.registerMessageListener(new MessageListenerConcurrently() {

????????@Override

????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {

????????????// 這里執(zhí)行消費(fèi)的代碼 默認(rèn)是多線程消費(fèi)

????????????System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));

????????????System.out.println(msgs.get(0).getTags());

????????????System.out.println(msgs.get(0).getKeys());

????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????}

????});

????consumer.start();

????System.in.read();

}

18.?RocketMQ重試機(jī)制

18.1?生產(chǎn)者重試

// 失敗的情況重發(fā)3次

producer.setRetryTimesWhenSendFailed(3);

// 消息在1S內(nèi)沒有發(fā)送成功,就會重試

producer.send(msg, 1000);

18.2?消費(fèi)者重試

在消費(fèi)者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就會執(zhí)行重試

上圖代碼中說明了,我們再實(shí)際生產(chǎn)過程中,一般重試5-7次,如果還沒有消費(fèi)成功,則可以把消息簽收了,通知人工等處理

messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

/**

?* 測試消費(fèi)者

?*

?* @throws Exception

?*/

@Test

public void testConsumer() throws Exception {

????// 創(chuàng)建默認(rèn)消費(fèi)者組

????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

????// 設(shè)置nameServer地址

????consumer.setNamesrvAddr("localhost:9876");

????// 訂閱一個主題來消費(fèi)???*表示沒有過濾參數(shù) 表示這個主題的任何消息

????consumer.subscribe("TopicTest", "*");

????// 注冊一個消費(fèi)監(jiān)聽

????consumer.registerMessageListener(new MessageListenerConcurrently() {

????????@Override

????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {

????????????try {

????????????????// 這里執(zhí)行消費(fèi)的代碼

????????????????System.out.println(Thread.currentThread().getName() + "----" + msgs);

????????????????// 這里制造一個錯誤

????????????????int i = 10 / 0;

????????????} catch (Exception e) {

????????????????// 出現(xiàn)問題 判斷重試的次數(shù)

????????????????MessageExt messageExt = msgs.get(0);

????????????????// 獲取重試的次數(shù)?失敗一次消息中的失敗次數(shù)會累加一次

????????????????int reconsumeTimes = messageExt.getReconsumeTimes();

????????????????if (reconsumeTimes >= 3) {

????????????????????// 則把消息確認(rèn)了,可以將這條消息記錄到日志或者數(shù)據(jù)庫 通知人工處理

????????????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????????????} else {

????????????????????return ConsumeConcurrentlyStatus.RECONSUME_LATER;

????????????????}

????????????}

????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????}

????});

????consumer.start();

????System.in.read();

}

19.?RocketMQ死信消息

當(dāng)消費(fèi)重試到達(dá)閾值以后,消息不會被投遞給消費(fèi)者了,而是進(jìn)入了死信隊(duì)列

19.1?消息生產(chǎn)者

@Test

public void testDeadMsgProducer() throws Exception {

????DefaultMQProducer producer = new DefaultMQProducer("dead-group");

????producer.setNamesrvAddr("localhost:9876");

????producer.start();

????Message message = new Message("dead-topic", "我是一個死信消息".getBytes());

????producer.send(message);

????producer.shutdown();

}

19.2?消息消費(fèi)者

@Test

public void testDeadMsgConsumer() throws Exception {

????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");

????consumer.setNamesrvAddr("localhost:9876");

????consumer.subscribe("dead-topic", "*");

????// 設(shè)置最大消費(fèi)重試次數(shù)?2 次

????consumer.setMaxReconsumeTimes(2);

????consumer.registerMessageListener(new MessageListenerConcurrently() {

????????@Override

????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

????????????System.out.println(msgs);

????????????// 測試消費(fèi)失敗

????????????return ConsumeConcurrentlyStatus.RECONSUME_LATER;

????????}

????});

????consumer.start();

????System.in.read();

}

19.3?死信消費(fèi)者

注意權(quán)限問題 perm 2讀??4寫??6讀寫

@Test

public void testDeadMq() throws ?Exception{

????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");

????consumer.setNamesrvAddr("localhost:9876");

????// 消費(fèi)重試到達(dá)閾值以后,消息不會被投遞給消費(fèi)者了,而是進(jìn)入了死信隊(duì)列

????// 隊(duì)列名稱 默認(rèn)是?%DLQ% + 消費(fèi)者組名

????consumer.subscribe("%DLQ%dead-group", "*");

????consumer.registerMessageListener(new MessageListenerConcurrently() {

????????@Override

????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

????????????System.out.println(msgs);

????????????// 處理消息 簽收了

????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????}

????});

????consumer.start();

????System.in.read();

}

19.4?控制臺顯示





動力節(jié)點(diǎn)RocketMQ全套視頻教程-5小時學(xué)會rocketmq消息隊(duì)列的評論 (共 條)

分享到微博請遵守國家法律
邵阳县| 射洪县| 资溪县| 田东县| 武功县| 宜章县| 余干县| 博湖县| 衡南县| 左贡县| 锡林郭勒盟| 彰化县| 宝清县| 吉首市| 安国市| 车险| 安西县| 青州市| 巨野县| 盘锦市| 嵊泗县| 涡阳县| 卢龙县| 读书| 沙湾县| 桦川县| 浦县| 含山县| 涡阳县| 黑龙江省| 图木舒克市| 正镶白旗| 潞西市| 香港 | 金寨县| 长宁县| 成都市| 葵青区| 汶川县| 佛山市| 海门市|