CountDownLatch翻車后,大家都建議我用CompletableFuture改造下,改造完感覺真香??!
前言
大家好,前段時間使用了CountDownLatch來做并發(fā)流程的控制,在生產(chǎn)上碰到了一些問題,最終問題是解決了,但是那篇文章的評論大家讓我用CompletableFuture來試一試,改造完之后,發(fā)現(xiàn)CompletableFuture這東西真強(qiáng)大,有種相見恨晚的感覺。
上篇文章
# 以為很熟悉CountDownLatch的使用了,沒想到在生產(chǎn)環(huán)境翻車了
可以來這篇文章看一下具體的業(yè)務(wù)場景
CompletableFuture改造
我先直接分享一下我是如何使用CompletableFuture的吧
java復(fù)制代碼// 下載文件總數(shù),初始化 List<Integer> resultList = new ArrayList<>(1000); ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>(); IntStream.range(0,1000).forEach(resultList::add);
java復(fù)制代碼public List<R> sendAsyncBatch(List<P> list, Executor executor, TaskLoader<R,P> loader) { ? ? ?List<R> resultList = Collections.synchronizedList(Lists.newArrayList()); ?? ?if (CollectionUtils.isNotEmpty(list)) { ?? ? ? ?Executor finalExecutor = executor; ?? ? ? ?// 將任務(wù)拆分分成每50個為一個任務(wù) ?? ? ? ?CollUtil.split(list, 50) ?? ? ? ? ? ? ? ?.forEach(tempList -> { ?? ? ? ? ? ? ? ? ? ?CompletableFuture[] completableFutures = tempList.stream() ?? ? ? ? ? ? ? ? ? ? ? ? ? ?.map(p -> CompletableFuture.supplyAsync(() -> { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?return loader.load(p); ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?} catch (InterruptedException e) { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?e.printStackTrace(); ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?return null; ?? ? ? ? ? ? ? ? ? ? ? ? ? ?}, finalExecutor) ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?.handle((result, throwable) -> { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if (Objects.nonNull(throwable)) { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//log.error("async error:{}", throwable.getMessage()); ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?} else if (Objects.nonNull(result)) { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//log.info("async success:{}", result); ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?} else { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//log.error("async result is null"); ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?return result; ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}).whenComplete((r, ex) -> { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if (Objects.nonNull(r)) { ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?resultList.add((R) r); ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}) ?? ? ? ? ? ? ? ? ? ? ? ? ? ?).toArray(CompletableFuture[]::new); ?? ? ? ? ? ? ? ? ? ?CompletableFuture.allOf(completableFutures).join(); ?? ? ? ? ? ? ? ? ? ?System.out.println(resultList.size()); ?? ? ? ? ? ? ? ?}); ?? ?} ?? ?return resultList; }
java復(fù)制代碼// 具體業(yè)務(wù)邏輯實現(xiàn)接口 @FunctionalInterface public interface TaskLoader<T,P> { ? ? ?T load(P p) throws InterruptedException; }
ini復(fù)制代碼// ?自定義啟動器 ExecutorService executorService = BaseThreadPoolExector.queueExecutor(new ArrayBlockingQueue<>(500)); AsyncTask<Integer, Integer> asyncTask = new AsyncTask(); // 返回結(jié)果 List<Integer> list = asyncTask.sendAsyncBatch(resultList, executorService, new TaskLoadImpl()); // 返回結(jié)果處理
我先說一下,為什么要CountDownLatch替換掉
CompletableFuture為我們提供更直觀、更優(yōu)美的API。
在“多個任務(wù)等待完成狀態(tài)”這個應(yīng)用場景,在遇到異常的情況下我們不需要去手動的拋異常,以免錯誤處理細(xì)節(jié)導(dǎo)致阻塞
CompletableFuture也可以定制執(zhí)行器
但是他也是有缺點的,我個人感覺他的API有點多,看的時候讓人眼花。
短短十幾行的代碼,看到了很多API supplyAsync、handle、whenComplete、allOf
之后我們還會用到runAsync、 thenApply、thenCompose等等其他的。
什么是CompletableFuture?
異步編程,利用多線程優(yōu)化性能這個核心方案得以實施的基礎(chǔ)
他的目的也很簡單,同一個CPU上執(zhí)行幾個松耦合的任務(wù),充分利用CPU核數(shù),實現(xiàn)最大化吞吐量,避免因為阻塞造成等待時間過長;
1. 要區(qū)分并發(fā)與并行的區(qū)別
我們還需要特別的注意這兩個概念不能混淆
并發(fā):在一個CPU上串行執(zhí)行
并行:多個CPU上同時執(zhí)行任務(wù)
2. Future接口
CompletableFuture主要繼承了Future接口,但是他比Future接口豐富的很多
java復(fù)制代碼// 取消 boolean cancel(boolean mayInterruptIfRunning); ?// 判斷是否取消 boolean isCancelled(); ?//是否異步計算是否已經(jīng)結(jié)束 boolean isDone(); ?// 獲取計算結(jié)果 V get() throws InterruptedException, ExecutionException; ?// 設(shè)置最長計算時間,返回計算結(jié)果 V get(long timeout, TimeUnit unit) ?? ? ? ?throws InterruptedException, ExecutionException, TimeoutException;
可以看到Future接口的局限性,主要是用起來不省事 舉個例子:A線程執(zhí)行完之后通知B線程執(zhí)行
java復(fù)制代碼ExecutorService executorService = BaseThreadPoolExector.calculateExecutor(); Future<String> futureA = executorService.submit(() -> Thread.currentThread().getName()); System.out.println(futureA.get()); if (futureA.isDone()){ ?? ?Future<String> futureB = executorService.submit(() -> Thread.currentThread().getName()); ?? ?System.out.println(futureB.get()); } executorService.shutdown();
這里我們就需要查詢futureA.isDone()結(jié)果,然后再去執(zhí)行B線程的業(yè)務(wù)
而 CompletableFuture 操作起來就便捷很多了
java復(fù)制代碼CompletableFuture<String> completableFuture = CompletableFuture ?? ? ? ?.supplyAsync(() -> Thread.currentThread().getName(), executorService) ?? ? ? ?.thenApply(s -> Thread.currentThread().getName()); System.out.println(completableFuture.get()); ?準(zhǔn)備執(zhí)行 計劃執(zhí)行 supplyAsync result pool-1-thread-1, thenApply result main 線程退出
supplyAsync執(zhí)行完成之后,再去執(zhí)行thenApply
沒有繁瑣的手工維護(hù)線程的工作,給任務(wù)分配線程的工作也不需要我們關(guān)注;
3. 錯誤處理細(xì)節(jié),避免造成阻塞
java復(fù)制代碼CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); new Thread(() ->{ ?? ?try { ?? ? ? ?completableFuture.complete(10/0); ?? ?}catch (Exception ex){ ?? ? ? ?//ex.printStackTrace(); ?? ? ? ?completableFuture.completeExceptionally(ex); ?? ?} ?}).start(); try { ?? ?System.out.println(completableFuture.get()); } catch (InterruptedException | ExecutionException e) { ?? ?e.printStackTrace(); }
注意到catch里面的completeExceptionally
函數(shù)了吧,
這個主要的作用就是為了拋出異常,
如果缺少了他,就會造成completableFuture.get()一直處于等待造成阻塞,
與此同時,沒有為我們拋出異常信息。
所以CompletableFuture
的API優(yōu)美之處又要體現(xiàn)出來了
ini復(fù)制代碼CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { ?? ?int kk = 10 / 0; ?? ?return kk; }).handle((result, throwable) -> { ?? ?System.out.println(result); ?? ?System.out.println(throwable.getMessage()); ?? ?return result; }).whenComplete((result ,throwable) -> System.out.println(result));
supplyAsync配合著 handle 和 whenComplete,將異常和結(jié)果進(jìn)行處理.
handle 和 whenComplete的區(qū)別
typescript復(fù)制代碼whenComplete public CompletableFuture<T> whenComplete( ?? ?BiConsumer<? super T, ? super Throwable> action) { ?? ?return uniWhenCompleteStage(null, action); } handle public <U> CompletableFuture<U> handle( ?? ?BiFunction<? super T, Throwable, ? extends U> fn) { ?? ?return uniHandleStage(null, fn); }
whenComplete是BiConsumer也就是直接消費(fèi)不返回值,不對結(jié)果產(chǎn)生影響
如果單獨(dú)使用whenComplete的時候,沒有進(jìn)行拋出異常的處理會造成阻塞
java復(fù)制代碼CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { ?? ? ? ? ? ?int kk = 10 / 0; ?? ? ? ? ? ?return kk; ?? ? ? ?}) ?? ? ? ? ? ? ? ?.whenComplete((r, ex) -> { ?? ? ? ? ? ? ? ? ? ?if (Objects.nonNull(ex)) { ?? ? ? ? ? ? ? ? ? ? ? ?System.out.println("whenComplete>>>" + ex.getMessage()); ?? ? ? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ?}) ?? ? ? ? ? ? ? ?.exceptionally(throwable -> { ?? ? ? ? ? ? ? ? ? ?System.out.println("exceptionally>>>" + throwable.getMessage()); ?? ? ? ? ? ? ? ? ? ?return null; ?? ? ? ? ? ? ? ?});
handle是BiFunction也就是需要返回值,對結(jié)果產(chǎn)生影響
需要注意的是,在handle中對結(jié)果修改,要避免結(jié)果對象為空,如果沒有判斷直接進(jìn)行操作會出現(xiàn)空指針異常造成阻塞
在這里出現(xiàn)空指針異常,如果沒有exceptionally將異常拋出,則會造成阻塞
了解API
欲善其功,必先利其器
我們主要從這三種關(guān)系下手去了解和使用API 涉及接口
java復(fù)制代碼CompletionStage<R> thenApply(fn); ? ?CompletionStage<R> thenApplyAsync(fn); ? ? ? ? ? ?CompletionStage<Void> thenAccept(consumer); ? ? ? CompletionStage<Void> thenAcceptAsync(consumer); ?CompletionStage<Void> thenRun(action); ? ? ? ? ? ?CompletionStage<Void> thenRunAsync(action); ? ? ? CompletionStage<R> thenCompose(fn); ? ? ? ? ? ? ? CompletionStage<R> thenComposeAsync(fn);
thenApply函數(shù)里參數(shù)入?yún)?code>Function<? super T,? extends U> fn,這個接口里與 CompletionStage 相關(guān)的方法是?R apply(T t)
,這個方法既能接收參數(shù)也支持返回值,所以 thenApply函數(shù)出參的是CompletionStage<R>
。
thenAccept類型函數(shù)入?yún)?code>Consumer<? super T> action是一個消費(fèi)類型的,回參是CompletionStage<Void>
所以thenAccept類型函數(shù)不會有返回值。
thenRun函數(shù)入?yún)?code>Runnable action,回參CompletionStage<Void>
,所以既不能接收參數(shù)也不支持返回值。
thenCombine函數(shù)入?yún)?code>CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn,回參CompletableFuture<V>
是支持返回值的,他的作用主要使用BiFunction處理兩個階段的結(jié)果
我們只需要注意他的入?yún)?、回參和函?shù)后綴就能夠區(qū)分出他們的不同
1. CompletableFuture中的串行化關(guān)系
kotlin復(fù)制代碼CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() ->{ ?? ?//int kk = 10/0; ?? ?return Thread.currentThread().getName() + ":小郭"; },executorService).thenApply(s -> { ?? ?return s + "拿茶葉"; }).thenApply(a ->{ ?? ?return a + ",泡茶去"; }).handle((result, ex) ->{ ?? ?if (ex != null){ ?? ? ? ?System.out.println(ex.getMessage()); ?? ?} ?? ?return result; }).whenComplete((r, ex) ->{ ?? ?System.out.println(r); }); task1.join();
執(zhí)行結(jié)果:
java復(fù)制代碼準(zhǔn)備執(zhí)行 計劃執(zhí)行 pool-1-thread-1:小郭拿茶葉,泡茶去
可以看到,是按照之上而下的順序去執(zhí)行的supplyAsync、thenApply、thenApply 如果第二階段任務(wù)沒有拿到第一階段的結(jié)果,他就會等待
2. CompletableFuture中的匯聚AND關(guān)系
scss復(fù)制代碼CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() ->{ ?? ?int t = new Random().nextInt(30); ?? ?try { ?? ? ? ?Thread.sleep(10000); ?? ?} catch (InterruptedException e) { ?? ? ? ?e.printStackTrace(); ?? ?} ?? ?System.out.println("task1=" + t); ?? ?return t; }); CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() ->{ ?? ?int t = new Random().nextInt(30); ?? ?try { ?? ? ? ?Thread.sleep(t); ?? ?} catch (InterruptedException e) { ?? ? ? ?e.printStackTrace(); ?? ?} ?? ?System.out.println("task2=" + t); ?? ?return t; }); CompletableFuture<Integer> task3 = task1.thenCombineAsync(task2, Integer::sum); task3.join();
等待task1和task2執(zhí)行完成,task再進(jìn)行處理
執(zhí)行結(jié)果
java復(fù)制代碼task1=1 task2=3 4
3. CompletableFuture中的匯聚OR關(guān)系
scss復(fù)制代碼CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() ->{ ?? ?int t = new Random().nextInt(5); ?? ?try { ?? ? ? ?Thread.sleep(t * 1000); ?? ?} catch (InterruptedException e) { ?? ? ? ?e.printStackTrace(); ?? ?} ?? ?System.out.println("task1=" + t); ?? ?return t; }); CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() ->{ ?? ?int t = new Random().nextInt(5); ?? ?try { ?? ? ? ?Thread.sleep(t * 1000); ?? ?} catch (InterruptedException e) { ?? ? ? ?e.printStackTrace(); ?? ?} ?? ?System.out.println("task2=" + t); ?? ?return t; }); CompletableFuture<Integer> task3 = task1.applyToEither(task2, s ->s); task3.join();
誰先執(zhí)行完先輸出誰,如果相同時間執(zhí)行完,則一起數(shù)據(jù)
執(zhí)行結(jié)果
java復(fù)制代碼我快我先來 task2=2 我快我先來 task1=2 2 ?我快我先來 task2=0 0
實現(xiàn)List任務(wù)并行執(zhí)行的方式
并行流進(jìn)行操作
使用CompletableFuture發(fā)起異步請求,最后使用join等待所有異步操作結(jié)束
為了更好的發(fā)揮出CompletableFuture,需要采用定制的執(zhí)行器
那這兩個如何選擇?
進(jìn)行計算密集型,并且沒有I/O操作,推薦使用Sream并行流,沒必要創(chuàng)建更多的線程,線程過多反而是一種浪費(fèi)
涉及I/O等待的操作,CompletableFuture的靈活性會更高
現(xiàn)在回過頭看一下,我上面的改造方法,是不是就感覺清晰了許多,不足的地方大家提出來
總結(jié)
這篇文章我主要是根據(jù)大家的建議,使用了Java8的CompletableFuture 來進(jìn)行了原來的業(yè)務(wù)功能改造.
在執(zhí)行比較耗時的業(yè)務(wù)操作時候可以使用異步編程來提高性能,加快程序的處理速度
在處理異常機(jī)制的時候,往往是讓我們很頭痛的,擔(dān)心線程中出現(xiàn)的異常沒有及時捕獲,造成程序的阻塞或者其他方面的影響,CompletableFuture 提供了優(yōu)秀的異常管理機(jī)制。
CompletableFuture 還提供了 串行、聚合、優(yōu)先輸出的函數(shù),更貼切業(yè)務(wù)需求做出最好的選擇。