最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

動(dòng)力節(jié)點(diǎn)RocketMQ全套視頻教程-5小時(shí)學(xué)會(huì)rocketmq消息隊(duì)列

2023-05-16 11:17 作者:摸魚協(xié)會(huì)秘書長(zhǎng)  | 我要投稿

20.?RocketMQ消息重復(fù)消費(fèi)問題

20.1?為什么會(huì)出現(xiàn)重復(fù)消費(fèi)問題呢?

BROADCASTING(廣播)模式下,所有注冊(cè)的消費(fèi)者都會(huì)消費(fèi),而這些消費(fèi)者通常是集群部署的一個(gè)個(gè)微服務(wù),這樣就會(huì)多臺(tái)機(jī)器重復(fù)消費(fèi),當(dāng)然這個(gè)是根據(jù)需要來選擇。

CLUSTERING(負(fù)載均衡)模式下,如果一個(gè)topic被多個(gè)consumerGroup消費(fèi),也會(huì)重復(fù)消費(fèi)。

即使是在CLUSTERING模式下,同一個(gè)consumerGroup下,一個(gè)隊(duì)列只會(huì)分配給一個(gè)消費(fèi)者,看起來好像是不會(huì)重復(fù)消費(fèi)。但是,有個(gè)特殊情況:一個(gè)消費(fèi)者新上線后,同組的所有消費(fèi)者要重新負(fù)載均衡(反之一個(gè)消費(fèi)者掉線后,也一樣)。一個(gè)隊(duì)列所對(duì)應(yīng)的新的消費(fèi)者要獲取之前消費(fèi)的offset(偏移量,也就是消息消費(fèi)的點(diǎn)位),此時(shí)之前的消費(fèi)者可能已經(jīng)消費(fèi)了一條消息,但是并沒有把offset提交給broker,那么新的消費(fèi)者可能會(huì)重新消費(fèi)一次。雖然orderly模式是前一個(gè)消費(fèi)者先解鎖,后一個(gè)消費(fèi)者加鎖再消費(fèi)的模式,比起concurrently要嚴(yán)格了,但是加鎖的線程和提交offset的線程不是同一個(gè),所以還是會(huì)出現(xiàn)極端情況下的重復(fù)消費(fèi)。

還有在發(fā)送批量消息的時(shí)候,會(huì)被當(dāng)做一條消息進(jìn)行處理,那么如果批量消息中有一條業(yè)務(wù)處理成功,其他失敗了,還是會(huì)被重新消費(fèi)一次。

那么如果在CLUSTERING(負(fù)載均衡)模式下,并且在同一個(gè)消費(fèi)者組中,不希望一條消息被重復(fù)消費(fèi),改怎么辦呢?我們可以想到去重操作,找到消息唯一的標(biāo)識(shí),可以是msgId也可以是你自定義的唯一的key,這樣就可以去重了

20.2?解決方案

使用去重方案解決,例如將消息的唯一標(biāo)識(shí)存起來,然后每次消費(fèi)之前先判斷是否存在這個(gè)唯一標(biāo)識(shí),如果存在則不消費(fèi),如果不存在則消費(fèi),并且消費(fèi)以后將這個(gè)標(biāo)記保存。

想法很好,但是消息的體量是非常大的,可能在生產(chǎn)環(huán)境中會(huì)到達(dá)上千萬甚至上億條,那么我們?cè)撊绾芜x擇一個(gè)容器來保存所有消息的標(biāo)識(shí),并且又可以快速的判斷是否存在呢?

我們可以選擇布隆過濾器(BloomFilter)

布隆過濾器(Bloom Filter)是1970年由布隆提出的。它實(shí)際上是一個(gè)很長(zhǎng)的二進(jìn)制向量和一系列隨機(jī)映射函數(shù)。布隆過濾器可以用于檢索一個(gè)元素是否在一個(gè)集合中。它的優(yōu)點(diǎn)是空間效率和查詢時(shí)間都比一般的算法要好的多,缺點(diǎn)是有一定的誤識(shí)別率和刪除困難。

在hutool的工具中我們可以直接使用,當(dāng)然你自己使用redis的bitmap類型手寫一個(gè)也是可以的 https://hutool.cn/docs/#/bloomFilter/%E6%A6%82%E8%BF%B0?

20.5?測(cè)試消費(fèi)者

/**

?* 在boot項(xiàng)目中可以使用@Bean在整個(gè)容器中放置一個(gè)單利對(duì)象

?*/

public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);


@Test

public void testRepeatConsumer() throws Exception {

????// 創(chuàng)建默認(rèn)消費(fèi)者組

????DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

????consumer.setMessageModel(MessageModel.BROADCASTING);

????// 設(shè)置nameServer地址

????consumer.setNamesrvAddr("localhost:9876");

????// 訂閱一個(gè)主題來消費(fèi) ??表達(dá)式,默認(rèn)是*

????consumer.subscribe("TopicTest", "*");

????// 注冊(cè)一個(gè)消費(fèi)監(jiān)聽?MessageListenerConcurrently是并發(fā)消費(fèi)

????// 默認(rèn)是20個(gè)線程一起消費(fèi),可以參看?consumer.setConsumeThreadMax()

????consumer.registerMessageListener(new MessageListenerConcurrently() {

????????@Override

????????public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

????????????????????????????????????????????????????????ConsumeConcurrentlyContext context) {

????????????// 拿到消息的key

????????????MessageExt messageExt = msgs.get(0);

????????????String keys = messageExt.getKeys();

????????????// 判斷是否存在布隆過濾器中

????????????if (bloomFilter.contains(keys)) {

????????????????// 直接返回了 不往下處理業(yè)務(wù)

????????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????????}

????????????// 這個(gè)處理業(yè)務(wù),然后放入過濾器中

????????????// do sth...

????????????bloomFilter.add(keys);

????????????return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

????????}

????});

????consumer.start();

????System.in.read();

}

21.?RocketMQ集成SpringBoot

21.1?搭建rocketmq-producer(消息生產(chǎn)者)


21.1.1?創(chuàng)建項(xiàng)目,完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

????<modelVersion>4.0.0</modelVersion>

????<parent>

????????<groupId>org.springframework.boot</groupId>

????????<artifactId>spring-boot-starter-parent</artifactId>

????????<version>2.6.3</version>

????????<relativePath/> <!-- lookup parent from repository -->

????</parent>

????<groupId>com.powernode</groupId>

????<artifactId>01-rocketmq-producer</artifactId>

????<version>0.0.1-SNAPSHOT</version>

????<name>rocketmq-producer</name>

????<description>Demo project for Spring Boot</description>

????<properties>

????????<java.version>1.8</java.version>

????</properties>

????<dependencies>

????????<dependency>

????????????<groupId>org.springframework.boot</groupId>

????????????<artifactId>spring-boot-starter-web</artifactId>

????????</dependency>

????????<!-- rocketmq的依賴 -->

????????<dependency>

????????????<groupId>org.apache.rocketmq</groupId>

????????????<artifactId>rocketmq-spring-boot-starter</artifactId>

????????????<version>2.2.2</version>

????????</dependency>


????????<dependency>

????????????<groupId>org.projectlombok</groupId>

????????????<artifactId>lombok</artifactId>

????????????<optional>true</optional>

????????</dependency>

????????<dependency>

????????????<groupId>org.springframework.boot</groupId>

????????????<artifactId>spring-boot-starter-test</artifactId>

????????????<scope>test</scope>

????????</dependency>

????</dependencies>


????<build>

????????<plugins>

????????????<plugin>

????????????????<groupId>org.springframework.boot</groupId>

????????????????<artifactId>spring-boot-maven-plugin</artifactId>

????????????????<configuration>

????????????????????<excludes>

????????????????????????<exclude>

????????????????????????????<groupId>org.projectlombok</groupId>

????????????????????????????<artifactId>lombok</artifactId>

????????????????????????</exclude>

????????????????????</excludes>

????????????????</configuration>

????????????</plugin>

????????</plugins>

????</build>


</project>

?

21.1.2?修改配置文件application.yml

spring:

????application:

????????name: rocketmq-producer

rocketmq:

????name-server: 127.0.0.1:9876 ????# rocketMq的nameServer地址

????producer:

????????group: powernode-group ???????# 生產(chǎn)者組別

????????send-message-timeout: 3000 ?# 消息發(fā)送的超時(shí)時(shí)間

????????retry-times-when-send-async-failed: 2 ?# 異步消息發(fā)送失敗重試次數(shù)

????????max-message-size: 4194304 ??????# 消息的最大長(zhǎng)度

?21.1.3?我們?cè)跍y(cè)試類里面測(cè)試發(fā)送消息

往powernode主題里面發(fā)送一個(gè)簡(jiǎn)單的字符串消息

/**

?* 注入rocketMQTemplate,我們使用它來操作mq

?*/

@Autowired

private RocketMQTemplate rocketMQTemplate;


/**

?* 測(cè)試發(fā)送簡(jiǎn)單的消息

?*

?* @throws Exception

?*/

@Test

public void testSimpleMsg() throws Exception {

????// 往powernode的主題里面發(fā)送一個(gè)簡(jiǎn)單的字符串消息

????SendResult sendResult = rocketMQTemplate.syncSend("powernode", "我是一個(gè)簡(jiǎn)單的消息");

????// 拿到消息的發(fā)送狀態(tài)

????System.out.println(sendResult.getSendStatus());

????// 拿到消息的id

????System.out.println(sendResult.getMsgId());

}

?

運(yùn)行后查看控制臺(tái)

21.1.4?查看rocketMq的控制臺(tái)

查看消息細(xì)節(jié)


21.2?搭建rocketmq-consumer(消息消費(fèi)者)


21.2.1?創(chuàng)建項(xiàng)目,完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

????<modelVersion>4.0.0</modelVersion>

????<parent>

????????<groupId>org.springframework.boot</groupId>

????????<artifactId>spring-boot-starter-parent</artifactId>

????????<version>2.6.3</version>

????????<relativePath/> <!-- lookup parent from repository -->

????</parent>

????<groupId>com.powernode</groupId>

????<artifactId>02-rocketmq-consumer</artifactId>

????<version>0.0.1-SNAPSHOT</version>

????<name>rocketmq-consumer</name>

????<description>Demo project for Spring Boot</description>

????<properties>

????????<java.version>1.8</java.version>

????</properties>

????<dependencies>

????????<dependency>

????????????<groupId>org.springframework.boot</groupId>

????????????<artifactId>spring-boot-starter-web</artifactId>

????????</dependency>

????????<!-- rocketmq的依賴 -->

????????<dependency>

????????????<groupId>org.apache.rocketmq</groupId>

????????????<artifactId>rocketmq-spring-boot-starter</artifactId>

????????????<version>2.2.2</version>

????????</dependency>

????????<dependency>

????????????<groupId>org.projectlombok</groupId>

????????????<artifactId>lombok</artifactId>

????????????<optional>true</optional>

????????</dependency>

????????<dependency>

????????????<groupId>org.springframework.boot</groupId>

????????????<artifactId>spring-boot-starter-test</artifactId>

????????????<scope>test</scope>

????????</dependency>

????</dependencies>


????<build>

????????<plugins>

????????????<plugin>

????????????????<groupId>org.springframework.boot</groupId>

????????????????<artifactId>spring-boot-maven-plugin</artifactId>

????????????????<configuration>

????????????????????<excludes>

????????????????????????<exclude>

????????????????????????????<groupId>org.projectlombok</groupId>

????????????????????????????<artifactId>lombok</artifactId>

????????????????????????</exclude>

????????????????????</excludes>

????????????????</configuration>

????????????</plugin>

????????</plugins>

????</build>


</project>

?

21.2.2?修改配置文件application.yml

spring:

????application:

????????name: rocketmq-consumer

rocketmq:

????name-server: 127.0.0.1:9876

21.2.3?添加一個(gè)監(jiān)聽的類SimpleMsgListener

消費(fèi)者要消費(fèi)消息,就添加一個(gè)監(jiān)聽

package com.powernode.listener;


import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Component;


/**

?* 創(chuàng)建一個(gè)簡(jiǎn)單消息的監(jiān)聽

?* 1.類上添加注解@Component和@RocketMQMessageListener

?* ?????@RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")

?* ?????topic指定消費(fèi)的主題,consumerGroup指定消費(fèi)組,一個(gè)主題可以有多個(gè)消費(fèi)者組,一個(gè)消息可以被多個(gè)不同的組的消費(fèi)者都消費(fèi)

?* 2.實(shí)現(xiàn)RocketMQListener接口,注意泛型的使用,可以為具體的類型,如果想拿到消息

?*?的其他參數(shù)可以寫成MessageExt

?*/

@Component

@RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group",messageModel = MessageModel.CLUSTERING)

public class SimpleMsgListener implements RocketMQListener<String> {


????/**

?????* 消費(fèi)消息的方法

?????*

?????* @param message

?????*/

????@Override

????public void onMessage(String message) {

????????System.out.println(message);

????}

}

21.2.4?啟動(dòng)rocketmq-consumer

查看控制臺(tái),發(fā)現(xiàn)我們已經(jīng)監(jiān)聽到消息了

動(dòng)力節(jié)點(diǎn)RocketMQ全套視頻教程-5小時(shí)學(xué)會(huì)rocketmq消息隊(duì)列的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
长寿区| 海伦市| 竹山县| 景泰县| 平陆县| 新津县| 和顺县| 青海省| 五原县| 定襄县| 开鲁县| 新乡县| 林甸县| 永靖县| 阿荣旗| 莲花县| 区。| 武陟县| 辽源市| 乌什县| 民权县| 阳曲县| 乐山市| 克什克腾旗| 同德县| 尼勒克县| 定南县| 哈巴河县| 右玉县| 东源县| 隆化县| 宜州市| 沧州市| 武宁县| 上思县| 黄大仙区| 金寨县| 南宁市| 米林县| 松桃| 阳曲县|