最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會(huì)員登陸 & 注冊(cè)

一文搞懂ThreadPoolExecutor原理

2023-06-24 18:03 作者:沒(méi)有名字如何行走江湖  | 我要投稿

前言

都說(shuō)經(jīng)典的就是好的,這句話放在JavaThreadPoolExecutor上那是一點(diǎn)都沒(méi)錯(cuò),像現(xiàn)在數(shù)據(jù)庫(kù)連接的池化實(shí)現(xiàn),或者像Tomcat這種WEB服務(wù)器的線程管理,處處都有著ThreadPoolExecutor的影子,本篇文章將結(jié)合源碼實(shí)現(xiàn),對(duì)ThreadPoolExecutor的原理進(jìn)行一個(gè)深入學(xué)習(xí)。

正文

一. Executor框架簡(jiǎn)介

Executor框架提供了組件來(lái)管理Java中的線程,Executor框架將其分為任務(wù),線程執(zhí)行任務(wù),任務(wù)執(zhí)行結(jié)果三部分。下面以表格形式對(duì)這三部分進(jìn)行說(shuō)明。

項(xiàng)說(shuō)明任務(wù)Executor框架提供了Runnable接口和Callable接口,任務(wù)需要實(shí)現(xiàn)這兩個(gè)接口才能被線程執(zhí)行線程執(zhí)行任務(wù)Executor框架提供了接口Executor和繼承于ExecutorExecutorService接口來(lái)定義任務(wù)執(zhí)行機(jī)制。Executor框架中的線程池類(lèi)ThreadPoolExecutorScheduledThreadPoolExecutor均實(shí)現(xiàn)了ExecutorService接口任務(wù)執(zhí)行結(jié)果Executor框架提供了Future接口和實(shí)現(xiàn)了Future接口的FutureTask類(lèi)來(lái)定義任務(wù)執(zhí)行結(jié)果。

組件之間的類(lèi)圖關(guān)系如下所示。

Executor接口是線程池的頂層接口,通常說(shuō)到的線程池指的是ThreadPoolExecutor,同時(shí)ThreadPoolExecutor還有一個(gè)子類(lèi)叫做ScheduledThreadPoolExecutor,其擴(kuò)展實(shí)現(xiàn)了延時(shí)執(zhí)行任務(wù)定時(shí)執(zhí)行任務(wù)的功能。

Executor框架指的是任務(wù),執(zhí)行任務(wù)的線程池任務(wù)執(zhí)行結(jié)果這三部分,切不可將Executor框架與Executor接口相混淆。

本篇文章就將對(duì)Executor框架中的ThreadPoolExecutor的源碼實(shí)現(xiàn)進(jìn)行學(xué)習(xí)。

二. 認(rèn)識(shí)ThreadPoolExecutor狀態(tài)

在學(xué)習(xí)ThreadPoolExecutor如何執(zhí)行任務(wù)前,先認(rèn)識(shí)一下ThreadPoolExecutor的狀態(tài)。

ThreadPoolExecutor繼承于AbstractExecutorService,并實(shí)現(xiàn)了ExecutorService接口,是Executor框架的核心類(lèi),用于管理線程。

ThreadPoolExecutor使用了原子整型ctl來(lái)表示線程池狀態(tài)和Worker數(shù)量。ctl是一個(gè)原子整型,前3位表示線程池狀態(tài),后29位表示Worker數(shù)量。ThreadPoolExecutor中這部分的源碼如下所示。

java復(fù)制代碼public class ThreadPoolExecutor extends AbstractExecutorService { ? ? ?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); ?? ?private static final int COUNT_BITS = Integer.SIZE - 3; ?? ?private static final int CAPACITY ? = (1 << COUNT_BITS) - 1; ? ? ?private static final int RUNNING ? ?= -1 << COUNT_BITS; ?? ?private static final int SHUTDOWN ? = ?0 << COUNT_BITS; ?? ?private static final int STOP ? ? ? = ?1 << COUNT_BITS; ?? ?private static final int TIDYING ? ?= ?2 << COUNT_BITS; ?? ?private static final int TERMINATED = ?3 << COUNT_BITS; ? ? ?// 取整型前3位,即獲取線程池狀態(tài) ?? ?private static int runStateOf(int c) ? ? { return c & ~CAPACITY; } ?? ?// 取整型后29位,即獲取Worker數(shù)量 ?? ?private static int workerCountOf(int c) ?{ return c & CAPACITY; } ?? ?// 根據(jù)線程池狀態(tài)和Worker數(shù)量拼裝ctl ?? ?private static int ctlOf(int rs, int wc) { return rs | wc; } ? ? ?// 線程池狀態(tài)判斷 ?? ?private static boolean runStateLessThan(int c, int s) { ?? ? ? ?return c < s; ?? ?} ? ? ?// 線程池狀態(tài)判斷 ?? ?private static boolean runStateAtLeast(int c, int s) { ?? ? ? ?return c >= s; ?? ?} ? ? ?// 判斷線程池狀態(tài)是否為RUNNING ?? ?private static boolean isRunning(int c) { ?? ? ? ?return c < SHUTDOWN; ?? ?} ? ?? ?...... ? }

可知ThreadPoolExecutor有如下五種線程池狀態(tài)。

  • RUNNING,線程池接受新任務(wù),會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),ctl前三位表示為111;

  • SHUTDOWN,線程池拒絕新任務(wù),會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),ctl前三位表示為000;

  • STOP,線程池拒絕新任務(wù),不會(huì)執(zhí)行任務(wù)阻塞隊(duì)列中的任務(wù),嘗試中斷正在執(zhí)行的任務(wù),ctl前三位表示為001;

  • TIDYING,所有任務(wù)被關(guān)閉,Worker數(shù)量為0,ctl前三位表示為010;

  • TERMINATED,terminated() 執(zhí)行完畢,ctl前三位表示為011。

得益于ctl的結(jié)構(gòu),所以無(wú)論Worker數(shù)量是多少,ThreadPoolExecutor中線程池狀態(tài)存在如下關(guān)系。

RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

因此runStateLessThan()runStateAtLeast()isRunning() 方法可以方便的對(duì)線程池狀態(tài)進(jìn)行判斷。

三. 執(zhí)行任務(wù)源碼分析

作為線程池,ThreadPoolExecutor最重要也最經(jīng)典的地方,當(dāng)然就是執(zhí)行任務(wù)了。本節(jié)對(duì)ThreadPoolExecutor執(zhí)行任務(wù)的流程進(jìn)行一個(gè)學(xué)習(xí)。

ThreadPoolExecutor中執(zhí)行任務(wù)的入口方法為execute(),其實(shí)現(xiàn)如下。

java復(fù)制代碼public void execute(Runnable command) { ?? ?if (command == null) ?? ? ? ?throw new NullPointerException(); ?? ?int c = ctl.get(); ?? ?// 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker并執(zhí)行任務(wù) ?? ?if (workerCountOf(c) < corePoolSize) { ?? ? ? ?if (addWorker(command, true)) ?? ? ? ? ? ?return; ?? ? ? ?c = ctl.get(); ?? ?} ?? ?// 如果Worker數(shù)量大于等于核心線程數(shù),則將任務(wù)添加到任務(wù)阻塞隊(duì)列 ?? ?if (isRunning(c) && workQueue.offer(command)) { ?? ? ? ?int recheck = ctl.get(); ?? ? ? ?// 如果線程池狀態(tài)突然不再是RUNNING,則嘗試將任務(wù)從任務(wù)阻塞隊(duì)列中刪除,刪除成功則為該任務(wù)執(zhí)行拒絕策略 ?? ? ? ?if (! isRunning(recheck) && remove(command)) ?? ? ? ? ? ?reject(command); ?? ? ? ?// 如果線程池中Worker數(shù)量突然為0,則創(chuàng)建一個(gè)Worker來(lái)執(zhí)行任務(wù) ?? ? ? ?else if (workerCountOf(recheck) == 0) ?? ? ? ? ? ?addWorker(null, false); ?? ?} ?? ?// 執(zhí)行到這里表示線程池狀態(tài)已經(jīng)不是RUNNING或者任務(wù)阻塞隊(duì)列已滿 ?? ?// 此時(shí)嘗試新建一個(gè)Worker來(lái)執(zhí)行任務(wù) ?? ?// 如果新建一個(gè)Worker來(lái)執(zhí)行任務(wù)失敗,表明線程池狀態(tài)不再是RUNNING或者Worker數(shù)量已經(jīng)達(dá)到最大線程數(shù),此時(shí)執(zhí)行拒絕策略 ?? ?else if (!addWorker(command, false)) ?? ? ? ?reject(command); }

execute() 中會(huì)根據(jù)Worker數(shù)量和線程池狀態(tài)來(lái)決定是新建Worker來(lái)執(zhí)行任務(wù)還是將任務(wù)添加到任務(wù)阻塞隊(duì)列。新建Worker來(lái)執(zhí)行任務(wù)的實(shí)現(xiàn)如下所示。

java復(fù)制代碼private boolean addWorker(Runnable firstTask, boolean core) { ?? ?// 標(biāo)記外層for循環(huán) ?? ?retry: ?? ?for (;;) { ?? ? ? ?int c = ctl.get(); ?? ? ? ?// 獲取線程池狀態(tài) ?? ? ? ?int rs = runStateOf(c); ? ? ? ? ?// 線程池狀態(tài)為RUNNING時(shí),可以創(chuàng)建Worker ?? ? ? ?// 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空時(shí),可以創(chuàng)建初始任務(wù)為null的Worker ?? ? ? ?if (rs >= SHUTDOWN && ?? ? ? ? ? ?! (rs == SHUTDOWN && ?? ? ? ? ? ? ? firstTask == null && ?? ? ? ? ? ? ? ! workQueue.isEmpty())) ?? ? ? ? ? ?return false; ? ? ? ? ?for (;;) { ?? ? ? ? ? ?// 獲取Worker數(shù)量 ?? ? ? ? ? ?int wc = workerCountOf(c); ? ?? ? ? ? ? ?// 如果Worker數(shù)量大于CAPACITY,拒絕創(chuàng)建Worker ?? ? ? ? ? ?// core為true表示創(chuàng)建核心線程Worker,如果Worker數(shù)量此時(shí)已經(jīng)大于等于核心線程數(shù),則拒絕創(chuàng)建Worker,轉(zhuǎn)而應(yīng)該將任務(wù)添加到任務(wù)阻塞隊(duì)列 ?? ? ? ? ? ?// core為false表示創(chuàng)建非核心線程Worker,如果Worker數(shù)量此時(shí)已經(jīng)大于等于最大線程數(shù),則拒絕創(chuàng)建Worker,轉(zhuǎn)而應(yīng)該執(zhí)行拒絕策略 ?? ? ? ? ? ?if (wc >= CAPACITY || ?? ? ? ? ? ? ? ?wc >= (core ? corePoolSize : maximumPoolSize)) ?? ? ? ? ? ? ? ?return false; ?? ? ? ? ? ?// 以CAS方式將Worker數(shù)量加1 ?? ? ? ? ? ?// 加1成功表明無(wú)競(jìng)爭(zhēng)發(fā)生,從外層for循環(huán)跳出 ?? ? ? ? ? ?if (compareAndIncrementWorkerCount(c)) ?? ? ? ? ? ? ? ?break retry; ?? ? ? ? ? ?// 加1失敗表明有競(jìng)爭(zhēng)發(fā)生,此時(shí)需要重新獲取ctl的值 ?? ? ? ? ? ?c = ctl.get(); ?? ? ? ? ? ?// 重新獲取ctl后如果發(fā)現(xiàn)線程池狀態(tài)發(fā)生了改變,此時(shí)重新執(zhí)行外層for循環(huán),即需要基于新的線程池狀態(tài)判斷是否允許創(chuàng)建Worker ?? ? ? ? ? ?// 重新獲取ctl后如果線程池狀態(tài)未發(fā)生改變,則繼續(xù)執(zhí)行內(nèi)層for循環(huán),即嘗試再一次以CAS方式將Worker數(shù)量加1 ?? ? ? ? ? ?if (runStateOf(c) != rs) ?? ? ? ? ? ? ? ?continue retry; ?? ? ? ?} ?? ?} ? ? ?boolean workerStarted = false; ?? ?boolean workerAdded = false; ?? ?Worker w = null; ?? ?try { ?? ? ? ?// 創(chuàng)建一個(gè)Worker ?? ? ? ?w = new Worker(firstTask); ?? ? ? ?// 獲取Worker的線程 ?? ? ? ?final Thread t = w.thread; ?? ? ? ?if (t != null) { ?? ? ? ? ? ?final ReentrantLock mainLock = this.mainLock; ?? ? ? ? ? ?// 由于線程池中存儲(chǔ)Worker的集合為HashSet,因此將Worker添加到Worker集合時(shí)需要獲取全局鎖保證線程安全 ?? ? ? ? ? ?mainLock.lock(); ?? ? ? ? ? ?try { ?? ? ? ? ? ? ? ?// 再一次獲取線程池狀態(tài) ?? ? ? ? ? ? ? ?int rs = runStateOf(ctl.get()); ? ? ? ? ? ? ? ? ?// 如果線程池狀態(tài)還是為RUNNING或者線程池狀態(tài)為SHUTDOWN但創(chuàng)建的Worker的初始任務(wù)為null,則允許將創(chuàng)建出來(lái)的Worker添加到集合 ?? ? ? ? ? ? ? ?if (rs < SHUTDOWN || ?? ? ? ? ? ? ? ? ? ?(rs == SHUTDOWN && firstTask == null)) { ?? ? ? ? ? ? ? ? ? ?// 檢查一下Worker的線程是否可以啟動(dòng)(處于活動(dòng)狀態(tài)的線程無(wú)法再啟動(dòng)) ?? ? ? ? ? ? ? ? ? ?if (t.isAlive()) ?? ? ? ? ? ? ? ? ? ? ? ?throw new IllegalThreadStateException(); ?? ? ? ? ? ? ? ? ? ?// 將Worker添加到Worker集合 ?? ? ? ? ? ? ? ? ? ?workers.add(w); ?? ? ? ? ? ? ? ? ? ?int s = workers.size(); ?? ? ? ? ? ? ? ? ? ?// largestPoolSize用于記錄線程池最多存在過(guò)的Worker數(shù) ?? ? ? ? ? ? ? ? ? ?if (s > largestPoolSize) ?? ? ? ? ? ? ? ? ? ? ? ?largestPoolSize = s; ?? ? ? ? ? ? ? ? ? ?workerAdded = true; ?? ? ? ? ? ? ? ?} ?? ? ? ? ? ?} finally { ?? ? ? ? ? ? ? ?mainLock.unlock(); ?? ? ? ? ? ?} ?? ? ? ? ? ?if (workerAdded) { ?? ? ? ? ? ? ? ?// 啟動(dòng)Worker線程 ?? ? ? ? ? ? ? ?t.start(); ?? ? ? ? ? ? ? ?workerStarted = true; ?? ? ? ? ? ?} ?? ? ? ?} ?? ?} finally { ?? ? ? ?if (! workerStarted) ?? ? ? ? ? ?// Worker線程沒(méi)有成功啟動(dòng)起來(lái),此時(shí)需要對(duì)該Worker的創(chuàng)建執(zhí)行回滾操作 ?? ? ? ? ? ?addWorkerFailed(w); ?? ?} ?? ?return workerStarted; }

addWorker() 方法中只允許兩種情況可以創(chuàng)建Worker。

  • 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker

  • 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空,可以創(chuàng)建初始任務(wù)為nullWorker

一旦Worker創(chuàng)建成功,就會(huì)將Worker的線程啟動(dòng),如果Worker創(chuàng)建失敗或者Worker的線程啟動(dòng)失敗,則會(huì)調(diào)用addWorkerFailed() 方法執(zhí)行回滾操作,其實(shí)現(xiàn)如下所示。

java復(fù)制代碼private void addWorkerFailed(Worker w) { ?? ?final ReentrantLock mainLock = this.mainLock; ?? ?mainLock.lock(); ?? ?try { ?? ? ? ?// 如果Worker添加到了Worker集合中,則將Worker從Worker集合中刪除 ?? ? ? ?if (w != null) ?? ? ? ? ? ?workers.remove(w); ?? ? ? ?// 以CAS方式將Worker數(shù)量減1 ?? ? ? ?decrementWorkerCount(); ?? ? ? ?// 嘗試終止線程池 ?? ? ? ?tryTerminate(); ?? ?} finally { ?? ? ? ?mainLock.unlock(); ?? ?} }

由于Worker自身實(shí)現(xiàn)了Runnable,因此Worker自身就是一個(gè)任務(wù),實(shí)際上Worker的線程執(zhí)行的任務(wù)就是Worker本身,因此addWorker() 中將Worker的線程啟動(dòng)時(shí),會(huì)調(diào)用Workerrun() 方法,其實(shí)現(xiàn)如下。

java復(fù)制代碼public void run() { ?? ?runWorker(this); }

Workerrun() 方法中調(diào)用了ThreadPoolExecutorrunWorker() 方法,其實(shí)現(xiàn)如下所示。

java復(fù)制代碼final void runWorker(Worker w) { ?? ?Thread wt = Thread.currentThread(); ?? ?Runnable task = w.firstTask; ?? ?w.firstTask = null; ?? ?w.unlock(); ?? ?boolean completedAbruptly = true; ?? ?try { ?? ? ? ?// 如果task為null,則從任務(wù)阻塞隊(duì)列中獲取任務(wù) ?? ? ? ?// 通常Worker啟動(dòng)時(shí)會(huì)先執(zhí)行初始任務(wù),然后再去任務(wù)阻塞隊(duì)列中獲取任務(wù) ?? ? ? ?while (task != null || (task = getTask()) != null) { ?? ? ? ? ? ?w.lock(); ?? ? ? ? ? ?// 線程池正在停止時(shí),需要確保當(dāng)前Worker的線程是被中斷的 ?? ? ? ? ? ?if ((runStateAtLeast(ctl.get(), STOP) || ?? ? ? ? ? ? ? ? ? ?(Thread.interrupted() && ?? ? ? ? ? ? ? ? ? ? ? ? ? ?runStateAtLeast(ctl.get(), STOP))) && ?? ? ? ? ? ? ? ? ? ?!wt.isInterrupted()) ?? ? ? ? ? ? ? ?wt.interrupt(); ?? ? ? ? ? ?try { ?? ? ? ? ? ? ? ?beforeExecute(wt, task); ?? ? ? ? ? ? ? ?Throwable thrown = null; ?? ? ? ? ? ? ? ?try { ?? ? ? ? ? ? ? ? ? ?task.run(); ?? ? ? ? ? ? ? ?} catch (RuntimeException x) { ?? ? ? ? ? ? ? ? ? ?thrown = x; throw x; ?? ? ? ? ? ? ? ?} catch (Error x) { ?? ? ? ? ? ? ? ? ? ?thrown = x; throw x; ?? ? ? ? ? ? ? ?} catch (Throwable x) { ?? ? ? ? ? ? ? ? ? ?thrown = x; throw new Error(x); ?? ? ? ? ? ? ? ?} finally { ?? ? ? ? ? ? ? ? ? ?afterExecute(task, thrown); ?? ? ? ? ? ? ? ?} ?? ? ? ? ? ?} finally { ?? ? ? ? ? ? ? ?task = null; ?? ? ? ? ? ? ? ?w.completedTasks++; ?? ? ? ? ? ? ? ?w.unlock(); ?? ? ? ? ? ?} ?? ? ? ?} ?? ? ? ?completedAbruptly = false; ?? ?} finally { ?? ? ? ?// Worker執(zhí)行任務(wù)發(fā)生異常或者從getTask()中獲取任務(wù)為空時(shí)會(huì)執(zhí)行這里的邏輯 ?? ? ? ?// processWorkerExit()會(huì)將Worker從Worker集合中刪除,并嘗試終止線程池 ?? ? ? ?processWorkerExit(w, completedAbruptly); ?? ?} }

runWorker() 方法就是先讓Worker將初始任務(wù)(如果有的話)執(zhí)行完,然后循環(huán)從任務(wù)阻塞隊(duì)列中獲取任務(wù)來(lái)執(zhí)行,如果Worker執(zhí)行任務(wù)發(fā)生異常或者從任務(wù)阻塞隊(duì)列獲取任務(wù)失?。ǐ@取到的任務(wù)為null),則調(diào)用processWorkerExit() 方法來(lái)將自身從Worker集合中刪除。下面先看一下getTask() 方法的實(shí)現(xiàn)。

java復(fù)制代碼private Runnable getTask() { ?? ?boolean timedOut = false; ? ? ?for (;;) { ?? ? ? ?int c = ctl.get(); ?? ? ? ?int rs = runStateOf(c); ? ? ? ? ?// 如果線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列為空,則不再允許從任務(wù)阻塞隊(duì)列中獲取任務(wù) ?? ? ? ?// 如果線程池狀態(tài)為STOP,則不再允許從任務(wù)阻塞隊(duì)列中獲取任務(wù) ?? ? ? ?if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { ?? ? ? ? ? ?decrementWorkerCount(); ?? ? ? ? ? ?return null; ?? ? ? ?} ? ? ? ? ?int wc = workerCountOf(c); ? ? ? ? ?// 如果allowCoreThreadTimeOut為true,或者當(dāng)前線程數(shù)大于核心線程數(shù),此時(shí)timed為true,表明從任務(wù)阻塞隊(duì)列以超時(shí)退出的方式獲取任務(wù) ?? ? ? ?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ? ? ? ? ?// 如果當(dāng)前線程數(shù)大于最大線程數(shù),則當(dāng)前Worker應(yīng)該被刪除 ?? ? ? ?// 如果當(dāng)前Worker上一次從任務(wù)阻塞隊(duì)列中獲取任務(wù)時(shí)超時(shí)退出,且任務(wù)阻塞隊(duì)列現(xiàn)在還是為空,則當(dāng)前Worker應(yīng)該被刪除 ?? ? ? ?if ((wc > maximumPoolSize || (timed && timedOut)) ?? ? ? ? ? ? ? ?&& (wc > 1 || workQueue.isEmpty())) { ?? ? ? ? ? ?if (compareAndDecrementWorkerCount(c)) ?? ? ? ? ? ? ? ?return null; ?? ? ? ? ? ?continue; ?? ? ? ?} ? ? ? ? ?try { ?? ? ? ? ? ?// 從任務(wù)阻塞隊(duì)列中獲取任務(wù) ?? ? ? ? ? ?Runnable r = timed ? ?? ? ? ? ? ? ? ? ? ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : ?? ? ? ? ? ? ? ? ? ?workQueue.take(); ?? ? ? ? ? ?if (r != null) ?? ? ? ? ? ? ? ?// 獲取到任務(wù)則返回該任務(wù) ?? ? ? ? ? ? ? ?return r; ?? ? ? ? ? ?// timedOut為true表明Worker上一次從任務(wù)阻塞隊(duì)列中獲取任務(wù)時(shí)超時(shí)退出 ?? ? ? ? ? ?timedOut = true; ?? ? ? ?} catch (InterruptedException retry) { ?? ? ? ? ? ?timedOut = false; ?? ? ? ?} ?? ?} }

getTask() 方法在如下情況不允許Worker從任務(wù)阻塞隊(duì)列中獲取任務(wù)。

  • 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列為空;

  • 線程池狀態(tài)為STOP。

如果Worker有資格從任務(wù)阻塞隊(duì)列獲取任務(wù),那么當(dāng)allowCoreThreadTimeOuttrue,或者當(dāng)前線程數(shù)大于核心線程數(shù)時(shí),Worker以超時(shí)退出的方式獲取任務(wù),否則Worker以一直阻塞的方式獲取任務(wù)。

當(dāng)WorkergetTask() 方法中獲取任務(wù)失敗時(shí),getTask() 方法會(huì)返回null,從而導(dǎo)致Worker會(huì)執(zhí)行processWorkerExit() 方法來(lái)刪除自身,其實(shí)現(xiàn)如下所示。

java復(fù)制代碼private void processWorkerExit(Worker w, boolean completedAbruptly) { ?? ?// completedAbruptly為true表明是執(zhí)行任務(wù)時(shí)發(fā)生異常導(dǎo)致Worker需要被刪除 ?? ?if (completedAbruptly) ?? ? ? ?// 修正Worker數(shù)量 ?? ? ? ?decrementWorkerCount(); ? ? ?final ReentrantLock mainLock = this.mainLock; ?? ?mainLock.lock(); ?? ?try { ?? ? ? ?completedTaskCount += w.completedTasks; ?? ? ? ?// 將Worker從Worker集合中刪除 ?? ? ? ?workers.remove(w); ?? ?} finally { ?? ? ? ?mainLock.unlock(); ?? ?} ? ? ?// 嘗試終止線程池 ?? ?tryTerminate(); ? ? ?int c = ctl.get(); ?? ?if (runStateLessThan(c, STOP)) { ?? ? ? ?if (!completedAbruptly) { ?? ? ? ? ? ?int min = allowCoreThreadTimeOut ? 0 : corePoolSize; ?? ? ? ? ? ?if (min == 0 && ! workQueue.isEmpty()) ?? ? ? ? ? ? ? ?min = 1; ?? ? ? ? ? ?if (workerCountOf(c) >= min) ?? ? ? ? ? ? ? ?return; ?? ? ? ?} ?? ? ? ?addWorker(null, false); ?? ?} }

WorkerprocessWorkerExit() 方法中刪除自身之后,還會(huì)調(diào)用tryTerminate() 嘗試終止線程池,tryTerminate() 方法很精髓,后面會(huì)對(duì)其進(jìn)行詳細(xì)分析,這里暫且不談。至此,Worker的創(chuàng)建,執(zhí)行任務(wù),獲取任務(wù)和刪除的整個(gè)流程已經(jīng)大體分析完畢。

對(duì)于執(zhí)行任務(wù),現(xiàn)在簡(jiǎn)單進(jìn)行一個(gè)小結(jié)

ThreadPoolExecutor執(zhí)行任務(wù),第一步是根據(jù)Worker數(shù)量來(lái)決定是新建Worker來(lái)執(zhí)行任務(wù)還是將任務(wù)添加到任務(wù)阻塞隊(duì)列,這里的判斷規(guī)則如下。

  1. 如果Worker數(shù)量小于核心線程數(shù),則創(chuàng)建Worker來(lái)執(zhí)行任務(wù);

  2. 如果Worker數(shù)量大于等于核心線程數(shù),則將任務(wù)添加到任務(wù)阻塞隊(duì)列;

  3. 如果任務(wù)阻塞隊(duì)列已滿,則創(chuàng)建Worker來(lái)執(zhí)行任務(wù);

  4. 如果Worker數(shù)量已經(jīng)達(dá)到最大線程數(shù),此時(shí)執(zhí)行任務(wù)拒絕策略。

當(dāng)要新建Worker來(lái)執(zhí)行任務(wù)時(shí),只有兩種情況可以新建Worker,如下所示。

  1. 線程池狀態(tài)為RUNNING,可以創(chuàng)建Worker;

  2. 線程池狀態(tài)為SHUTDOWN,且任務(wù)阻塞隊(duì)列不為空,可以創(chuàng)建初始任務(wù)為nullWorker

Worker自身實(shí)現(xiàn)了Runnable,且Worker持有一個(gè)線程,當(dāng)Worker啟動(dòng)時(shí),就是啟動(dòng)Worker持有的線程,而這個(gè)線程執(zhí)行的任務(wù)就是Worker自身。

Worker啟動(dòng)后,會(huì)首先執(zhí)行自己的初始任務(wù),然后再去任務(wù)阻塞隊(duì)列中獲取任務(wù)。

四. 關(guān)閉線程池源碼分析

不再使用的線程池,可以進(jìn)行關(guān)閉。關(guān)閉ThreadPoolExecutor的方法有shutdown()shutdownNow(),本節(jié)將對(duì)ThreadPoolExecutor的關(guān)閉進(jìn)行分析。

1. shutdown()

首先分析shutdown() 方法,其實(shí)現(xiàn)如下。

java復(fù)制代碼public void shutdown() { ?? ?final ReentrantLock mainLock = this.mainLock; ?? ?mainLock.lock(); ?? ?try { ?? ? ? ?checkShutdownAccess(); ?? ? ? ?// 循環(huán)通過(guò)CAS方式將線程池狀態(tài)置為SHUTDOWN ?? ? ? ?advanceRunState(SHUTDOWN); ?? ? ? ?// 中斷空閑Worker ?? ? ? ?interruptIdleWorkers(); ?? ? ? ?onShutdown(); ?? ?} finally { ?? ? ? ?mainLock.unlock(); ?? ?} ?? ?// 嘗試終止線程池 ?? ?tryTerminate(); }

shutdown() 方法中首先會(huì)將線程池狀態(tài)置為SHUTDOWN,然后調(diào)用interruptIdleWorkers() 方法中斷空閑Worker,最后調(diào)用tryTerminate() 方法來(lái)嘗試終止線程池。那么這里要解釋一下什么是空閑Worker,先看一下interruptIdleWorkers() 的實(shí)現(xiàn)。

java復(fù)制代碼private void interruptIdleWorkers() { ?? ?interruptIdleWorkers(false); } ?private void interruptIdleWorkers(boolean onlyOne) { ?? ?final ReentrantLock mainLock = this.mainLock; ?? ?mainLock.lock(); ?? ?try { ?? ? ? ?for (Worker w : workers) { ?? ? ? ? ? ?Thread t = w.thread; ?? ? ? ? ? ?// 中斷線程前需要先嘗試獲取Worker的鎖 ?? ? ? ? ? ?// 只能獲取到空閑Worker的鎖,所以shutdown()方法只會(huì)中斷空閑Worker ?? ? ? ? ? ?if (!t.isInterrupted() && w.tryLock()) { ?? ? ? ? ? ? ? ?try { ?? ? ? ? ? ? ? ? ? ?t.interrupt(); ?? ? ? ? ? ? ? ?} catch (SecurityException ignore) { ?? ? ? ? ? ? ? ?} finally { ?? ? ? ? ? ? ? ? ? ?w.unlock(); ?? ? ? ? ? ? ? ?} ?? ? ? ? ? ?} ?? ? ? ? ? ?if (onlyOne) ?? ? ? ? ? ? ? ?break; ?? ? ? ?} ?? ?} finally { ?? ? ? ?mainLock.unlock(); ?? ?} }

調(diào)用interruptIdleWorkers() 方法中斷Worker前首先需要嘗試獲取Worker的鎖,已知Worker除了實(shí)現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,然后在runWorker()Worker執(zhí)行任務(wù)前都會(huì)先獲取Worker的鎖,這里看一下Workerlock() 方法的實(shí)現(xiàn)。

java復(fù)制代碼public void lock() { ?? ?acquire(1); } ?protected boolean tryAcquire(int unused) { ?? ?// 以CAS方式將state從0設(shè)置為1 ?? ?if (compareAndSetState(0, 1)) { ?? ? ? ?setExclusiveOwnerThread(Thread.currentThread()); ?? ? ? ?return true; ?? ?} ?? ?return false; }

可以發(fā)現(xiàn),Workerlock() 中調(diào)用了acquire() 方法,該方法由AbstractQueuedSynchronizer抽象類(lèi)提供,在acquire() 中會(huì)調(diào)用其子類(lèi)實(shí)現(xiàn)的tryAcquire() 方法,tryAcquire() 方法會(huì)以CAS方式將state從0設(shè)置為1,因此這樣的設(shè)計(jì)讓Worker是一把不可重入鎖。

回到interruptIdleWorkers() 方法,前面提到該方法中斷Worker前會(huì)嘗試獲取Worker的鎖,能夠獲取到鎖才會(huì)中斷Worker,而因?yàn)?strong>Worker是不可重入鎖,所以正在執(zhí)行任務(wù)的Worker是無(wú)法獲取到鎖的,只有那些沒(méi)有執(zhí)行任務(wù)的Worker的鎖才能夠被獲取,因此所謂的中斷空閑Worker,實(shí)際就是中斷沒(méi)有執(zhí)行任務(wù)的Worker,那些執(zhí)行任務(wù)的Workershutdown() 方法被調(diào)用時(shí)不會(huì)被中斷,這些Worker執(zhí)行完任務(wù)后會(huì)繼續(xù)從任務(wù)阻塞隊(duì)列中獲取任務(wù)來(lái)執(zhí)行,直到任務(wù)阻塞隊(duì)列為空,此時(shí)沒(méi)有被中斷過(guò)的Worker也會(huì)被刪除掉,等到線程池中沒(méi)有Worker以及任務(wù)阻塞隊(duì)列沒(méi)有任務(wù)后,線程池才會(huì)被終止掉。

對(duì)于shutdown() 方法,一句話總結(jié)就是:將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務(wù),等到線程池Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空時(shí),關(guān)閉線程池。

2. shutdownNow()

現(xiàn)在再來(lái)分析shutdownNow() 方法。

java復(fù)制代碼public List<Runnable> shutdownNow() { ?? ?List<Runnable> tasks; ?? ?final ReentrantLock mainLock = this.mainLock; ?? ?mainLock.lock(); ?? ?try { ?? ? ? ?checkShutdownAccess(); ?? ? ? ?// 循環(huán)通過(guò)CAS方式將線程池狀態(tài)置為STOP ?? ? ? ?advanceRunState(STOP); ?? ? ? ?// 中斷所有Worker ?? ? ? ?interruptWorkers(); ?? ? ? ?// 將任務(wù)阻塞隊(duì)列中的任務(wù)獲取出來(lái)并返回 ?? ? ? ?tasks = drainQueue(); ?? ?} finally { ?? ? ? ?mainLock.unlock(); ?? ?} ?? ?// 嘗試終止線程池 ?? ?tryTerminate(); ?? ?return tasks; } ?private void interruptWorkers() { ?? ?final ReentrantLock mainLock = this.mainLock; ?? ?mainLock.lock(); ?? ?try { ?? ? ? ?// 中斷線程池中所有Worker ?? ? ? ?for (Worker w : workers) ?? ? ? ? ? ?w.interruptIfStarted(); ?? ?} finally { ?? ? ? ?mainLock.unlock(); ?? ?} }

shutdownNow() 方法中首先會(huì)將線程池狀態(tài)置為STOP,然后調(diào)用interruptWorkers() 方法中斷線程池中的所有Worker,接著調(diào)用tryTerminate() 方法來(lái)嘗試終止線程池,最后shutdownNow() 方法會(huì)將任務(wù)阻塞隊(duì)列中還未被執(zhí)行的任務(wù)返回。

shutdownNow() 方法調(diào)用之后,線程池中的所有Worker都會(huì)被中斷,包括正在執(zhí)行任務(wù)的Worker,等到所有Worker都被刪除之后,線程池即被終止,也就是說(shuō),shutdownNow() 不會(huì)保證當(dāng)前時(shí)刻正在執(zhí)行的任務(wù)會(huì)被安全的執(zhí)行完,并且會(huì)放棄執(zhí)行任務(wù)阻塞隊(duì)列中的所有任務(wù)。

3. tryTerminate()

關(guān)于線程池的關(guān)閉,還有一個(gè)重要的方法,那就是前面多次提到的tryTerminate() 方法,該方法能確保線程池可以被正確的關(guān)閉,其實(shí)現(xiàn)如下所示。

java復(fù)制代碼final void tryTerminate() { ?? ?for (;;) { ?? ? ? ?int c = ctl.get(); ?? ? ? ?// 如果線程池狀態(tài)為RUNNING,則沒(méi)有資格終止線程池 ?? ? ? ?// 如果線程池狀態(tài)大于等于TIDYING,則沒(méi)有資格終止線程池 ?? ? ? ?// 如果線程池狀態(tài)為SHUTDOWN但任務(wù)阻塞隊(duì)列不為空,則沒(méi)有資格終止線程池 ?? ? ? ?if (isRunning(c) || ?? ? ? ? ? ? ? ?runStateAtLeast(c, TIDYING) || ?? ? ? ? ? ? ? ?(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) ?? ? ? ? ? ?return; ?? ? ? ?// 線程池狀態(tài)為SHUTDOWN且任務(wù)阻塞隊(duì)列為空會(huì)執(zhí)行到這里 ?? ? ? ?// 線程池狀態(tài)為STOP會(huì)執(zhí)行到這里 ?? ? ? ?// Worker數(shù)量不為0,表明當(dāng)前還有正在執(zhí)行任務(wù)的Worker或者空閑的Worker,此時(shí)中斷一個(gè)空閑的Worker ?? ? ? ?// 在這里被中斷的空閑Worker會(huì)在getTask()方法中返回null,從而執(zhí)行processWorkerExit(),最終該Worker會(huì)被刪除 ?? ? ? ?// processWorkerExit()方法中又會(huì)調(diào)用tryTerminate(),因此將shutdown信號(hào)在空閑Worker之間進(jìn)行了傳播 ?? ? ? ?if (workerCountOf(c) != 0) { ?? ? ? ? ? ?interruptIdleWorkers(ONLY_ONE); ?? ? ? ? ? ?return; ?? ? ? ?} ? ? ? ? ?final ReentrantLock mainLock = this.mainLock; ?? ? ? ?mainLock.lock(); ?? ? ? ?try { ?? ? ? ? ? ?// 將線程池狀態(tài)置為T(mén)IDYING ?? ? ? ? ? ?if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { ?? ? ? ? ? ? ? ?try { ?? ? ? ? ? ? ? ? ? ?// 終止線程池 ?? ? ? ? ? ? ? ? ? ?terminated(); ?? ? ? ? ? ? ? ?} finally { ?? ? ? ? ? ? ? ? ? ?// 將線程池狀態(tài)最終置為T(mén)ERMINATED ?? ? ? ? ? ? ? ? ? ?ctl.set(ctlOf(TERMINATED, 0)); ?? ? ? ? ? ? ? ? ? ?termination.signalAll(); ?? ? ? ? ? ? ? ?} ?? ? ? ? ? ? ? ?return; ?? ? ? ? ? ?} ?? ? ? ?} finally { ?? ? ? ? ? ?mainLock.unlock(); ?? ? ? ?} ?? ?} }

tryTerminate() 方法的官方注釋中給出了兩種線程池會(huì)被終止的情況:

  • 線程池的狀態(tài)為SHUTDOWN,Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空;

  • 線程池的狀態(tài)為STOP,Worker數(shù)量為0。

官方注釋中還說(shuō)明在所有可能導(dǎo)致線程池終止的操作中都應(yīng)該調(diào)用tryTerminate() 方法來(lái)嘗試終止線程池,因此線程池中Worker被刪除時(shí)任務(wù)阻塞隊(duì)列中任務(wù)被刪除時(shí)會(huì)調(diào)用tryTerminate(),以達(dá)到在線程池符合終止條件時(shí)及時(shí)終止線程池。

4. 小結(jié)

對(duì)于關(guān)閉線程池,簡(jiǎn)單小結(jié)如下。

關(guān)閉ThreadPoolExecutor有兩種方式,如下所示。

  1. shutdown()。調(diào)用shutdown() 方法會(huì)首先將線程池狀態(tài)置為SHUTDOWN并拒絕接受新任務(wù),然后中斷空閑Worker,等到線程池中Worker數(shù)量為0,任務(wù)阻塞隊(duì)列為空時(shí),線程池被真正關(guān)閉;

  2. shutdownNow()。調(diào)用shutdownNow() 方法會(huì)首先將線程池狀態(tài)置為STOP,然后中斷所有Worker(包括正在執(zhí)行任務(wù)的Worker),并將任務(wù)阻塞隊(duì)列中還未被執(zhí)行的任務(wù)返回,當(dāng)線程池Worker數(shù)量為0時(shí),線程池被真正關(guān)閉。

還有一點(diǎn)需要說(shuō)明,Worker除了實(shí)現(xiàn)Runnable接口外,還繼承于AbstractQueuedSynchronizer,因此Worker本身是一把鎖,Worker執(zhí)行任務(wù)前都會(huì)先獲取Worker的鎖,所以正在執(zhí)行任務(wù)的Worker的鎖是無(wú)法被獲取的,換言之,只有沒(méi)有執(zhí)行任務(wù)的Worker的鎖才能被獲取,這些Worker就稱為空閑Worker。

一文搞懂ThreadPoolExecutor原理的評(píng)論 (共 條)

分享到微博請(qǐng)遵守國(guó)家法律
永新县| 辽宁省| 澄江县| 山西省| 府谷县| 改则县| 炉霍县| 衡阳县| 宁明县| 青岛市| 商丘市| 武隆县| 潜山县| 岫岩| 岳阳市| 安图县| 丁青县| 上饶市| 理塘县| 嘉峪关市| 夏邑县| 南和县| 嘉荫县| 江安县| 通辽市| 丹江口市| 阿坝| 婺源县| 遂川县| 将乐县| 银川市| 福安市| 江安县| 横峰县| 夏津县| 石狮市| 会理县| 林芝县| 望城县| 聊城市| 九台市|