Java多線程的調(diào)度

????
有多個(gè)線程,如何控制它們執(zhí)行的先后次序?
? ? ? ? 方法一:設(shè)置線程優(yōu)先級。
? ? ? ? java.lang.Thread 提供了 setPriority(int newPriority) 方法來設(shè)置線程的優(yōu)先級,但線程的優(yōu)先級是無法保障線程的執(zhí)行次序的,優(yōu)先級只是提高了優(yōu)先級高的線程獲取 CPU 資源的概率。也就是說,這個(gè)方法不靠譜。
? ? ? ? 方法二:使用線程合并。
? ? ? ? 使用 java.lang.Thread 的 join() 方法。比如有線程 a,現(xiàn)在當(dāng)前線程想等待 a 執(zhí)行完之后再往下執(zhí)行,那就可以使用 a.join()。一旦線程使用了 a.join(),那么當(dāng)前線程會(huì)一直等待 a 消亡之后才會(huì)繼續(xù)執(zhí)行。什么時(shí)候 a 消亡?a 的 run() 方法執(zhí)行結(jié)束了 a 就消亡了。
? ? ? ? 這個(gè)方法可以有效地進(jìn)行線程調(diào)度,但卻只能局限于等待一個(gè)線程的執(zhí)行調(diào)度。如果要等待 N 個(gè)線程的話,顯然是無能為力了。而且等待線程必須在被等待線程消亡后才得到繼續(xù)執(zhí)行的指令,無法做到兩個(gè)線程真正意義上的并發(fā),靈活性較差。
? ? ? ? 方法三:使用線程通信。
? ? ? ? java.lang.Object 提供了可以進(jìn)行線程間通信的 wait 與 notify 、notifyAll 等方法。每個(gè) Java 對象都有一個(gè)隱性的線程鎖的概念,通過這個(gè)線程鎖的概念我們讓線程間可以進(jìn)行通信,各線程不再埋頭單干。著名的“生產(chǎn)者-消費(fèi)者”模型就是基于這個(gè)原理實(shí)現(xiàn)的。
? ? ? ? 這個(gè)方法也可以有效地進(jìn)行線程調(diào)度,而且也不僅僅局限于等待一個(gè)線程的執(zhí)行調(diào)度,具有很大程度上的靈活性。但操作復(fù)雜,不易控制容易造成混亂,程序維護(hù)起來也不太方便。
? ? ? ? 方法四:使用閉鎖。
? ? ? ? 閉鎖就像一扇門,在先決條件未達(dá)成之前這扇門是閉著的,線程無法通過,先決條件達(dá)成之后,閉鎖打開,線程就可以繼續(xù)執(zhí)行了。java.util.concurrent.CountDownLatch 是一個(gè)很實(shí)用的閉鎖實(shí)現(xiàn),它提供了 countDown() 和 await() 方法達(dá)成線程執(zhí)行隊(duì)列,這個(gè)方法最適合 M 個(gè)線程等待 N 個(gè)線程執(zhí)行結(jié)束再執(zhí)行的情況。首先初始化一個(gè) CountDownLatch 對象,比如 CountDownLatch doneSignal = new CountDownLatch(N);該對象具有 N 作為計(jì)數(shù)閥值,每個(gè)被等待線程通過對 doneSignal 對象的持有,使用 countDown() 可以將 doneSignal 的計(jì)數(shù)閥值減一;每個(gè)等待線程通過對 doneSignal 對象的持有,使用 await() 阻塞當(dāng)前線程,直到 doneSignal 計(jì)數(shù)閥值減為 0,才繼續(xù)往下執(zhí)行。
? ? ? ? 這個(gè)方法也可以有效地進(jìn)行線程調(diào)度,而且比方法三更易于管理,開發(fā)者只需控制好 CountDownLatch 即可。但線程執(zhí)行次序管理相對單一,它只是指出當(dāng)前等待線程的數(shù)量,而且 CountDownLatch 的初始閥值一旦設(shè)置就只能遞減下去,無法重置。如需遞減過程中進(jìn)行閥值的重置可以參考 java.util.concurrent.CyclicBarrier。
? ? ? ? 不管如何,CountDownLatch 對于一定條件下的線程隊(duì)列的達(dá)成還是很有用的。對于復(fù)雜環(huán)境下的線程管理還是卓有成效的。所以熟悉和把握對它的使用還是很有必要的。
? ? ? ? 以下是一個(gè)實(shí)際項(xiàng)目中 CountDownLatch 的使用的例子:
1.???? private?Map<Long,DecryptSignalAndPath>?afterDecryptFilePathMap?=?new?HashMap<Long,DecryptSignalAndPath>();//TODO?注意容器垃圾數(shù)據(jù)的清理工作??
2.???? class?DecryptRunnable?implements?Runnable?{??
3.???? ????private?ServerFileBean?serverFile;??
4.???? ????private?Long?fid;//指向解密文件??
5.???? ????private?CountDownLatch?decryptSignal;??
6.???? ????protected?DecryptRunnable(Long?fid,?ServerFileBean?serverFile,?CountDownLatch?decryptSignal)?{??
7.???? ????????this.fid?=?fid;??
8.???? ????????this.serverFile?=?serverFile;??
9.???? ????????this.decryptSignal?=?decryptSignal;??
10.?? ????}??
11.?? ????@Override??
12.?? ????public?void?run()?{??
13.?? ????????//開始解密??
14.?? ????????String?afterDecryptFilePath?=?null;??
15.?? ????????DecryptSignalAndPath?decryptSignalAndPath?=?new?DecryptSignalAndPath();??
16.?? ????????decryptSignalAndPath.setDecryptSignal(decryptSignal);??
17.?? ????????afterDecryptFilePathMap.put(fid,?decryptSignalAndPath);??
18.?? ????????afterDecryptFilePath?=?decryptFile(serverFile);??
19.?? ????????decryptSignalAndPath.setAfterDecryptFilePath(afterDecryptFilePath);??
20.?? ????????decryptSignal.countDown();//通知所有阻塞的線程??
21.?? ????}??
22.?? ??????
23.?? }??
24.?? class?DecryptSignalAndPath?{??
25.?? ????private?String?afterDecryptFilePath;??
26.?? ????private?CountDownLatch?decryptSignal;??
27.?? ????public?String?getAfterDecryptFilePath()?{??
28.?? ????????return?afterDecryptFilePath;??
29.?? ????}??
30.?? ????public?void?setAfterDecryptFilePath(String?afterDecryptFilePath)?{??
31.?? ????????this.afterDecryptFilePath?=?afterDecryptFilePath;??
32.?? ????}??
33.?? ????public?CountDownLatch?getDecryptSignal()?{??
34.?? ????????return?decryptSignal;??
35.?? ????}??
36.?? ????public?void?setDecryptSignal(CountDownLatch?decryptSignal)?{??
37.?? ????????this.decryptSignal?=?decryptSignal;??
38.?? ????}??
39.?? }??
? ? ? ? 需要先執(zhí)行的,被等待線程在這里加入:
1.???? CountDownLatch?decryptSignal?=?new?CountDownLatch(1);??
2.???? new?Thread(new?DecryptRunnable(fid,?serverFile,?decryptSignal)).start();//無需拿到新線程句柄,由?CountDownLatch?自行跟蹤??
3.???? try?{??
4.???? ????decryptSignal.await();??
5.???? }?catch?(InterruptedException?e)?{??
6.???? ????//?TODO?Auto-generated?catch?block??
7.???? }??
? ? ? ? 需要后執(zhí)行,等待的線程可以這樣加入:
1.???? CountDownLatch?decryptSignal?=?afterDecryptFilePathMap.get(fid).getDecryptSignal();??
2.???? ??
3.???? try?{??
4.???? ????decryptSignal.await();??
5.???? }?catch?(InterruptedException?e)?{??
6.???? ????//?TODO?Auto-generated?catch?block??
7.???? }??
? ? ? ? 當(dāng)然,這也僅僅只是一個(gè)簡單的 CountDownLatch 的使用展示,對于 CountDownLatch 來說有點(diǎn)大材小用了,因?yàn)樗梢詣偃胃鼜?fù)雜的多線程環(huán)境。示例中的案例完全可以使用線程通信進(jìn)行搞定。因?yàn)?CountDownLatch 的閥值初始為 1,所以這里甚至完全可以使用方法二所說的線程的合并進(jìn)行取代。
? ? ? ? 如果讀者覺得以上示例不夠清晰,也可以參考 JDK API 提供的 demo,這個(gè)清晰明了:
1.???? class?Driver2?{?//?...??
2.???? ??void?main()?throws?InterruptedException?{??
3.???? ????CountDownLatch?doneSignal?=?new?CountDownLatch(N);??
4.???? ????Executor?e?=?...??
5.???? ??
6.???? ????for?(int?i?=?0;?i?<?N;?++i)?//?create?and?start?threads??
7.???? ??????e.execute(new?WorkerRunnable(doneSignal,?i));??
8.???? ??
9.???? ????doneSignal.await();???????????//?wait?for?all?to?finish??
10.?? ??}??
11.?? }??
12.?? ??
13.?? class?WorkerRunnable?implements?Runnable?{??
14.?? ??private?final?CountDownLatch?doneSignal;??
15.?? ??private?final?int?i;??
16.?? ??WorkerRunnable(CountDownLatch?doneSignal,?int?i)?{??
17.?? ?????this.doneSignal?=?doneSignal;??
18.?? ?????this.i?=?i;??
19.?? ??}??
20.?? ??public?void?run()?{??
21.?? ?????try?{??
22.?? ???????doWork(i);??
23.?? ???????doneSignal.countDown();??
24.?? ?????}?catch?(InterruptedException?ex)?{}?//?return;??
25.?? ??}??
26.?? ??
27.?? ??void?doWork()?{?...?}??
28.?? }??