在 Java 5 之后,并發(fā)編程引入了一堆新的啟動、調(diào)度和管理線程的API。Executor 框架便是 Java 5 中引入的,其內(nèi)部使用了線程池機(jī)制,它在 java.util.cocurrent 包下,通過該框架來控制線程的啟動、執(zhí)行和關(guān)閉,可以簡化并發(fā)編程的操作。因此,在 Java 5之后,通過 Executor 來啟動線程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用線程池實(shí)現(xiàn),節(jié)約開銷)外,還有關(guān)鍵的一點(diǎn):有助于避免 this 逃逸問題——如果我們在構(gòu)造器中啟動一個線程,因?yàn)榱硪粋€任務(wù)可能會在構(gòu)造器結(jié)束之前開始執(zhí)行,此時可能會訪問到初始化了一半的對象用 Executor 在構(gòu)造器中。
Executor 框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,F(xiàn)uture,Callable 等。
Executor 接口中之定義了一個方法 execute(Runnable command),該方法接收一個 Runable 實(shí)例,它用來執(zhí)行一個任務(wù),任務(wù)即一個實(shí)現(xiàn)了 Runnable 接口的類。ExecutorService 接口繼承自 Executor 接口,它提供了更豐富的實(shí)現(xiàn)多線程的方法,比如,ExecutorService 提供了關(guān)閉自己的方法,以及可為跟蹤一個或多個異步任務(wù)執(zhí)行狀況而生成 Future 的方法。 可以調(diào)用 ExecutorService 的 shutdown()方法來平滑地關(guān)閉 ExecutorService,調(diào)用該方法后,將導(dǎo)致 ExecutorService 停止接受任何新的任務(wù)且等待已經(jīng)提交的任務(wù)執(zhí)行完成(已經(jīng)提交的任務(wù)會分兩類:一類是已經(jīng)在執(zhí)行的,另一類是還沒有開始執(zhí)行的),當(dāng)所有已經(jīng)提交的任務(wù)執(zhí)行完畢后將會關(guān)閉 ExecutorService。因此我們一般用該接口來實(shí)現(xiàn)和管理多線程。
ExecutorService 的生命周期包括三種狀態(tài):運(yùn)行、關(guān)閉、終止。創(chuàng)建后便進(jìn)入運(yùn)行狀態(tài),當(dāng)調(diào)用了 shutdown()方法時,便進(jìn)入關(guān)閉狀態(tài),此時意味著 ExecutorService 不再接受新的任務(wù),但它還在執(zhí)行已經(jīng)提交了的任務(wù),當(dāng)素有已經(jīng)提交了的任務(wù)執(zhí)行完后,便到達(dá)終止?fàn)顟B(tài)。如果不調(diào)用 shutdown()方法,ExecutorService 會一直處在運(yùn)行狀態(tài),不斷接收新的任務(wù),執(zhí)行新的任務(wù),服務(wù)器端一般不需要關(guān)閉它,保持一直運(yùn)行即可。
Executors 提供了一系列工廠方法用于創(chuàng)先線程池,返回的線程池都實(shí)現(xiàn)了 ExecutorService 接口。
public static ExecutorService newFixedThreadPool(int nThreads) 創(chuàng)建固定數(shù)目線程的線程池。
public static ExecutorService newCachedThreadPool() 創(chuàng)建一個可緩存的線程池,調(diào)用execute將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線 程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
public static ExecutorService newSingleThreadExecutor() 創(chuàng)建一個單線程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 創(chuàng)建一個支持定時及周期性的任務(wù)執(zhí)行的線程池,多數(shù)情況下可用來替代Timer類。
這四種方法都是用的 Executors 中的 ThreadFactory 建立的線程,下面就以上四個方法做個比較:
newCachedThreadPool()
能 reuse 的線程,必須是 timeout IDLE 內(nèi)的池中線程,缺省 timeout 是 60s,超過這個 IDLE 時長,線程實(shí)例將被終止及移出池。
注意,放入 CachedThreadPool 的線程不必?fù)?dān)心其結(jié)束,超過 TIMEOUT 不活動,其會自動被終止。
newFixedThreadPool(int)
newScheduledThreadPool(int)
SingleThreadExecutor()
一般來說,CachedTheadPool 在程序執(zhí)行過程中通常會創(chuàng)建與所需數(shù)量相同的線程,然后在它回收舊線程時停止創(chuàng)建新線程,因此它是合理的 Executor 的首選,只有當(dāng)這種方式會引發(fā)問題時(比如需要大量長時間面向連接的線程時),才需要考慮用 FixedThreadPool。(該段話摘自《Thinking in Java》第四版)
通過 Executors 的以上四個靜態(tài)工廠方法獲得 ExecutorService 實(shí)例,而后調(diào)用該實(shí)例的 execute(Runnable command)方法即可。一旦 Runnable 任務(wù)傳遞到 execute()方法,該方法便會自動在一個線程上執(zhí)行。下面是 Executor 執(zhí)行 Runnable 任務(wù)的示例代碼:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCachedThreadPool{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
// ExecutorService executorService = Executors.newFixedThreadPool(5);
// ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++){
executorService.execute(new TestRunnable());
System.out.println("************* a" + i + " *************");
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable{
public void run(){
System.out.println(Thread.currentThread().getName() + "線程被調(diào)用了。");
}
}
執(zhí)行后的結(jié)果如下:
http://wiki.jikexueyuan.com/project/java-concurrency/images/executor.jpg" alt="" />
從結(jié)果中可以看出,pool-1-thread-1 和 pool-1-thread-2 均被調(diào)用了兩次,這是隨機(jī)的,execute 會首先在線程池中選擇一個已有空閑線程來執(zhí)行任務(wù),如果線程池中沒有空閑線程,它便會創(chuàng)建一個新的線程來執(zhí)行任務(wù)。
在 Java 5 之后,任務(wù)分兩類:一類是實(shí)現(xiàn)了 Runnable 接口的類,一類是實(shí)現(xiàn)了 Callable 接口的類。兩者都可以被 ExecutorService 執(zhí)行,但是 Runnable 任務(wù)沒有返回值,而 Callable 任務(wù)有返回值。并且 Callable 的 call()方法只能通過 ExecutorService 的 submit(Callable
Callable 接口類似于 Runnable,兩者都是為那些其實(shí)例可能被另一個線程執(zhí)行的類設(shè)計(jì)的。但是 Runnable 不會返回結(jié)果,并且無法拋出經(jīng)過檢查的異常而 Callable 又返回結(jié)果,而且當(dāng)獲取返回結(jié)果時可能會拋出異常。Callable 中的 call()方法類似 Runnable 的 run()方法,區(qū)別同樣是有返回值,后者沒有。
當(dāng)將一個 Callable 的對象傳遞給 ExecutorService 的 submit 方法,則該 call 方法自動在一個線程上執(zhí)行,并且會返回執(zhí)行結(jié)果 Future 對象。同樣,將 Runnable 的對象傳遞給 ExecutorService 的 submit 方法,則該 run 方法自動在一個線程上執(zhí)行,并且會返回執(zhí)行結(jié)果 Future 對象,但是在該 Future 對象上調(diào)用 get 方法,將返回 null。
下面給出一個 Executor 執(zhí)行 Callable 任務(wù)的示例代碼:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CallableDemo{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();
//創(chuàng)建10個任務(wù)并執(zhí)行
for (int i = 0; i < 10; i++){
//使用ExecutorService執(zhí)行Callable類型的任務(wù),并將結(jié)果保存在future變量中
Future<String> future = executorService.submit(new TaskWithResult(i));
//將任務(wù)執(zhí)行結(jié)果存儲到List中
resultList.add(future);
}
//遍歷任務(wù)的結(jié)果
for (Future<String> fs : resultList){
try{
while(!fs.isDone);//Future返回如果沒有完成,則一直循環(huán)等待,直到Future返回完成
System.out.println(fs.get()); //打印各個線程(任務(wù))執(zhí)行的結(jié)果
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}finally{
//啟動一次順序關(guān)閉,執(zhí)行以前提交的任務(wù),但不接受新任務(wù)
executorService.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id){
this.id = id;
}
/**
* 任務(wù)的具體過程,一旦任務(wù)傳給ExecutorService的submit方法,
* 則該方法自動在一個線程上執(zhí)行
*/
public String call() throws Exception {
System.out.println("call()方法被自動調(diào)用?。?! " + Thread.currentThread().getName());
//該返回結(jié)果將被Future的get方法得到
return "call()方法被自動調(diào)用,任務(wù)返回的結(jié)果是:" + id + " " + Thread.currentThread().getName();
}
}
執(zhí)行結(jié)果如下:
http://wiki.jikexueyuan.com/project/java-concurrency/images/executor1.jpg" alt="" />
從結(jié)果中可以同樣可以看出,submit 也是首先選擇空閑線程來執(zhí)行任務(wù),如果沒有,才會創(chuàng)建新的線程來執(zhí)行任務(wù)。另外,需要注意:如果 Future 的返回尚未完成,則 get()方法會阻塞等待,直到 Future 完成返回,可以通過調(diào)用 isDone()方法判斷 Future 是否完成了返回。
自定義線程池,可以用 ThreadPoolExecutor 類創(chuàng)建,它有多個構(gòu)造方法來創(chuàng)建線程池,用該類很容易實(shí)現(xiàn)自定義的線程池,這里先貼上示例程序:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest{
public static void main(String[] args){
//創(chuàng)建等待隊(duì)列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//創(chuàng)建線程池,池中保存的線程數(shù)為3,允許的最大線程數(shù)為5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
//創(chuàng)建七個任務(wù)
Runnable t1 = new MyThread();
Runnable t2 = new MyThread();
Runnable t3 = new MyThread();
Runnable t4 = new MyThread();
Runnable t5 = new MyThread();
Runnable t6 = new MyThread();
Runnable t7 = new MyThread();
//每個任務(wù)會在一個線程上執(zhí)行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);
//關(guān)閉線程池
pool.shutdown();
}
}
class MyThread implements Runnable{
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + "正在執(zhí)行。。。");
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
運(yùn)行結(jié)果如下:
http://wiki.jikexueyuan.com/project/java-concurrency/images/threadpoolexecutor.jpg" alt="" />
從結(jié)果中可以看出,七個任務(wù)是在線程池的三個線程上執(zhí)行的。這里簡要說明下用到的 ThreadPoolExecuror 類的構(gòu)造方法中各個參數(shù)的含義。
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)
corePoolSize:線程池中所保存的核心線程數(shù),包括空閑線程。
maximumPoolSize:池中允許的最大線程數(shù)。
keepAliveTime:線程池中的空閑線程所能持續(xù)的最長時間。
unit:持續(xù)時間的單位。
根據(jù) ThreadPoolExecutor 源碼前面大段的注釋,我們可以看出,當(dāng)試圖通過 excute 方法講一個 Runnable 任務(wù)添加到線程池中時,按照如下順序來處理:
如果線程池中的線程數(shù)量少于 corePoolSize,即使線程池中有空閑線程,也會創(chuàng)建一個新的線程來執(zhí)行新添加的任務(wù);
如果線程池中的線程數(shù)量大于等于 corePoolSize,但緩沖隊(duì)列 workQueue 未滿,則將新添加的任務(wù)放到 workQueue 中,按照 FIFO 的原則依次等待執(zhí)行(線程池中有線程空閑出來后依次將緩沖隊(duì)列中的任務(wù)交付給空閑的線程執(zhí)行);
如果線程池中的線程數(shù)量大于等于 corePoolSize,且緩沖隊(duì)列 workQueue 已滿,但線程池中的線程數(shù)量小于 maximumPoolSize,則會創(chuàng)建新的線程來處理被添加的任務(wù);
總結(jié)起來,也即是說,當(dāng)有新的任務(wù)要處理時,先看線程池中的線程數(shù)量是否大于 corePoolSize,再看緩沖隊(duì)列 workQueue 是否滿,最后看線程池中的線程數(shù)量是否大于 maximumPoolSize。
另外,當(dāng)線程池中的線程數(shù)量大于 corePoolSize 時,如果里面有線程的空閑時間超過了 keepAliveTime,就將其移除線程池,這樣,可以動態(tài)地調(diào)整線程池中線程的數(shù)量。
我們大致來看下 Executors 的源碼,newCachedThreadPool 的不帶 RejectedExecutionHandler 參數(shù)(即第五個參數(shù),線程數(shù)量超過 maximumPoolSize 時,指定處理方式)的構(gòu)造方法如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
它將 corePoolSize 設(shè)定為 0,而將 maximumPoolSize 設(shè)定為了 Integer 的最大值,線程空閑超過 60 秒,將會從線程池中移除。由于核心線程數(shù)為 0,因此每次添加任務(wù),都會先從線程池中找空閑線程,如果沒有就會創(chuàng)建一個線程(SynchronousQueue
再來看 newFixedThreadPool 的不帶 RejectedExecutionHandler 參數(shù)的構(gòu)造方法,如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
它將 corePoolSize 和 maximumPoolSize 都設(shè)定為了 nThreads,這樣便實(shí)現(xiàn)了線程池的大小的固定,不會動態(tài)地?cái)U(kuò)大,另外,keepAliveTime 設(shè)定為了 0,也就是說線程只要空閑下來,就會被移除線程池,敢于 LinkedBlockingQueue 下面會說。
直接提交。緩沖隊(duì)列采用 SynchronousQueue,它將任務(wù)直接交給線程處理而不保持它們。如果不存在可用于立即運(yùn)行任務(wù)的線程(即線程池中的線程都在工作),則試圖把任務(wù)加入緩沖隊(duì)列將會失敗,因此會構(gòu)造一個新的線程來處理新添加的任務(wù),并將其加入到線程池中。直接提交通常要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新提交的任務(wù)。newCachedThreadPool 采用的便是這種策略。
無界隊(duì)列。使用無界隊(duì)列(典型的便是采用預(yù)定義容量的 LinkedBlockingQueue,理論上是該緩沖隊(duì)列可以對無限多的任務(wù)排隊(duì))將導(dǎo)致在所有 corePoolSize 線程都工作的情況下將新任務(wù)加入到緩沖隊(duì)列中。這樣,創(chuàng)建的線程就不會超過 corePoolSize,也因此,maximumPoolSize 的值也就無效了。當(dāng)每個任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時,適合于使用無界隊(duì)列。newFixedThreadPool采用的便是這種策略。