最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

駕馭Java線程池:一步一步帶你從新手到高手!

2023-10-05 17:31 作者:塵緣如夢_  | 我要投稿

java框架中例如Tomcat、Dubbo等都離不開線程池,這些框架用到線程的地方,都會用線程池來負責。我們在使用這些框架的時候,會設置線程池參數(shù),用于提高性能。那么開多少線程合適?今天我們將圍繞這個問題來學習一下線程池。 為什么使用線程池

平常我們使用java線程的時候,都是直接創(chuàng)建一個Thread對象,java線程的創(chuàng)建和銷毀都會涉及到Thread對象的創(chuàng)建和銷毀,線程切換等問題。創(chuàng)建Thread對象,僅僅是在 JVM 的堆里分配一塊內(nèi)存而已;而創(chuàng)建一個線程,卻需要調(diào)用操作系統(tǒng)內(nèi)核的 API,然后操作系統(tǒng)要為線程分配一系列的資源,這個成本就很高了。所以線程是一個重量級的對象,應該避免頻繁創(chuàng)建和銷毀。 一般可以通過“池化”思想來解決上述的問題,而JDK中提供的線程池實現(xiàn)是基于ThreadPoolExecutor。 使用線程池可以帶來一系列好處:

降低資源消耗

:通過池化技術(shù)重復利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。

提高響應速度

:任務到達時,無需等待線程創(chuàng)建即可立即執(zhí)行。

提高線程的可管理性

:線程是稀缺資源,如果無限制創(chuàng)建,不僅會消耗系統(tǒng)資源,還會因為線程的不合理分布導致資源調(diào)度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。

提供更多更強大的功能

:線程池具備可拓展性,允許開發(fā)人員向其中增加更多的功能。比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務延期執(zhí)行或定期執(zhí)行。

線程池核心設計與實現(xiàn)

總體設計

頂層接口是Executor,java.util.concurrent.Executor#execute,用戶只需提供Runnable對象,將任務的運行邏輯提交到執(zhí)行器(Executor)中,由Executor框架完成線程的調(diào)配和任務的執(zhí)行部分。

ExecutorService接口擴展了Executor并增加了一些能力:

擴充執(zhí)行任務的能力,通過調(diào)用submit()或者invokeAll()方法可以為一個或一批異步任務生成Future的方法;

提供了管控線程池的方法,比如調(diào)用shutdown()等方法停止線程池的運行。

AbstractExecutorService則是上層的抽象類,將執(zhí)行任務的流程串聯(lián)了起來,保證下層的實現(xiàn)只需關注一個執(zhí)行任務的方法即可。

具體實現(xiàn)類是ThreadPoolExecutor,ThreadPoolExecutor將會一方面維護自身的生命周期,另一方面同時管理線程和任務,使兩者良好的結(jié)合從而執(zhí)行并行任務。

ScheduledThreadPoolExecutor又擴展了ThreadPoolExecutor和ScheduledExecutorService接口,增加了調(diào)度能力,使任務可以延時定時執(zhí)行。

另外還有一個提供了線程池創(chuàng)建的工廠方法的類Executors,用來創(chuàng)建線程池。

本章主要說明ThreadPoolExecutor的實現(xiàn)原理,ScheduledThreadPoolExecutor下篇會討論。 ThreadPoolExecutor實現(xiàn)原理

ThreadPoolExecutor構(gòu)造參數(shù)說明 ?ThreadPoolExecutor( ??int corePoolSize, ??int maximumPoolSize, ??long keepAliveTime, ??TimeUnit unit, ??BlockingQueue workQueue, ??ThreadFactory threadFactory, ??RejectedExecutionHandler handler) ??

corePoolSize

:表示線程池保有的最小線程數(shù)。核心線程數(shù),這些核心線程一旦被創(chuàng)建,就不會被銷毀。相反,如果是非核心線程,等任務執(zhí)行完并長時間未被使用則會被銷毀。

maximumPoolSize

:表示線程池創(chuàng)建的最大線程數(shù)。

keepAliveTime&unit

:一個線程如果在一段時間內(nèi),都沒有執(zhí)行任務,說明很閑,keepAliveTime和unit就是用來定義這個一段時間的參數(shù)。也就是說,如果線程已經(jīng)空閑了keepAliveTime和unit這么久了,而且線程數(shù)大于corePoolSize,那么這個空閑線程就要被回收。

workQueue

:用來存儲任務,當有新的任務請求線程處理時,如果核心線程池已滿,那么新來的任務會加入workQueue隊列中,workQueue是一個阻塞隊列。

threadFactory

:通過這個參數(shù)可以自定義如何創(chuàng)建線程。

handler

:通過這個參數(shù)可以自定義任務的拒絕策略。如果線程池中所有的線程都在忙碌,并且工作隊列也滿了(前提是工作隊列是有界隊列),那么此時提交任務,線程池就會拒絕接收。至于拒絕的策略,可以通過這個參數(shù)來指定

ThreadPoolExecutor已經(jīng)提供了四種策略。

CallerRunsPolicy:提交任務的線程自己去執(zhí)行該任務。

AbortPolicy:默認的拒絕策略,會throws RejectedExecutionException.

DiscardPolicy:直接丟棄任務,沒有任何異常輸出。

DiscardOldestPolicy:丟棄最老的任務,其實就是把最早進入工作隊列的任務丟棄,然后把新任務加入到工作隊列。

ThreadPoolExecutor執(zhí)行流程 ?public void execute(Runnable command) { ???if (command == null) ?????throw new NullPointerException(); ???int c = ctl.get(); ???if (workerCountOf(c) < corePoolSize) { ?????if (addWorker(command, true)) ???????return; ?????c = ctl.get(); ??} ???if (isRunning(c) && workQueue.offer(command)) { ?????int recheck = ctl.get(); ?????if (! isRunning(recheck) && remove(command)) ???????reject(command); ?????else if (workerCountOf(recheck) == 0) ???????addWorker(null, false); ??} ???else if (!addWorker(command, false)) ?????reject(command); ?} 首先檢測線程池運行狀態(tài),如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀態(tài)下執(zhí)行任務。

如果workerCount < corePoolSize,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務。

如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊列未滿,則將任務添加到該阻塞隊列中。

如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務。

如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊列已滿, 則根據(jù)拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

線程池運行狀態(tài) 線程池的運行狀態(tài),由線程池內(nèi)部維護,線程池內(nèi)部使用AtomicInteger變量,用于維護運行狀態(tài)runState和工作線程數(shù)workerCount,高3位保存runState,低29位保存workerCount,兩個變量之間互不干擾。用一個變量去存儲兩個值,可避免在做相關決策時,出現(xiàn)不一致的情況,不必為了維護兩者的一致,而占用鎖資源。 ?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); ?? ?// COUNT_BITS=29,(對于int長度為32來說)表示線程數(shù)量的字節(jié)位數(shù) ?private static final int COUNT_BITS = Integer.SIZE - 3; ?// 狀態(tài)掩碼,高三位是1,低29位全是0,可以通過 ctl&COUNT_MASK 運算來獲取線程池狀態(tài) ?private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; ?? ?? ?private static final int RUNNING??= -1 << COUNT_BITS; // 111 00000 00000000 00000000 00000000; ?private static final int SHUTDOWN?=?0 << COUNT_BITS; // 000 00000 00000000 00000000 00000000; ?private static final int STOP???=?1 << COUNT_BITS; // 001 00000 00000000 00000000 00000000; ?private static final int TIDYING??=?2 << COUNT_BITS; // 010 00000 00000000 00000000 00000000; ?private static final int TERMINATED =?3 << COUNT_BITS; // 011 00000 00000000 00000000 00000000; ?? ?// 計算當前運行狀態(tài) ?private static int runStateOf(int c)??{ return c & ~COUNT_MASK; } ?// 計算當前線程數(shù)量 ?private static int workerCountOf(int c) { return c & COUNT_MASK; } ?//通過狀態(tài)和線程數(shù)生成ctl ?private static int ctlOf(int rs, int wc) { return rs | wc; } 狀態(tài)描述RUNNING能接受新的任務,也能處理阻塞隊列中的任務SHUTDOWN關閉狀態(tài),不能接受新的任務,只能處理阻塞隊列中的任務STOP不能接受新的任務,也不能處理阻塞隊列中的任務,會中斷正在處理任務的線程TIDYING所有任務都停止了,workerCount為0TERMINATED在執(zhí)行terminated()方法會進入到這個狀態(tài) 狀態(tài)轉(zhuǎn)移: 阻塞隊列 再介紹線程池總體設計的時候,說過線程池的設計,采用的都是生產(chǎn)者 - 消費者模式,其實現(xiàn)主要就是通過BlockingQueue來實現(xiàn)的,目的是將任務和線程兩者解耦,阻塞隊列緩存任務,工作線程從阻塞隊列中獲取任務。 使用不同的隊列可以實現(xiàn)不一樣的任務存取策略。在這里,我們可以再介紹下阻塞隊列的成員: 阻塞隊列描述ArrayBlockingQueue基于數(shù)組實現(xiàn)的有界隊列,支持公平鎖和非公平鎖LinkedBlockingQueue基于鏈表實現(xiàn)的有界隊列,隊列大小默認為Integer.MAX_VALUE,所以默認創(chuàng)建該隊列會有容量危險PriorityBlockingQueue支持優(yōu)先級排序的無界隊列,不能保證同優(yōu)先級的順序DelayQueue基于PriorityBlockingQueue實現(xiàn)的延期隊列,只有當延時期滿了,才能從中取出元素SynchronousQueue同步隊列,不存儲任何元素,調(diào)用一次put()就必須等待take()調(diào)用完。支持公平鎖和非公平鎖LinkedTransferQueue基于鏈表實現(xiàn)的無界隊列,多了transfer()和tryTransfer()方法LinkedBlockingDeque基于雙向鏈表實現(xiàn)的隊列,多線程并發(fā)時,可以將鎖的競爭最多降到一半 Worker

Worker整體設計 Worker繼承了AQS,使用AQS來實現(xiàn)獨占鎖這個功能。沒有使用可重入鎖ReentrantLock,而是使用AQS,為的就是實現(xiàn)不可重入的特性去反應線程現(xiàn)在的執(zhí)行狀態(tài)。

Worker實現(xiàn)了Runnable接口,持有一個線程thread,一個初始化的任務firstTask。thread是在調(diào)用構(gòu)造方法時通過ThreadFactory來創(chuàng)建的線程,可以用來執(zhí)行任務;

?private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ ???final Thread thread;//Worker持有的線程 ???Runnable firstTask;//初始化的任務,可以為null ?? ???Worker(Runnable firstTask) { ????setState(-1); // inhibit interrupts until runWorker ????this.firstTask = firstTask; ????this.thread = getThreadFactory().newThread(this); ??} ?? ???public void run() { ????runWorker(this); ??} ?? ??// ...省略其余代碼 ?} Worker如何添加任務 ?private boolean addWorker(Runnable firstTask, boolean core) { ???retry: ???for (int c = ctl.get();;) { ?????// Check if queue empty only if necessary. ?????if (runStateAtLeast(c, SHUTDOWN) ???????&& (runStateAtLeast(c, STOP) ?????????|| firstTask != null ?????????|| workQueue.isEmpty())) ???????return false; ?? ?????for (;;) { ???????if (workerCountOf(c) ?????????>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) ?????????return false; ???????if (compareAndIncrementWorkerCount(c)) ?????????break retry; ???????c = ctl.get();?// Re-read ctl ???????if (runStateAtLeast(c, SHUTDOWN)) ?????????continue retry; ???????// else CAS failed due to workerCount change; retry inner loop ????} ??} ?? ???boolean workerStarted = false; ???boolean workerAdded = false; ???Worker w = null; ???try { ?????w = new Worker(firstTask); ?????final Thread t = w.thread; ?????if (t != null) { ???????final ReentrantLock mainLock = this.mainLock; ???????mainLock.lock(); ???????try { ?????????// Recheck while holding lock. ?????????// Back out on ThreadFactory failure or if ?????????// shut down before lock acquired. ?????????int c = ctl.get(); ?? ?????????if (isRunning(c) || ??????????(runStateLessThan(c, STOP) && firstTask == null)) { ???????????if (t.getState() != Thread.State.NEW) ?????????????throw new IllegalThreadStateException(); ???????????workers.add(w); ???????????workerAdded = true; ???????????int s = workers.size(); ???????????if (s > largestPoolSize) ?????????????largestPoolSize = s; ????????} ??????} finally { ?????????mainLock.unlock(); ??????} ???????if (workerAdded) { ?????????t.start(); ?????????workerStarted = true; ??????} ????} ??} finally { ?????if (! workerStarted) ???????addWorkerFailed(w); ??} ???return workerStarted; ?} addWorker()方法有兩個參數(shù): firstTask用它來保存?zhèn)魅氲牡谝粋€任務,這個任務可以有也可以為null。如果這個值是非空的,那么線程就會在啟動初期立即執(zhí)行這個任務,也就對應核心線程創(chuàng)建時的情況;如果這個值是null,那么就需要創(chuàng)建一個線程去執(zhí)行workQueue中的任務,也就是非核心線程的創(chuàng)建。

core參數(shù)為true表示在新增線程時會判斷當前活動線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當前活動線程數(shù)是否少于maximumPoolSize。

具體流程如下: Worker如何獲取任務 任務的執(zhí)行有兩種可能:一種是任務直接由新創(chuàng)建的線程執(zhí)行。另一種是線程從任務隊列中獲取任務然后執(zhí)行,執(zhí)行完任務的空閑線程會再次去從隊列中申請任務再去執(zhí)行。 第一種在上述addWorker()方法中,如果firstTask不為空的話,會直接運行。第二種firstTask為空,任務將從workQueue中獲取,調(diào)用getTask()方法 ?private Runnable getTask() { ?????boolean timedOut = false; // Did the last poll() time out? ?? ?????for (;;) { ???????int c = ctl.get(); ???????// Check if queue empty only if necessary. ???????if (runStateAtLeast(c, SHUTDOWN) ?????????&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { ?????????decrementWorkerCount(); ?????????return null; ??????} ???????int wc = workerCountOf(c); ???????// Are workers subject to culling? ???????boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ?? ???????if ((wc > maximumPoolSize || (timed && timedOut)) ?????????&& (wc > 1 || workQueue.isEmpty())) { ?????????if (compareAndDecrementWorkerCount(c)) ???????????return null; ?????????continue; ??????} ???????try { ?????????Runnable r = timed ? ???????????workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : ???????????workQueue.take(); ?????????if (r != null) ???????????return r; ?????????timedOut = true; ??????} catch (InterruptedException retry) { ?????????timedOut = false; ??????} ????} ??} 具體流程: Worker如何運行任務 ?// java.util.concurrent.ThreadPoolExecutor#runWorker ?final void runWorker(Worker w) { ??Thread wt = Thread.currentThread(); ??Runnable task = w.firstTask; ??w.firstTask = null; ??w.unlock(); // allow interrupts ??boolean completedAbruptly = true; ??try { ???while (task != null || (task = getTask()) != null) { ????w.lock(); ????// If pool is stopping, ensure thread is interrupted; ????// if not, ensure thread is not interrupted. This ????// requires a recheck in second case to deal with ????// shutdownNow race while clearing interrupt ????if ((runStateAtLeast(ctl.get(), STOP) || ??????(Thread.interrupted() && ???????runStateAtLeast(ctl.get(), STOP))) && ??????!wt.isInterrupted()) ?????wt.interrupt(); ????try { ?????beforeExecute(wt, task); ?????try { ??????task.run(); ??????afterExecute(task, null); ????} catch (Throwable ex) { ??????afterExecute(task, ex); ??????throw ex; ????} ???} finally { ?????task = null; ?????w.completedTasks++; ?????w.unlock(); ???} ??} ???completedAbruptly = false; ? } finally { ???processWorkerExit(w, completedAbruptly); ? } ?} 具體流程: while循環(huán)不斷地通過getTask()方法獲取任務。

如果線程池正在停止,那么要保證當前線程是中斷狀態(tài),否則要保證當前線程不是中斷狀態(tài)。

執(zhí)行任務。

如果getTask結(jié)果為null則跳出循環(huán),執(zhí)行processWorkerExit()方法,銷毀線程。

Worker線程如何回收 線程的銷毀依賴JVM自動的回收,但線程池中核心線程是不能被jvm回收的,所以當線程池決定哪些線程需要回收時,只需要將其引用消除即可。Worker被創(chuàng)建出來后,就會不斷地進行輪詢,然后獲取任務去執(zhí)行,核心線程可以無限等待獲取任務,非核心線程要限時獲取任務。當Worker無法獲取到任務,也就是獲取的任務為空時,循環(huán)會結(jié)束,Worker會主動消除自身在線程池內(nèi)的引用。 其主要邏輯在processWorkerExit()方法中 ?private void processWorkerExit(Worker w, boolean completedAbruptly) { ???if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted ?????decrementWorkerCount(); ?? ???final ReentrantLock mainLock = this.mainLock; ???mainLock.lock(); ???try { ?????completedTaskCount += w.completedTasks; ?????workers.remove(w); ??} finally { ?????mainLock.unlock(); ??} ?? ???tryTerminate(); ?? ???int c = ctl.get(); ???if (runStateLessThan(c, STOP)) { ?????if (!completedAbruptly) { ???????int min = allowCoreThreadTimeOut ? 0 : corePoolSize; ???????if (min == 0 && ! workQueue.isEmpty()) ?????????min = 1; ???????if (workerCountOf(c) >= min) ?????????return; // replacement not needed ????} ?????addWorker(null, false); ??} ?} 具體流程: 使用線程池最佳實踐

Executors

考慮到ThreadPoolExecutor的構(gòu)造函數(shù)實現(xiàn)有些復雜,所以java提供了一個線程池的靜態(tài)工廠類,Executors,利用Executors可以快速創(chuàng)建線程池。但是大廠都不建議使用Executors,原因:Executors的很多方法默認使用的是無參構(gòu)造的LinkedBlockQueue,默認大小為Integer.MAX_VALUE,高負載情況下,隊列很容易導致OOM。而OOM了就會導致所有請求都無法處理。

強烈建議使用ArrayBlockingQueue有界隊列。

使用有界隊列,當任務過多時,線程池會觸發(fā)執(zhí)行拒絕策略,線程池默認的拒絕策略會throw RejectedExecutionException這個運行時異常,所以開發(fā)人員很容易忽略,因此默認拒絕策略需要慎重使用。如果線程處理的任務非常重要,建議自定義拒絕策略,實際開發(fā)中,自定義拒絕策略往往和降級策略配合使用。 下面介紹常用的方法: newFixedThreadPool() newFixedThreadPool()函數(shù)用來創(chuàng)建大小固定的線程池。

ThreadPoolExecutor中的maximumPoolSize跟corePoolSize相等,因此,線程池中的線程都是核心線程,一旦創(chuàng)建便不會銷毀。

workQueue為LinkedBlockingQueue,默認大小為Integer.MAX_VALUE,大小非常大,相當于無界阻塞隊列。任務可以無限的往workQueue中提交,永遠都不會觸發(fā)拒絕策略。

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } newSingleThreadExecutor() newSingleThreadExecutor()函數(shù)用來創(chuàng)建單線程執(zhí)行器。

ThreadPoolExecutor中的maximumPoolSize跟corePoolSize都等于1。

workQueue同樣是大小為Integer.MAX_VALUE的LinkedBlockingQueue。

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } newCachedThreadPool() newCachedThreadPool()函數(shù)創(chuàng)建的線程池只包含非核心線程,線程空閑60秒以上便會銷毀。

workQueue是SynchronousQueue類型的,而SynchronousQueue是長度為0的阻塞隊列,所以,workQueue不存儲任何等待執(zhí)行的任務。

如果線程池內(nèi)存在空閑線程,那么新提交的任務會被空閑線程執(zhí)行

如果線程池內(nèi)沒有空閑線程,那么線程池會創(chuàng)建新的線程來執(zhí)行新提交的任務。

線程池大小為Integer.MAX_VALUE,因此,線程池中創(chuàng)建的線程個數(shù)可以非常多。

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } 異常捕獲

使用線程池,還需要注意異常處理的問題,通過ThreadPoolExecutor對象的execute()方法執(zhí)行任務時,如果在任務執(zhí)行期間出現(xiàn)運行時異常,會導致任務的線程終止,但是你卻獲取不到任何通知,這會讓你誤以為任務都執(zhí)行得很正常。雖說線程池提供了很多用于異常處理的方法,但是最穩(wěn)妥和簡單的方案還是捕獲異常信息,并按需處理。 配置線程池參數(shù)

從任務的優(yōu)先級,任務的執(zhí)行時間長短,任務的性質(zhì)(CPU密集/ IO密集),任務的依賴關系這四個角度來分析。并且近可能地使用有界的工作隊列。 性質(zhì)不同的任務可用使用不同規(guī)模的線程池分開處理: CPU密集型: 盡可能少的線程,Ncpu+1

IO密集型: 盡可能多的線程, Ncpu*2,比如數(shù)據(jù)庫連接池

混合型: CPU密集型的任務與IO密集型任務的執(zhí)行時間差別較小,拆分為兩個線程池;否則沒有必要拆分。

駕馭Java線程池:一步一步帶你從新手到高手!的評論 (共 條)

分享到微博請遵守國家法律
SHOW| 湘乡市| 高安市| 定西市| 伊金霍洛旗| 贵溪市| 涟水县| 杭州市| 兴安盟| 澳门| 武威市| 瓮安县| 静安区| 卓资县| 大兴区| 永新县| 密云县| 建阳市| 翼城县| 区。| 十堰市| 凉城县| 宁津县| 正蓝旗| 湖口县| 西吉县| 定州市| 巴南区| 西丰县| 迁安市| 阳山县| 炉霍县| 桃源县| 洪泽县| 章丘市| 佛冈县| 陵水| 乌苏市| 赤水市| 枞阳县| 林西县|