動(dòng)力節(jié)點(diǎn)RocketMQ全套視頻教程-5小時(shí)學(xué)會(huì)rocketmq消息隊(duì)列

20.?RocketMQ消息重復(fù)消費(fèi)問題
20.1?為什么會(huì)出現(xiàn)重復(fù)消費(fèi)問題呢?
BROADCASTING(廣播)模式下,所有注冊(cè)的消費(fèi)者都會(huì)消費(fèi),而這些消費(fèi)者通常是集群部署的一個(gè)個(gè)微服務(wù),這樣就會(huì)多臺(tái)機(jī)器重復(fù)消費(fèi),當(dāng)然這個(gè)是根據(jù)需要來選擇。
CLUSTERING(負(fù)載均衡)模式下,如果一個(gè)topic被多個(gè)consumerGroup消費(fèi),也會(huì)重復(fù)消費(fèi)。
即使是在CLUSTERING模式下,同一個(gè)consumerGroup下,一個(gè)隊(duì)列只會(huì)分配給一個(gè)消費(fèi)者,看起來好像是不會(huì)重復(fù)消費(fèi)。但是,有個(gè)特殊情況:一個(gè)消費(fèi)者新上線后,同組的所有消費(fèi)者要重新負(fù)載均衡(反之一個(gè)消費(fèi)者掉線后,也一樣)。一個(gè)隊(duì)列所對(duì)應(yīng)的新的消費(fèi)者要獲取之前消費(fèi)的offset(偏移量,也就是消息消費(fèi)的點(diǎn)位),此時(shí)之前的消費(fèi)者可能已經(jīng)消費(fèi)了一條消息,但是并沒有把offset提交給broker,那么新的消費(fèi)者可能會(huì)重新消費(fèi)一次。雖然orderly模式是前一個(gè)消費(fèi)者先解鎖,后一個(gè)消費(fèi)者加鎖再消費(fèi)的模式,比起concurrently要嚴(yán)格了,但是加鎖的線程和提交offset的線程不是同一個(gè),所以還是會(huì)出現(xiàn)極端情況下的重復(fù)消費(fèi)。
還有在發(fā)送批量消息的時(shí)候,會(huì)被當(dāng)做一條消息進(jìn)行處理,那么如果批量消息中有一條業(yè)務(wù)處理成功,其他失敗了,還是會(huì)被重新消費(fèi)一次。
那么如果在CLUSTERING(負(fù)載均衡)模式下,并且在同一個(gè)消費(fèi)者組中,不希望一條消息被重復(fù)消費(fèi),改怎么辦呢?我們可以想到去重操作,找到消息唯一的標(biāo)識(shí),可以是msgId也可以是你自定義的唯一的key,這樣就可以去重了
20.2?解決方案
使用去重方案解決,例如將消息的唯一標(biāo)識(shí)存起來,然后每次消費(fèi)之前先判斷是否存在這個(gè)唯一標(biāo)識(shí),如果存在則不消費(fèi),如果不存在則消費(fèi),并且消費(fèi)以后將這個(gè)標(biāo)記保存。
想法很好,但是消息的體量是非常大的,可能在生產(chǎn)環(huán)境中會(huì)到達(dá)上千萬甚至上億條,那么我們?cè)撊绾芜x擇一個(gè)容器來保存所有消息的標(biāo)識(shí),并且又可以快速的判斷是否存在呢?
我們可以選擇布隆過濾器(BloomFilter)
布隆過濾器(Bloom Filter)是1970年由布隆提出的。它實(shí)際上是一個(gè)很長(zhǎng)的二進(jìn)制向量和一系列隨機(jī)映射函數(shù)。布隆過濾器可以用于檢索一個(gè)元素是否在一個(gè)集合中。它的優(yōu)點(diǎn)是空間效率和查詢時(shí)間都比一般的算法要好的多,缺點(diǎn)是有一定的誤識(shí)別率和刪除困難。
在hutool的工具中我們可以直接使用,當(dāng)然你自己使用redis的bitmap類型手寫一個(gè)也是可以的 https://hutool.cn/docs/#/bloomFilter/%E6%A6%82%E8%BF%B0?

20.5?測(cè)試消費(fèi)者
/**
?* 在boot項(xiàng)目中可以使用@Bean在整個(gè)容器中放置一個(gè)單利對(duì)象
?*/
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);
@Test
public void testRepeatConsumer() throws Exception {
????// 創(chuàng)建默認(rèn)消費(fèi)者組
????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
????consumer.setMessageModel(MessageModel.BROADCASTING);
????// 設(shè)置nameServer地址
????consumer.setNamesrvAddr("localhost:9876");
????// 訂閱一個(gè)主題來消費(fèi) ??表達(dá)式,默認(rèn)是*
????consumer.subscribe("TopicTest", "*");
????// 注冊(cè)一個(gè)消費(fèi)監(jiān)聽?MessageListenerConcurrently是并發(fā)消費(fèi)
????// 默認(rèn)是20個(gè)線程一起消費(fèi),可以參看?consumer.setConsumeThreadMax()
????consumer.registerMessageListener(new MessageListenerConcurrently() {
????????@Override
????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {
????????????// 拿到消息的key
????????????MessageExt messageExt = msgs.get(0);
????????????String keys = messageExt.getKeys();
????????????// 判斷是否存在布隆過濾器中
????????????if (bloomFilter.contains(keys)) {
????????????????// 直接返回了 不往下處理業(yè)務(wù)
????????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????????}
????????????// 這個(gè)處理業(yè)務(wù),然后放入過濾器中
????????????// do sth...
????????????bloomFilter.add(keys);
????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????});
????consumer.start();
????System.in.read();
}
21.?RocketMQ集成SpringBoot
21.1?搭建rocketmq-producer(消息生產(chǎn)者)


21.1.1?創(chuàng)建項(xiàng)目,完整的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0</modelVersion>
????<parent>
????????<groupId>org.springframework.boot</groupId>
????????<artifactId>spring-boot-starter-parent</artifactId>
????????<version>2.6.3</version>
????????<relativePath/> <!-- lookup parent from repository -->
????</parent>
????<groupId>com.powernode</groupId>
????<artifactId>01-rocketmq-producer</artifactId>
????<version>0.0.1-SNAPSHOT</version>
????<name>rocketmq-producer</name>
????<description>Demo project for Spring Boot</description>
????<properties>
????????<java.version>1.8</java.version>
????</properties>
????<dependencies>
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-web</artifactId>
????????</dependency>
????????<!-- rocketmq的依賴 -->
????????<dependency>
????????????<groupId>org.apache.rocketmq</groupId>
????????????<artifactId>rocketmq-spring-boot-starter</artifactId>
????????????<version>2.2.2</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.projectlombok</groupId>
????????????<artifactId>lombok</artifactId>
????????????<optional>true</optional>
????????</dependency>
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-test</artifactId>
????????????<scope>test</scope>
????????</dependency>
????</dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.springframework.boot</groupId>
????????????????<artifactId>spring-boot-maven-plugin</artifactId>
????????????????<configuration>
????????????????????<excludes>
????????????????????????<exclude>
????????????????????????????<groupId>org.projectlombok</groupId>
????????????????????????????<artifactId>lombok</artifactId>
????????????????????????</exclude>
????????????????????</excludes>
????????????????</configuration>
????????????</plugin>
????????</plugins>
????</build>
</project>
?
21.1.2?修改配置文件application.yml
spring:
????application:
????????name: rocketmq-producer
rocketmq:
????name-server: 127.0.0.1:9876 ????# rocketMq的nameServer地址
????producer:
????????group: powernode-group ???????# 生產(chǎn)者組別
????????send-message-timeout: 3000 ?# 消息發(fā)送的超時(shí)時(shí)間
????????retry-times-when-send-async-failed: 2 ?# 異步消息發(fā)送失敗重試次數(shù)
????????max-message-size: 4194304 ??????# 消息的最大長(zhǎng)度
?21.1.3?我們?cè)跍y(cè)試類里面測(cè)試發(fā)送消息
往powernode主題里面發(fā)送一個(gè)簡(jiǎn)單的字符串消息
/**
?* 注入rocketMQTemplate,我們使用它來操作mq
?*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
?* 測(cè)試發(fā)送簡(jiǎn)單的消息
?*
?* @throws Exception
?*/
@Test
public void testSimpleMsg() throws Exception {
????// 往powernode的主題里面發(fā)送一個(gè)簡(jiǎn)單的字符串消息
????SendResult sendResult = rocketMQTemplate.syncSend("powernode", "我是一個(gè)簡(jiǎn)單的消息");
????// 拿到消息的發(fā)送狀態(tài)
????System.out.println(sendResult.getSendStatus());
????// 拿到消息的id
????System.out.println(sendResult.getMsgId());
}
?
運(yùn)行后查看控制臺(tái)

21.1.4?查看rocketMq的控制臺(tái)

查看消息細(xì)節(jié)

21.2?搭建rocketmq-consumer(消息消費(fèi)者)


21.2.1?創(chuàng)建項(xiàng)目,完整的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0</modelVersion>
????<parent>
????????<groupId>org.springframework.boot</groupId>
????????<artifactId>spring-boot-starter-parent</artifactId>
????????<version>2.6.3</version>
????????<relativePath/> <!-- lookup parent from repository -->
????</parent>
????<groupId>com.powernode</groupId>
????<artifactId>02-rocketmq-consumer</artifactId>
????<version>0.0.1-SNAPSHOT</version>
????<name>rocketmq-consumer</name>
????<description>Demo project for Spring Boot</description>
????<properties>
????????<java.version>1.8</java.version>
????</properties>
????<dependencies>
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-web</artifactId>
????????</dependency>
????????<!-- rocketmq的依賴 -->
????????<dependency>
????????????<groupId>org.apache.rocketmq</groupId>
????????????<artifactId>rocketmq-spring-boot-starter</artifactId>
????????????<version>2.2.2</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.projectlombok</groupId>
????????????<artifactId>lombok</artifactId>
????????????<optional>true</optional>
????????</dependency>
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-test</artifactId>
????????????<scope>test</scope>
????????</dependency>
????</dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.springframework.boot</groupId>
????????????????<artifactId>spring-boot-maven-plugin</artifactId>
????????????????<configuration>
????????????????????<excludes>
????????????????????????<exclude>
????????????????????????????<groupId>org.projectlombok</groupId>
????????????????????????????<artifactId>lombok</artifactId>
????????????????????????</exclude>
????????????????????</excludes>
????????????????</configuration>
????????????</plugin>
????????</plugins>
????</build>
</project>
?
21.2.2?修改配置文件application.yml
spring:
????application:
????????name: rocketmq-consumer
rocketmq:
????name-server: 127.0.0.1:9876
21.2.3?添加一個(gè)監(jiān)聽的類SimpleMsgListener
消費(fèi)者要消費(fèi)消息,就添加一個(gè)監(jiān)聽
package com.powernode.listener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
?* 創(chuàng)建一個(gè)簡(jiǎn)單消息的監(jiān)聽
?* 1.類上添加注解@Component和@RocketMQMessageListener
?* ?????@RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
?* ?????topic指定消費(fèi)的主題,consumerGroup指定消費(fèi)組,一個(gè)主題可以有多個(gè)消費(fèi)者組,一個(gè)消息可以被多個(gè)不同的組的消費(fèi)者都消費(fèi)
?* 2.實(shí)現(xiàn)RocketMQListener接口,注意泛型的使用,可以為具體的類型,如果想拿到消息
?*?的其他參數(shù)可以寫成MessageExt
?*/
@Component
@RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group",messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<String> {
????/**
?????* 消費(fèi)消息的方法
?????*
?????* @param message
?????*/
????@Override
????public void onMessage(String message) {
????????System.out.println(message);
????}
}