Java 線程池ThreadPoolExecutor execute & addWorker源碼分析
本文參考自:從Java構建線程的方式 到 線程池ThreadPoolExecutor源碼剖析

本文分成以下幾個部分:
創(chuàng)建線程的方式
ThreadPoolExecutor概述
ThreadPoolExecutor#execute ThreadPoolExecutor#addWorker方法解析

創(chuàng)建線程的方式
繼承自Thread
寫一個類繼承自Thread,并且調用start方法即可開啟線程。這個是老生常談的實現方法了,繼承Thread的時候需要實現run,如果直接調用run的話是無法開啟線程的。調用start最后會調用到native的start0方法,然后如果是Linux的話,底層使用的是Linux的api pthread。關于這點我在這篇文章中談及過,并且說了一下sychronized鎖的實現。有興趣的同學可以看看~
通過FutureTask+Callable實現
然后將創(chuàng)建好的FutureTask的實例放到Thread中執(zhí)行。Callable是有返回值的,我們可以通過get獲取到。get是一個阻塞的,底層的實現我稍微看了一下是基于LockSupport#park實現的阻塞。
使用線程池實現
使用線程池主要類似于這樣:
其實JDK也提供了一些默認的線程池創(chuàng)建方法,但是一般都不推薦使用,因為這些方法可能不符合我們常規(guī)的業(yè)務需求。所以一般都使用手動創(chuàng)建的方式實現。

ThreadExecutorPool概述
重要的常量
ctl這個AtomicInteger是基于自旋鎖+CAS操作實現的自旋鎖我在這篇文章也聊過。Executor本身也有生命周期,根據數值大小排序的生命周期狀態(tài)是:
Running < Shutdown < Stop < Tidying < Terminated
Running:當前的線程池中的線程正常運行,而且線程池接受新的Runnable
Shutdown:在調用了shutdown方法會走到這個狀態(tài),并且此時不接收新的Runnable,但是會將阻塞隊列中的Runnable處理完成
Stop:在調用shutdownNow之后會走到這個狀態(tài),不接收新的Runnable,同時會暫停正在執(zhí)行的線程
Tidying:是一個中間的過渡狀態(tài),可能做一個內容的清理工作等等。
Terminated:在調用terminated方法之后會到這個狀態(tài),線程池結束
我直接拿過來課程中的圖:

拒絕策略
AbortPolicy
直接會拋出異常
CallerRunsPolicy
這塊直接調用了Runnable實現的run方法。run方法會在Excecutor所在的線程中執(zhí)行,所以如果是耗時操作也會出現問題。
DiscardPolicy
直接會放棄,什么都不會做
DiscardOldestPolicy
會嘗試獲取ThreadPoolExecutor中的隊列,然后將隊列頭,也就是最開始的一個出棧。 ?

源碼分析
任務加入線程池是一個這樣的過程:
首先會詢問核心線程是否有沒有分配到的,通常是和核心線程數進行比較。如果核心線程都滿了,就會通過阻塞隊列進行緩沖。如果阻塞隊列都放慢了,就會看非核心線程是否到了最大的線程數,如果達到了最大線程,就會執(zhí)行拒絕策略。
下面我們會通過看execute+addWorker的源碼來還原這個過程。
ThreadPoolExecutor#execute
execute的代碼中首先通過ctl進行位運算的分解獲取當前的工作線程數,優(yōu)先使用核心線程。然后下面會放入到阻塞隊列中,如果阻塞隊列中都放不下,再會看工作線程是否達到最大線程數。如果以上的執(zhí)行都不能放入這個任務,就執(zhí)行拒絕策略。
上面的代碼中可以看到前面兩步,也就是詢問核心線程和詢問阻塞隊列是否放滿,第三步看工作線程是否達到最大線程數是在addWorker中的
ThreadPoolExecutor#addWorker
part1 判斷部分
addWorker的代碼拆分成兩部分來看,第一部分是進行一些條件判斷:
特別是if中的第二個條件有點復雜,傳入firstTask為空的情況是當阻塞隊列中有任務,但是工作線程為0時,一般情況下firstTask肯定不為null。具體三種情況為什么需要返回false的原因我寫在注釋中了~
下面我們來看看執(zhí)行的邏輯
part2 執(zhí)行部分
總的來說addWorker除了進行Worker的構建和添加到Workers之外,還進行了Worker中線程的啟動,這塊是真正執(zhí)行我們定義的邏輯的地方。
我們再來看看Worker:
因為Worker本身也是一個Runnable,所以當調用start的時候會執(zhí)行Woker的run方法,Worker#run調用了runWorker。在runWorker中,我們重寫的run會被執(zhí)行。同時提供了兩個鉤子:beforeExecute和afterExecute,這兩個方法本身是空實現,我們可以自行定義執(zhí)行一些操作。
作為判斷條件的代碼我使用黃色底的字體標記出來了,具體的邏輯就是這樣。

ThreadPoolExecutor#getTask
getTask其實就是從阻塞隊列wokerQueue中獲取task這樣一件事情。
線程池的線程復用邏輯

這塊直接上圖,在addWorker中會執(zhí)行Worker中的Thread#start,我們知道執(zhí)行完成start之后就不能再次調用start。線程池與其說他是復用Thread,不如說他是不斷地向Thread中填充新的Runnable,然后調用run,減少了創(chuàng)建Thread的開銷。我們仔細看看addWorker的核心代碼:
不斷地從workerQueue中取出新的Task,然后執(zhí)行run。如果wokerQueue為空,getTask就會阻塞,等到有了新的Task再執(zhí)行。
總結時間
JDK中提供了一些可以直接啟動線程池的方式,但是我們最好自己寫一個ThreadPoolExecutor進行調整參數。ThreadPoolExecutor有以下幾個核心參數:核心線程數、最大線程數、線程存活時間、阻塞隊列
ThreadPoolExecutor是有5中狀態(tài)的,Running,Shutdown,Stop,Tidying,terminated。
execute比較好理解,我們使用Runnable添加到ThreadPoolExecutor之后,首先會創(chuàng)建核心線程,核心線程其實就是一個標志位為true的Worker。Worker內部有一個Thread,會在addWorker方法中啟動(Thread#start)。但是ThreadPoolExecutor其實并不會立刻【放過】Worker中的Thread。如果后續(xù)有runnable被放到阻塞隊列之后,會從阻塞隊列中讀取。這點其實也是復用機制的關鍵。