Java 項目中使用 Resilience4j 實現(xiàn)客戶端 API 調(diào)用的限速/節(jié)流機制


在本系列的上一篇文章中,我們了解了 Resilience4j 以及如何使用其?[Retry 模塊](
https://icodewalker.com/blog/261/)。現(xiàn)在讓我們了解 RateLimiter - 它是什么,何時以及如何使用它,以及在實施速率限制(或者也稱為“節(jié)流”)時要注意什么。
代碼示例
本文附有GitHub 上的工作代碼示例。
什么是 Resilience4j?
請參閱上一篇文章中的描述,快速了解?[Resilience4j 的一般工作原理](
https://icodewalker.com/blog/261/#what-is-resilience4j)。
什么是限速?
我們可以從兩個角度來看待速率限制——作為服務(wù)提供者和作為服務(wù)消費者。
服務(wù)端限速
作為服務(wù)提供商,我們實施速率限制以保護我們的資源免受過載和拒絕服務(wù) (DoS) 攻擊。
為了滿足我們與所有消費者的服務(wù)水平協(xié)議 (SLA),我們希望確保一個導致流量激增的消費者不會影響我們對他人的服務(wù)質(zhì)量。
我們通過設(shè)置在給定時間單位內(nèi)允許消費者發(fā)出多少請求的限制來做到這一點。我們通過適當?shù)捻憫芙^任何超出限制的請求,例如 HTTP 狀態(tài) 429(請求過多)。這稱為服務(wù)器端速率限制。
速率限制以每秒請求數(shù) (rps)、每分鐘請求數(shù) (rpm) 或類似形式指定。某些服務(wù)在不同的持續(xù)時間(例如 50 rpm 且不超過 2500 rph)和一天中的不同時間(例如,白天 100 rps 和晚上 150 rps)有多個速率限制。該限制可能適用于單個用戶(由用戶 ID、IP 地址、API 訪問密鑰等標識)或多租戶應用程序中的租戶。
客戶端限速
作為服務(wù)的消費者,我們希望確保我們不會使服務(wù)提供者過載。此外,我們不想招致意外的成本——無論是金錢上的還是服務(wù)質(zhì)量方面的。
如果我們消費的服務(wù)是有彈性的,就會發(fā)生這種情況。服務(wù)提供商可能不會限制我們的請求,而是會因額外負載而向我們收取額外費用。有些甚至在短時間內(nèi)禁止行為不端的客戶。消費者為防止此類問題而實施的速率限制稱為客戶端速率限制。
何時使用 RateLimiter?
resilience4j-ratelimiter?用于客戶端速率限制。
服務(wù)器端速率限制需要諸如緩存和多個服務(wù)器實例之間的協(xié)調(diào)之類的東西,這是 resilience4j 不支持的。對于服務(wù)器端的速率限制,有 API 網(wǎng)關(guān)和 API 過濾器,例如?Kong API Gateway?和?Repose API Filter。Resilience4j 的 RateLimiter 模塊并不打算取代它們。
Resilience4j RateLimiter 概念
想要調(diào)用遠程服務(wù)的線程首先向 RateLimiter 請求許可。如果 RateLimiter 允許,則線程繼續(xù)。 否則,RateLimiter 會停放線程或?qū)⑵渲糜诘却隣顟B(tài)。
RateLimiter 定期創(chuàng)建新權(quán)限。當權(quán)限可用時,線程會收到通知,然后可以繼續(xù)。
一段時間內(nèi)允許的調(diào)用次數(shù)稱為?limitForPeriod。RateLimiter 刷新權(quán)限的頻率由?limitRefreshPeriod?指定。timeoutDuration?指定線程可以等待多長時間獲取權(quán)限。如果在等待時間結(jié)束時沒有可用的權(quán)限,RateLimiter 將拋出?RequestNotPermitted?運行時異常。
使用Resilience4j RateLimiter 模塊
RateLimiterRegistry、RateLimiterConfig?和?RateLimiter?是?resilience4j-ratelimiter 的主要抽象。
RateLimiterRegistry?是一個用于創(chuàng)建和管理?RateLimiter?對象的工廠。
RateLimiterConfig?封裝了?limitForPeriod、limitRefreshPeriod?和?timeoutDuration?配置。每個?RateLimiter?對象都與一個?RateLimiterConfig?相關(guān)聯(lián)。
RateLimiter?提供輔助方法來為包含遠程調(diào)用的函數(shù)式接口或 lambda 表達式創(chuàng)建裝飾器。
讓我們看看如何使用 RateLimiter 模塊中可用的各種功能。假設(shè)我們正在為一家航空公司建立一個網(wǎng)站,以允許其客戶搜索和預訂航班。我們的服務(wù)與?FlightSearchService?類封裝的遠程服務(wù)對話。
基本示例
第一步是創(chuàng)建一個?RateLimiterConfig:
RateLimiterConfig config = RateLimiterConfig.ofDefaults();
這將創(chuàng)建一個?RateLimiterConfig,其默認值為?limitForPeriod?(50)、limitRefreshPeriod(500ns) 和?timeoutDuration?(5s)。
假設(shè)我們與航空公司服務(wù)的合同規(guī)定我們可以以 1 rps 調(diào)用他們的搜索 API。然后我們將像這樣創(chuàng)建?RateLimiterConfig:
RateLimiterConfig config = RateLimiterConfig.custom()
?.limitForPeriod(1)
?.limitRefreshPeriod(Duration.ofSeconds(1))
?.timeoutDuration(Duration.ofSeconds(1))
?.build();
如果線程無法在指定的 1 秒?timeoutDuration?內(nèi)獲取權(quán)限,則會出錯。
然后我們創(chuàng)建一個?RateLimiter?并裝飾?searchFlights()?調(diào)用:
RateLimiterRegistry registry = RateLimiterRegistry.of(config);
RateLimiter limiter = registry.rateLimiter("flightSearchService");// FlightSearchService and SearchRequest creation omitted
Supplier<List<Flight>> flightsSupplier =
?RateLimiter.decorateSupplier(limiter, ? ?() -> service.searchFlights(request));
最后,我們多次使用裝飾過的?Supplier<List<Flight>>?:
for (int i=0; i<3; i++) {
?System.out.println(flightsSupplier.get());
}
示例輸出中的時間戳顯示每秒發(fā)出一個請求:
Searching for flights; current time = 15:29:40 786...
[Flight{flightNumber='XY 765', ... }, ... ]
Searching for flights; current time = 15:29:41 791...
[Flight{flightNumber='XY 765', ... }, ... ]
如果超出限制,我們會收到?RequestNotPermitted?異常:
Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'flightSearchService' does not permit further calls at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)
at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)
... other lines omitted ...
裝飾方法拋出已檢異常
假設(shè)我們正在調(diào)用
FlightSearchService.searchFlightsThrowingException()?,它可以拋出一個已檢?Exception。那么我們就不能使用
RateLimiter.decorateSupplier()。我們將使用
RateLimiter.decorateCheckedSupplier()?代替:
CheckedFunction0<List<Flight>> flights =
?RateLimiter.decorateCheckedSupplier(limiter, ? ?() -> service.searchFlightsThrowingException(request));try {
?System.out.println(flights.apply());
} catch (...) { ?// exception handling
}
RateLimiter.decorateCheckedSupplier()?返回一個?CheckedFunction0,它表示一個沒有參數(shù)的函數(shù)。請注意對?CheckedFunction0?對象的?apply()?調(diào)用以調(diào)用遠程操作。
如果我們不想使用?Suppliers ,RateLimiter?提供了更多的輔助裝飾器方法,如?decorateFunction()、decorateCheckedFunction()、decorateRunnable()、decorateCallable()?等,以與其他語言結(jié)構(gòu)一起使用。decorateChecked*?方法用于裝飾拋出已檢查異常的方法。
應用多個速率限制
假設(shè)航空公司的航班搜索有多個速率限制:2 rps?和?40 rpm。 我們可以通過創(chuàng)建多個?RateLimiters 在客戶端應用多個限制:
RateLimiterConfig rpsConfig = RateLimiterConfig.custom().
?limitForPeriod(2).
?limitRefreshPeriod(Duration.ofSeconds(1)).
?timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterConfig rpmConfig = RateLimiterConfig.custom().
?limitForPeriod(40).
?limitRefreshPeriod(Duration.ofMinutes(1)).
?timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterRegistry registry = RateLimiterRegistry.of(rpsConfig);RateLimiter rpsLimiter =
?registry.rateLimiter("flightSearchService_rps", rpsConfig);RateLimiter rpmLimiter =
?registry.rateLimiter("flightSearchService_rpm", rpmConfig); ?
然后我們使用兩個?RateLimiters?裝飾?searchFlights()?方法:
Supplier<List<Flight>> rpsLimitedSupplier =
?RateLimiter.decorateSupplier(rpsLimiter, ? ?() -> service.searchFlights(request));
Supplier<List<Flight>> flightsSupplier
?= RateLimiter.decorateSupplier(rpmLimiter, rpsLimitedSupplier);
示例輸出顯示每秒發(fā)出 2 個請求,并且限制為 40 個請求:
Searching for flights; current time = 15:13:21 246...
Searching for flights; current time = 15:13:21 249...
Searching for flights; current time = 15:13:22 212...
Searching for flights; current time = 15:13:40 215...
Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted:
RateLimiter 'flightSearchService_rpm' does not permit further calls
at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)
at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)
在運行時更改限制
如果需要,我們可以在運行時更改?limitForPeriod?和?timeoutDuration?的值:
limiter.changeLimitForPeriod(2);limiter.changeTimeoutDuration(Duration.ofSeconds(2));
例如,如果我們的速率限制根據(jù)一天中的時間而變化,則此功能很有用 - 我們可以有一個計劃線程來更改這些值。新值不會影響當前正在等待權(quán)限的線程。
RateLimiter和 Retry一起使用
假設(shè)我們想在收到?RequestNotPermitted?異常時重試,因為它是一個暫時性錯誤。我們會像往常一樣創(chuàng)建?RateLimiter?和?Retry?對象。然后我們裝飾一個?Supplier?的供應商并用?Retry?包裝它:
Supplier<List<Flight>> rateLimitedFlightsSupplier =
?RateLimiter.decorateSupplier(rateLimiter, ? ?() -> service.searchFlights(request));
Supplier<List<Flight>> retryingFlightsSupplier =
?Retry.decorateSupplier(retry, rateLimitedFlightsSupplier);
示例輸出顯示為?RequestNotPermitted?異常重試請求:
Searching for flights; current time = 15:29:39 847Flight search successful
[Flight{flightNumber='XY 765', ... }, ... ]
Searching for flights; current time = 17:10:09 218...
[Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]2020-07-27T17:10:09.484: Retry 'rateLimitedFlightSearch', waiting PT1S until attempt '1'. Last attempt failed with exception 'io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter 'flightSearchService' does not permit further calls'.
Searching for flights; current time = 17:10:10 492...2020-07-27T17:10:10.494: Retry 'rateLimitedFlightSearch' recorded a successful retry attempt...
[Flight{flightNumber='XY 765', flightDate='07/31/2020', from='NYC', to='LAX'}, ...]
我們創(chuàng)建裝飾器的順序很重要。如果我們將?Retry?與?RateLimiter?包裝在一起,它將不起作用。
RateLimiter 事件
RateLimiter?有一個?EventPublisher,它在調(diào)用遠程操作時生成?RateLimiterOnSuccessEvent?和?RateLimiterOnFailureEvent?類型的事件,以指示獲取權(quán)限是否成功。我們可以監(jiān)聽這些事件并記錄它們,例如:
RateLimiter limiter = registry.rateLimiter("flightSearchService");
limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));
limiter.getEventPublisher().onFailure(e -> System.out.println(e.toString()));
日志輸出示例如下:
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName='flightSearchService', creationTime=2020-07-21T19:14:33.127+05:30}
... other lines omitted ...
RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName='flightSearchService', creationTime=2020-07-21T19:14:33.186+05:30}
RateLimiter 指標
假設(shè)在實施客戶端節(jié)流后,我們發(fā)現(xiàn) API 的響應時間增加了。這是可能的 - 正如我們所見,如果在線程調(diào)用遠程操作時權(quán)限不可用,RateLimiter?會將線程置于等待狀態(tài)。
如果我們的請求處理線程經(jīng)常等待獲得許可,則可能意味著我們的?limitForPeriod?太低。也許我們需要與我們的服務(wù)提供商合作并首先獲得額外的配額。
監(jiān)控?RateLimiter?指標可幫助我們識別此類容量問題,并確保我們在?RateLimiterConfig?上設(shè)置的值運行良好。
RateLimiter?跟蹤兩個指標:可用權(quán)限的數(shù)量(
resilience4j.ratelimiter.available.permissions)和等待權(quán)限的線程數(shù)量(
resilience4j.ratelimiter.waiting.threads)。
首先,我們像往常一樣創(chuàng)建?RateLimiterConfig、RateLimiterRegistry?和?RateLimiter。然后,我們創(chuàng)建一個?MeterRegistry?并將?RateLimiterRegistry?綁定到它:
MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedRateLimiterMetrics.ofRateLimiterRegistry(registry)
?.bindTo(meterRegistry);
運行幾次限速操作后,我們顯示捕獲的指標:
Consumer<Meter> meterConsumer = meter -> { ?String desc = meter.getId().getDescription(); ?String metricName = meter.getId().getName();
?Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
? ?.filter(m -> m.getStatistic().name().equals("VALUE"))
? ?.findFirst()
? ?.map(m -> m.getValue())
? ?.orElse(0.0);
?System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);
這是一些示例輸出:
The number of available permissions - resilience4j.ratelimiter.available.permissions: -6.0The number of waiting threads - resilience4j.ratelimiter.waiting_threads: 7.0
resilience4j.ratelimiter.available.permissions?的負值顯示為請求線程保留的權(quán)限數(shù)。在實際應用中,我們會定期將數(shù)據(jù)導出到監(jiān)控系統(tǒng),并在儀表板上進行分析。
實施客戶端速率限制時的陷阱和良好實踐
使速率限制器成為單例
對給定遠程服務(wù)的所有調(diào)用都應通過相同的?RateLimiter?實例。對于給定的遠程服務(wù),RateLimiter?必須是單例。
如果我們不強制執(zhí)行此操作,我們代碼庫的某些區(qū)域可能會繞過?RateLimiter?直接調(diào)用遠程服務(wù)。為了防止這種情況,對遠程服務(wù)的實際調(diào)用應該在核心、內(nèi)部層和其他區(qū)域應該使用內(nèi)部層暴露的限速裝飾器。
我們?nèi)绾未_保未來的新開發(fā)人員理解這一意圖?查看 Tom 的文章,其中揭示了一種解決此類問題的方法,即通過組織包結(jié)構(gòu)來明確此類意圖。此外,它還展示了如何通過在 ArchUnit 測試中編碼意圖來強制執(zhí)行此操作。
為多個服務(wù)器實例配置速率限制器
為配置找出正確的值可能很棘手。如果我們在集群中運行多個服務(wù)實例,limitForPeriod?的值必須考慮到這一點。
例如,如果上游服務(wù)的速率限制為 100 rps,而我們的服務(wù)有 4 個實例,那么我們將配置 25 rps 作為每個實例的限制。
然而,這假設(shè)我們每個實例上的負載大致相同。 如果情況并非如此,或者如果我們的服務(wù)本身具有彈性并且實例數(shù)量可能會有所不同,那么 Resilience4j 的?RateLimiter?可能不適合。
在這種情況下,我們需要一個速率限制器,將其數(shù)據(jù)保存在分布式緩存中,而不是像 Resilience4j?RateLimiter?那樣保存在內(nèi)存中。但這會影響我們服務(wù)的響應時間。另一種選擇是實現(xiàn)某種自適應速率限制。盡管 Resilience4j?可能會支持它,但尚不清楚何時可用。
選擇正確的超時時間
對于?timeoutDuration?配置值,我們應該牢記 API 的預期響應時間。
如果我們將?timeoutDuration?設(shè)置得太高,響應時間和吞吐量就會受到影響。如果它太低,我們的錯誤率可能會增加。
由于此處可能涉及一些反復試驗,因此一個好的做法是將我們在?RateLimiterConfig?中使用的值(如?timeoutDuration、limitForPeriod?和?limitRefreshPeriod)作為我們服務(wù)之外的配置進行維護。然后我們可以在不更改代碼的情況下更改它們。
調(diào)優(yōu)客戶端和服務(wù)器端速率限制器
實現(xiàn)客戶端速率限制并不能保證我們永遠不會受到上游服務(wù)的速率限制。
假設(shè)我們有來自上游服務(wù)的 2 rps 的限制,并且我們將?limitForPeriod?配置為 2,將?limitRefreshPeriod?配置為 1s。如果我們在第二秒的最后幾毫秒發(fā)出兩個請求,在此之前沒有其他調(diào)用,RateLimiter 將允許它們。如果我們在下一秒的前幾毫秒內(nèi)再進行兩次調(diào)用,RateLimiter?也會允許它們,因為有兩個新權(quán)限可用。但是上游服務(wù)可能會拒絕這兩個請求,因為服務(wù)器通常會實現(xiàn)基于滑動窗口的速率限制。
為了保證我們永遠不會從上游服務(wù)中獲得超過速率,我們需要將客戶端中的固定窗口配置為短于服務(wù)中的滑動窗口。因此,如果我們在前面的示例中將?limitForPeriod?配置為 1 并將?limitRefreshPeriod?配置為 500ms,我們就不會出現(xiàn)超出速率限制的錯誤。但是,第一個請求之后的所有三個請求都會等待,從而增加響應時間并降低吞吐量。
結(jié)論
在本文中,我們學習了如何使用 Resilience4j 的 RateLimiter 模塊來實現(xiàn)客戶端速率限制。 我們通過實際示例研究了配置它的不同方法。我們學習了一些在實施速率限制時要記住的良好做法和注意事項。
您可以使用 [GitHub 上](
https://github.com/thombergs/code-examples/tree/master/resilience4j/ratelimiter)的代碼演示一個完整的應用程序來說明這些想法。
本文譯自: [Implementing Rate Limiting with Resilience4j - Reflectoring](
https://reflectoring.io/rate-limiting-with-resilience4j/)