Java 項(xiàng)目中使用 Resilience4j 框架實(shí)現(xiàn)異步超時(shí)處理


到目前為止,在本系列中,我們已經(jīng)了解了 Resilience4j 及其?[Retry](
https://icodewalker.com/blog/261/)?和?[RateLimiter](
https://icodewalker.com/blog/288/)?模塊。在本文中,我們將通過(guò) TimeLimiter 繼續(xù)探索 Resilience4j。我們將了解它解決了什么問(wèn)題,何時(shí)以及如何使用它,并查看一些示例。
代碼示例
本文附有?[GitHub 上](
https://github.com/thombergs/code-examples/tree/master/resilience4j/timelimiter)的工作代碼示例。
什么是 Resilience4j?
請(qǐng)參閱上一篇文章中的描述,快速了解?[Resilience4j 的一般工作原理](
https://icodewalker.com/blog/261/#what-is-resilience4j)。
什么是限時(shí)?
對(duì)我們?cè)敢獾却僮魍瓿傻臅r(shí)間設(shè)置限制稱為時(shí)間限制。如果操作沒(méi)有在我們指定的時(shí)間內(nèi)完成,我們希望通過(guò)超時(shí)錯(cuò)誤收到通知。
有時(shí),這也稱為“設(shè)定最后期限”。
我們這樣做的一個(gè)主要原因是確保我們不會(huì)讓用戶或客戶無(wú)限期地等待。不提供任何反饋的緩慢服務(wù)可能會(huì)讓用戶感到沮喪。
我們對(duì)操作設(shè)置時(shí)間限制的另一個(gè)原因是確保我們不會(huì)無(wú)限期地占用服務(wù)器資源。我們?cè)谑褂?Spring 的?@Transactional?注解時(shí)指定的?timeout?值就是一個(gè)例子——在這種情況下,我們不想長(zhǎng)時(shí)間占用數(shù)據(jù)庫(kù)資源。
什么時(shí)候使用 Resilience4j TimeLimiter?
Resilience4j 的?TimeLimiter?可用于設(shè)置使用?CompleteableFutures 實(shí)現(xiàn)的異步操作的時(shí)間限制(超時(shí))。
Java 8 中引入的?CompletableFuture?類使異步、非阻塞編程變得更容易??梢栽诓煌木€程上執(zhí)行慢速方法,釋放當(dāng)前線程來(lái)處理其他任務(wù)。 我們可以提供一個(gè)當(dāng)?slowMethod()?返回時(shí)執(zhí)行的回調(diào):
int slowMethod() { ?// time-consuming computation or remote operationreturn 42;
}
CompletableFuture.supplyAsync(this::slowMethod)
.thenAccept(System.out::println);
這里的?slowMethod()?可以是一些計(jì)算或遠(yuǎn)程操作。通常,我們希望在進(jìn)行這樣的異步調(diào)用時(shí)設(shè)置時(shí)間限制。我們不想無(wú)限期地等待?slowMethod()?返回。例如,如果slowMethod()花費(fèi)的時(shí)間超過(guò)一秒,我們可能想要返回先前計(jì)算的、緩存的值,甚至可能會(huì)出錯(cuò)。
在 Java 8 的?CompletableFuture?中,沒(méi)有簡(jiǎn)單的方法來(lái)設(shè)置異步操作的時(shí)間限制。CompletableFuture?實(shí)現(xiàn)了?Future?接口,F(xiàn)uture?有一個(gè)重載的?get()?方法來(lái)指定我們可以等待多長(zhǎng)時(shí)間:
CompletableFuture<Integer> completableFuture = CompletableFuture
?.supplyAsync(this::slowMethod);
Integer result = completableFuture.get(3000, TimeUnit.MILLISECONDS);
System.out.println(result);
但是這里有一個(gè)問(wèn)題——?get()?方法是一個(gè)阻塞調(diào)用。所以它首先違背了使用?CompletableFuture?的目的,即釋放當(dāng)前線程。
這是 Resilience4j 的?TimeLimiter?解決的問(wèn)題——它讓我們?cè)诋惒讲僮魃显O(shè)置時(shí)間限制,同時(shí)保留在 Java 8 中使用?CompletableFuture?時(shí)非阻塞的好處。
CompletableFuture?的這種限制已在 Java 9 中得到解決。我們可以在 Java 9 及更高版本中使用 CompletableFuture 上的?orTimeout()?或?completeOnTimeout()?等方法直接設(shè)置時(shí)間限制。然而,憑借 Resilience4J 的?指標(biāo)?和?事件,與普通的 Java 9 解決方案相比,它仍然提供了附加值。
Resilience4j TimeLimiter 概念
TimeLimiter?支持?Future?和?CompletableFuture。但是將它與 Future 一起使用相當(dāng)于?Future.get(long timeout, TimeUnit unit)。因此,我們將在本文的其余部分關(guān)注?CompletableFuture。
與其他 Resilience4j 模塊一樣,TimeLimiter?的工作方式是使用所需的功能裝飾我們的代碼 - 如果在這種情況下操作未在指定的?timeoutDuration?內(nèi)完成,則返回?TimeoutException。
我們?yōu)?TimeLimiter?提供?timeoutDuration、ScheduledExecutorService?和異步操作本身,表示為?CompletionStage?的?Supplier。它返回一個(gè)?CompletionStage?的裝飾?Supplier。
在內(nèi)部,它使用調(diào)度器來(lái)調(diào)度一個(gè)超時(shí)任務(wù)——通過(guò)拋出一個(gè)?TimeoutException?來(lái)完成?CompletableFuture?的任務(wù)。如果操作先完成,TimeLimiter?取消內(nèi)部超時(shí)任務(wù)。
除了?timeoutDuration?之外,還有另一個(gè)與?TimeLimiter?關(guān)聯(lián)的配置?cancelRunningFuture。此配置僅?適用于?Future?而不適用于?CompletableFuture。當(dāng)超時(shí)發(fā)生時(shí),它會(huì)在拋出?TimeoutException?之前取消正在運(yùn)行的?Future。
使用 Resilience4j TimeLimiter 模塊
TimeLimiterRegistry、TimeLimiterConfig?和?TimeLimiter?是?resilience4j-timelimiter?的主要抽象。
TimeLimiterRegistry?是用于創(chuàng)建和管理?TimeLimiter?對(duì)象的工廠。
TimeLimiterConfig?封裝了?timeoutDuration?和?cancelRunningFuture?配置。每個(gè)?TimeLimiter?對(duì)象都與一個(gè)?TimeLimiterConfig?相關(guān)聯(lián)。
TimeLimiter?提供輔助方法來(lái)為?Future?和?CompletableFuture?Suppliers?創(chuàng)建或執(zhí)行裝飾器。
讓我們看看如何使用 TimeLimiter 模塊中可用的各種功能。我們將使用與本系列前幾篇文章相同的示例。假設(shè)我們正在為一家航空公司建立一個(gè)網(wǎng)站,以允許其客戶搜索和預(yù)訂航班。我們的服務(wù)與?FlightSearchService?類封裝的遠(yuǎn)程服務(wù)對(duì)話。
第一步是創(chuàng)建一個(gè)?TimeLimiterConfig:
TimeLimiterConfig config = TimeLimiterConfig.ofDefaults();
這將創(chuàng)建一個(gè)?TimeLimiterConfig,其默認(rèn)值為?timeoutDuration?(1000ms) 和?cancelRunningFuture?(true)。
假設(shè)我們想將超時(shí)值設(shè)置為 2s 而不是默認(rèn)值:
TimeLimiterConfig config = TimeLimiterConfig.custom()
?.timeoutDuration(Duration.ofSeconds(2))
?.build();
然后我們創(chuàng)建一個(gè)?TimeLimiter:
TimeLimiterRegistry registry = TimeLimiterRegistry.of(config);TimeLimiter limiter = registry.timeLimiter("flightSearch");
我們想要異步調(diào)用
FlightSearchService.searchFlights(),它返回一個(gè)?List<Flight>。讓我們將其表示為?Supplier<CompletionStage<List<Flight>>>:
Supplier<List<Flight>> flightSupplier = () -> service.searchFlights(request);
Supplier<CompletionStage<List<Flight>>> origCompletionStageSupplier =() -> CompletableFuture.supplyAsync(flightSupplier);
然后我們可以使用?TimeLimiter?裝飾?Supplier:
ScheduledExecutorService scheduler =
?Executors.newSingleThreadScheduledExecutor();
Supplier<CompletionStage<List<Flight>>> decoratedCompletionStageSupplier = ?
?limiter.decorateCompletionStage(scheduler, origCompletionStageSupplier);
最后,讓我們調(diào)用裝飾的異步操作:
decoratedCompletionStageSupplier.get().whenComplete((result, ex) -> { ?if (ex != null) {
? ?System.out.println(ex.getMessage());
?} ?if (result != null) {
? ?System.out.println(result);
?}
});
以下是成功飛行搜索的示例輸出,其耗時(shí)少于我們指定的 2 秒?timeoutDuration:
Searching for flights; current time = 19:25:09 783; current thread = ForkJoinPool.commonPool-worker-3Flight search successful
[Flight{flightNumber='XY 765', flightDate='08/30/2020', from='NYC', to='LAX'}, Flight{flightNumber='XY 746', flightDate='08/30/2020', from='NYC', to='LAX'}] on thread ForkJoinPool.commonPool-worker-3
這是超時(shí)的航班搜索的示例輸出:
Exception java.util.concurrent.TimeoutException: TimeLimiter 'flightSearch' recorded a timeout exception on thread pool-1-thread-1 at 19:38:16 963Searching for flights; current time = 19:38:18 448; current thread = ForkJoinPool.commonPool-worker-3Flight search successful at 19:38:18 461
上面的時(shí)間戳和線程名稱表明,即使異步操作稍后在另一個(gè)線程上完成,調(diào)用線程也會(huì)收到?TimeoutException。
如果我們想創(chuàng)建一個(gè)裝飾器并在代碼庫(kù)的不同位置重用它,我們將使用decorateCompletionStage()。如果我們想創(chuàng)建它并立即執(zhí)行?Supplier<CompletionStage>,我們可以使用?executeCompletionStage()?實(shí)例方法代替:
CompletionStage<List<Flight>> decoratedCompletionStage = ?
?limiter.executeCompletionStage(scheduler, origCompletionStageSupplier);
TimeLimiter 事件
TimeLimiter?有一個(gè)?EventPublisher,它生成?TimeLimiterOnSuccessEvent、TimeLimiterOnErrorEvent?和?TimeLimiterOnTimeoutEvent?類型的事件。我們可以監(jiān)聽(tīng)這些事件并記錄它們,例如:
TimeLimiter limiter = registry.timeLimiter("flightSearch");
limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));
limiter.getEventPublisher().onError(e -> System.out.println(e.toString()));
limiter.getEventPublisher().onTimeout(e -> System.out.println(e.toString()));
示例輸出顯示了記錄的內(nèi)容:
2020-08-07T11:31:48.181944: TimeLimiter 'flightSearch' recorded a successful call.... other lines omitted ...2020-08-07T11:31:48.582263: TimeLimiter 'flightSearch' recorded a timeout exception.
TimeLimiter 指標(biāo)
TimeLimiter?跟蹤成功、失敗和超時(shí)的調(diào)用次數(shù)。
首先,我們像往常一樣創(chuàng)建?TimeLimiterConfig、TimeLimiterRegistry?和?TimeLimiter。然后,我們創(chuàng)建一個(gè)?MeterRegistry?并將?TimeLimiterRegistry?綁定到它:
MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedTimeLimiterMetrics.ofTimeLimiterRegistry(registry)
?.bindTo(meterRegistry);
運(yùn)行幾次限時(shí)操作后,我們顯示捕獲的指標(biāo):
Consumer<Meter> meterConsumer = meter -> { ?String desc = meter.getId().getDescription(); ?String metricName = meter.getId().getName(); ?String metricKind = meter.getId().getTag("kind");
?Double metricValue =
? ?StreamSupport.stream(meter.measure().spliterator(), false)
? ?.filter(m -> m.getStatistic().name().equals("COUNT"))
? ?.findFirst()
? ?.map(Measurement::getValue)
? ?.orElse(0.0);
?System.out.println(desc + " - " +
? ? ? ? ? ? ? ? ? ? metricName + ? ? ? ? ? ? ? ? ? ? "(" + metricKind + ")" + ? ? ? ? ? ? ? ? ? ? ": " + metricValue);
};
meterRegistry.forEachMeter(meterConsumer);
這是一些示例輸出:
The number of timed out calls - resilience4j.timelimiter.calls(timeout): 6.0The number of successful calls - resilience4j.timelimiter.calls(successful): 4.0The number of failed calls - resilience4j.timelimiter.calls(failed): 0.0
在實(shí)際應(yīng)用中,我們會(huì)定期將數(shù)據(jù)導(dǎo)出到監(jiān)控系統(tǒng)并在儀表板上進(jìn)行分析。
實(shí)施時(shí)間限制時(shí)的陷阱和良好實(shí)踐
通常,我們處理兩種操作 - 查詢(或讀?。┖兔睿ɑ?qū)懭耄?。?duì)查詢進(jìn)行時(shí)間限制是安全的,因?yàn)槲覀冎浪鼈儾粫?huì)改變系統(tǒng)的狀態(tài)。我們看到的?searchFlights()?操作是查詢操作的一個(gè)例子。
命令通常會(huì)改變系統(tǒng)的狀態(tài)。bookFlights()操作將是命令的一個(gè)示例。在對(duì)命令進(jìn)行時(shí)間限制時(shí),我們必須記住,當(dāng)我們超時(shí)時(shí),該命令很可能仍在運(yùn)行。例如,bookFlights()?調(diào)用上的?TimeoutException并不一定意味著命令失敗。
在這種情況下,我們需要管理用戶體驗(yàn)——也許在超時(shí)時(shí),我們可以通知用戶操作花費(fèi)的時(shí)間比我們預(yù)期的要長(zhǎng)。然后我們可以查詢上游以檢查操作的狀態(tài)并稍后通知用戶。
結(jié)論
在本文中,我們學(xué)習(xí)了如何使用 Resilience4j 的 TimeLimiter 模塊為異步、非阻塞操作設(shè)置時(shí)間限制。我們通過(guò)一些實(shí)際示例了解了何時(shí)使用它以及如何配置它。
您可以使用?[GitHub 上](
https://github.com/thombergs/code-examples/tree/master/resilience4j/timelimiter)的代碼演示一個(gè)完整的應(yīng)用程序來(lái)說(shuō)明這些想法。
本文譯自:
https://reflectoring.io/time-limiting-with-resilience4j/