鍍金池/ 教程/ Java/ 并發(fā)新特性—Executor 框架與線程池
并發(fā)新特性—信號量 Semaphore
線程間協(xié)作:wait、notify、notifyAll
notify 通知的遺漏
notifyAll 造成的早期通知問題
多線程的實(shí)現(xiàn)方法
深入 Java 內(nèi)存模型(1)
多線程環(huán)境下安全使用集合 API
并發(fā)新特性—Lock 鎖與條件變量
生產(chǎn)者—消費(fèi)者模型
深入 Java 內(nèi)存模型(2)
線程中斷
Volatile 關(guān)鍵字(上)
并發(fā)新特性—阻塞隊(duì)列與阻塞棧
可重入內(nèi)置鎖
守護(hù)線程與線程阻塞
并發(fā)新特性—障礙器 CyclicBarrier
Volatile 關(guān)鍵字(下)
synchronized 關(guān)鍵字
synchronized 的另個一重要作用:內(nèi)存可見性
并發(fā)新特性—Executor 框架與線程池
并發(fā)性與多線程介紹
死鎖
實(shí)現(xiàn)內(nèi)存可見性的兩種方法比較:synchronized 和 Volatile
線程掛起、恢復(fù)與終止

并發(fā)新特性—Executor 框架與線程池

Executor 框架簡介

在 Java 5 之后,并發(fā)編程引入了一堆新的啟動、調(diào)度和管理線程的API。Executor 框架便是 Java 5 中引入的,其內(nèi)部使用了線程池機(jī)制,它在 java.util.cocurrent 包下,通過該框架來控制線程的啟動、執(zhí)行和關(guān)閉,可以簡化并發(fā)編程的操作。因此,在 Java 5之后,通過 Executor 來啟動線程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用線程池實(shí)現(xiàn),節(jié)約開銷)外,還有關(guān)鍵的一點(diǎn):有助于避免 this 逃逸問題——如果我們在構(gòu)造器中啟動一個線程,因?yàn)榱硪粋€任務(wù)可能會在構(gòu)造器結(jié)束之前開始執(zhí)行,此時可能會訪問到初始化了一半的對象用 Executor 在構(gòu)造器中。

Executor 框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,F(xiàn)uture,Callable 等。

Executor 接口中之定義了一個方法 execute(Runnable command),該方法接收一個 Runable 實(shí)例,它用來執(zhí)行一個任務(wù),任務(wù)即一個實(shí)現(xiàn)了 Runnable 接口的類。ExecutorService 接口繼承自 Executor 接口,它提供了更豐富的實(shí)現(xiàn)多線程的方法,比如,ExecutorService 提供了關(guān)閉自己的方法,以及可為跟蹤一個或多個異步任務(wù)執(zhí)行狀況而生成 Future 的方法。 可以調(diào)用 ExecutorService 的 shutdown()方法來平滑地關(guān)閉 ExecutorService,調(diào)用該方法后,將導(dǎo)致 ExecutorService 停止接受任何新的任務(wù)且等待已經(jīng)提交的任務(wù)執(zhí)行完成(已經(jīng)提交的任務(wù)會分兩類:一類是已經(jīng)在執(zhí)行的,另一類是還沒有開始執(zhí)行的),當(dāng)所有已經(jīng)提交的任務(wù)執(zhí)行完畢后將會關(guān)閉 ExecutorService。因此我們一般用該接口來實(shí)現(xiàn)和管理多線程。

ExecutorService 的生命周期包括三種狀態(tài):運(yùn)行、關(guān)閉、終止。創(chuàng)建后便進(jìn)入運(yùn)行狀態(tài),當(dāng)調(diào)用了 shutdown()方法時,便進(jìn)入關(guān)閉狀態(tài),此時意味著 ExecutorService 不再接受新的任務(wù),但它還在執(zhí)行已經(jīng)提交了的任務(wù),當(dāng)素有已經(jīng)提交了的任務(wù)執(zhí)行完后,便到達(dá)終止?fàn)顟B(tài)。如果不調(diào)用 shutdown()方法,ExecutorService 會一直處在運(yùn)行狀態(tài),不斷接收新的任務(wù),執(zhí)行新的任務(wù),服務(wù)器端一般不需要關(guān)閉它,保持一直運(yùn)行即可。

Executors 提供了一系列工廠方法用于創(chuàng)先線程池,返回的線程池都實(shí)現(xiàn)了 ExecutorService 接口。

public static ExecutorService newFixedThreadPool(int nThreads)
創(chuàng)建固定數(shù)目線程的線程池。

public static ExecutorService newCachedThreadPool()
創(chuàng)建一個可緩存的線程池,調(diào)用execute將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線 程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

public static ExecutorService newSingleThreadExecutor()
創(chuàng)建一個單線程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
創(chuàng)建一個支持定時及周期性的任務(wù)執(zhí)行的線程池,多數(shù)情況下可用來替代Timer類。

這四種方法都是用的 Executors 中的 ThreadFactory 建立的線程,下面就以上四個方法做個比較:

newCachedThreadPool()

  • 緩存型池子,先查看池中有沒有以前建立的線程,如果有,就 reuse 如果沒有,就建一個新的線程加入池中
  • 緩存型池子通常用于執(zhí)行一些生存期很短的異步型任務(wù) 因此在一些面向連接的 daemon 型 SERVER 中用得不多。但對于生存期短的異步任務(wù),它是 Executor 的首選。
  • 能 reuse 的線程,必須是 timeout IDLE 內(nèi)的池中線程,缺省 timeout 是 60s,超過這個 IDLE 時長,線程實(shí)例將被終止及移出池。

    注意,放入 CachedThreadPool 的線程不必?fù)?dān)心其結(jié)束,超過 TIMEOUT 不活動,其會自動被終止。

newFixedThreadPool(int)

  • newFixedThreadPool 與 cacheThreadPool 差不多,也是能 reuse 就用,但不能隨時建新的線程。
  • 其獨(dú)特之處:任意時間點(diǎn),最多只能有固定數(shù)目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊(duì)列中等待,直到當(dāng)前的線程中某個線程終止直接被移出池子。
  • 和 cacheThreadPool 不同,F(xiàn)ixedThreadPool 沒有 IDLE 機(jī)制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的 TCP 或 UDP IDLE 機(jī)制之類的),所以 FixedThreadPool 多數(shù)針對一些很穩(wěn)定很固定的正規(guī)并發(fā)線程,多用于服務(wù)器。
  • 從方法的源代碼看,cache池和fixed 池調(diào)用的是同一個底層 池,只不過參數(shù)不同:
    • fixed 池線程數(shù)固定,并且是0秒IDLE(無IDLE)。
    • cache 池線程數(shù)支持 0-Integer.MAX_VALUE(顯然完全沒考慮主機(jī)的資源承受能力),60 秒 IDLE 。

newScheduledThreadPool(int)

  • 調(diào)度型線程池
  • 這個池子里的線程可以按 schedule 依次 delay 執(zhí)行,或周期執(zhí)行

SingleThreadExecutor()

  • 單例線程,任意時間池中只能有一個線程
  • 用的是和 cache 池和 fixed 池相同的底層池,但線程數(shù)目是 1-1,0 秒 IDLE(無 IDLE)

一般來說,CachedTheadPool 在程序執(zhí)行過程中通常會創(chuàng)建與所需數(shù)量相同的線程,然后在它回收舊線程時停止創(chuàng)建新線程,因此它是合理的 Executor 的首選,只有當(dāng)這種方式會引發(fā)問題時(比如需要大量長時間面向連接的線程時),才需要考慮用 FixedThreadPool。(該段話摘自《Thinking in Java》第四版)

Executor 執(zhí)行 Runnable 任務(wù)

通過 Executors 的以上四個靜態(tài)工廠方法獲得 ExecutorService 實(shí)例,而后調(diào)用該實(shí)例的 execute(Runnable command)方法即可。一旦 Runnable 任務(wù)傳遞到 execute()方法,該方法便會自動在一個線程上執(zhí)行。下面是 Executor 執(zhí)行 Runnable 任務(wù)的示例代碼:

import java.util.concurrent.ExecutorService;   
import java.util.concurrent.Executors;   

public class TestCachedThreadPool{   
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
//      ExecutorService executorService = Executors.newFixedThreadPool(5);  
//      ExecutorService executorService = Executors.newSingleThreadExecutor();  
        for (int i = 0; i < 5; i++){   
            executorService.execute(new TestRunnable());   
            System.out.println("************* a" + i + " *************");   
        }   
        executorService.shutdown();   
    }   
}   

class TestRunnable implements Runnable{   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "線程被調(diào)用了。");   
    }   
}  

執(zhí)行后的結(jié)果如下:

http://wiki.jikexueyuan.com/project/java-concurrency/images/executor.jpg" alt="" />

從結(jié)果中可以看出,pool-1-thread-1 和 pool-1-thread-2 均被調(diào)用了兩次,這是隨機(jī)的,execute 會首先在線程池中選擇一個已有空閑線程來執(zhí)行任務(wù),如果線程池中沒有空閑線程,它便會創(chuàng)建一個新的線程來執(zhí)行任務(wù)。

Executor 執(zhí)行 Callable 任務(wù)

在 Java 5 之后,任務(wù)分兩類:一類是實(shí)現(xiàn)了 Runnable 接口的類,一類是實(shí)現(xiàn)了 Callable 接口的類。兩者都可以被 ExecutorService 執(zhí)行,但是 Runnable 任務(wù)沒有返回值,而 Callable 任務(wù)有返回值。并且 Callable 的 call()方法只能通過 ExecutorService 的 submit(Callable task) 方法來執(zhí)行,并且返回一個 Future,是表示任務(wù)等待完成的 Future。

Callable 接口類似于 Runnable,兩者都是為那些其實(shí)例可能被另一個線程執(zhí)行的類設(shè)計(jì)的。但是 Runnable 不會返回結(jié)果,并且無法拋出經(jīng)過檢查的異常而 Callable 又返回結(jié)果,而且當(dāng)獲取返回結(jié)果時可能會拋出異常。Callable 中的 call()方法類似 Runnable 的 run()方法,區(qū)別同樣是有返回值,后者沒有。

當(dāng)將一個 Callable 的對象傳遞給 ExecutorService 的 submit 方法,則該 call 方法自動在一個線程上執(zhí)行,并且會返回執(zhí)行結(jié)果 Future 對象。同樣,將 Runnable 的對象傳遞給 ExecutorService 的 submit 方法,則該 run 方法自動在一個線程上執(zhí)行,并且會返回執(zhí)行結(jié)果 Future 對象,但是在該 Future 對象上調(diào)用 get 方法,將返回 null。

下面給出一個 Executor 執(zhí)行 Callable 任務(wù)的示例代碼:

import java.util.ArrayList;   
import java.util.List;   
import java.util.concurrent.*;   

public class CallableDemo{   
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
        List<Future<String>> resultList = new ArrayList<Future<String>>();   

        //創(chuàng)建10個任務(wù)并執(zhí)行   
        for (int i = 0; i < 10; i++){   
            //使用ExecutorService執(zhí)行Callable類型的任務(wù),并將結(jié)果保存在future變量中   
            Future<String> future = executorService.submit(new TaskWithResult(i));   
            //將任務(wù)執(zhí)行結(jié)果存儲到List中   
            resultList.add(future);   
        }   

        //遍歷任務(wù)的結(jié)果   
        for (Future<String> fs : resultList){   
                try{   
                    while(!fs.isDone);//Future返回如果沒有完成,則一直循環(huán)等待,直到Future返回完成  
                    System.out.println(fs.get());     //打印各個線程(任務(wù))執(zhí)行的結(jié)果   
                }catch(InterruptedException e){   
                    e.printStackTrace();   
                }catch(ExecutionException e){   
                    e.printStackTrace();   
                }finally{   
                    //啟動一次順序關(guān)閉,執(zhí)行以前提交的任務(wù),但不接受新任務(wù)  
                    executorService.shutdown();   
                }   
        }   
    }   
}   

class TaskWithResult implements Callable<String>{   
    private int id;   

    public TaskWithResult(int id){   
        this.id = id;   
    }   

    /**  
     * 任務(wù)的具體過程,一旦任務(wù)傳給ExecutorService的submit方法, 
     * 則該方法自動在一個線程上執(zhí)行 
     */   
    public String call() throws Exception {  
        System.out.println("call()方法被自動調(diào)用?。?!    " + Thread.currentThread().getName());   
        //該返回結(jié)果將被Future的get方法得到  
        return "call()方法被自動調(diào)用,任務(wù)返回的結(jié)果是:" + id + "    " + Thread.currentThread().getName();   
    }   
}  

執(zhí)行結(jié)果如下:

http://wiki.jikexueyuan.com/project/java-concurrency/images/executor1.jpg" alt="" />

從結(jié)果中可以同樣可以看出,submit 也是首先選擇空閑線程來執(zhí)行任務(wù),如果沒有,才會創(chuàng)建新的線程來執(zhí)行任務(wù)。另外,需要注意:如果 Future 的返回尚未完成,則 get()方法會阻塞等待,直到 Future 完成返回,可以通過調(diào)用 isDone()方法判斷 Future 是否完成了返回。

自定義線程池

自定義線程池,可以用 ThreadPoolExecutor 類創(chuàng)建,它有多個構(gòu)造方法來創(chuàng)建線程池,用該類很容易實(shí)現(xiàn)自定義的線程池,這里先貼上示例程序:

import java.util.concurrent.ArrayBlockingQueue;   
import java.util.concurrent.BlockingQueue;   
import java.util.concurrent.ThreadPoolExecutor;   
import java.util.concurrent.TimeUnit;   

public class ThreadPoolTest{   
    public static void main(String[] args){   
        //創(chuàng)建等待隊(duì)列   
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);   
        //創(chuàng)建線程池,池中保存的線程數(shù)為3,允許的最大線程數(shù)為5  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);   
        //創(chuàng)建七個任務(wù)   
        Runnable t1 = new MyThread();   
        Runnable t2 = new MyThread();   
        Runnable t3 = new MyThread();   
        Runnable t4 = new MyThread();   
        Runnable t5 = new MyThread();   
        Runnable t6 = new MyThread();   
        Runnable t7 = new MyThread();   
        //每個任務(wù)會在一個線程上執(zhí)行  
        pool.execute(t1);   
        pool.execute(t2);   
        pool.execute(t3);   
        pool.execute(t4);   
        pool.execute(t5);   
        pool.execute(t6);   
        pool.execute(t7);   
        //關(guān)閉線程池   
        pool.shutdown();   
    }   
}   

class MyThread implements Runnable{   
    @Override   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "正在執(zhí)行。。。");   
        try{   
            Thread.sleep(100);   
        }catch(InterruptedException e){   
            e.printStackTrace();   
        }   
    }   
}  

運(yùn)行結(jié)果如下:

http://wiki.jikexueyuan.com/project/java-concurrency/images/threadpoolexecutor.jpg" alt="" />

從結(jié)果中可以看出,七個任務(wù)是在線程池的三個線程上執(zhí)行的。這里簡要說明下用到的 ThreadPoolExecuror 類的構(gòu)造方法中各個參數(shù)的含義。

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)
  • corePoolSize:線程池中所保存的核心線程數(shù),包括空閑線程。

  • maximumPoolSize:池中允許的最大線程數(shù)。

  • keepAliveTime:線程池中的空閑線程所能持續(xù)的最長時間。

  • unit:持續(xù)時間的單位。

  • workQueue:任務(wù)執(zhí)行前保存任務(wù)的隊(duì)列,僅保存由 execute 方法提交的 Runnable 任務(wù)。

根據(jù) ThreadPoolExecutor 源碼前面大段的注釋,我們可以看出,當(dāng)試圖通過 excute 方法講一個 Runnable 任務(wù)添加到線程池中時,按照如下順序來處理:

  1. 如果線程池中的線程數(shù)量少于 corePoolSize,即使線程池中有空閑線程,也會創(chuàng)建一個新的線程來執(zhí)行新添加的任務(wù);

  2. 如果線程池中的線程數(shù)量大于等于 corePoolSize,但緩沖隊(duì)列 workQueue 未滿,則將新添加的任務(wù)放到 workQueue 中,按照 FIFO 的原則依次等待執(zhí)行(線程池中有線程空閑出來后依次將緩沖隊(duì)列中的任務(wù)交付給空閑的線程執(zhí)行);

  3. 如果線程池中的線程數(shù)量大于等于 corePoolSize,且緩沖隊(duì)列 workQueue 已滿,但線程池中的線程數(shù)量小于 maximumPoolSize,則會創(chuàng)建新的線程來處理被添加的任務(wù);

  4. 如果線程池中的線程數(shù)量等于了 maximumPoolSize,有 4 種才處理方式(該構(gòu)造方法調(diào)用了含有 5 個參數(shù)的構(gòu)造方法,并將最后一個構(gòu)造方法為 RejectedExecutionHandler 類型,它在處理線程溢出時有 4 種方式,這里不再細(xì)說,要了解的,自己可以閱讀下源碼)。

總結(jié)起來,也即是說,當(dāng)有新的任務(wù)要處理時,先看線程池中的線程數(shù)量是否大于 corePoolSize,再看緩沖隊(duì)列 workQueue 是否滿,最后看線程池中的線程數(shù)量是否大于 maximumPoolSize。

另外,當(dāng)線程池中的線程數(shù)量大于 corePoolSize 時,如果里面有線程的空閑時間超過了 keepAliveTime,就將其移除線程池,這樣,可以動態(tài)地調(diào)整線程池中線程的數(shù)量。

我們大致來看下 Executors 的源碼,newCachedThreadPool 的不帶 RejectedExecutionHandler 參數(shù)(即第五個參數(shù),線程數(shù)量超過 maximumPoolSize 時,指定處理方式)的構(gòu)造方法如下:

public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}

它將 corePoolSize 設(shè)定為 0,而將 maximumPoolSize 設(shè)定為了 Integer 的最大值,線程空閑超過 60 秒,將會從線程池中移除。由于核心線程數(shù)為 0,因此每次添加任務(wù),都會先從線程池中找空閑線程,如果沒有就會創(chuàng)建一個線程(SynchronousQueue決定的,后面會說)來執(zhí)行新的任務(wù),并將該線程加入到線程池中,而最大允許的線程數(shù)為 Integer 的最大值,因此這個線程池理論上可以不斷擴(kuò)大。

再來看 newFixedThreadPool 的不帶 RejectedExecutionHandler 參數(shù)的構(gòu)造方法,如下:

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}  

它將 corePoolSize 和 maximumPoolSize 都設(shè)定為了 nThreads,這樣便實(shí)現(xiàn)了線程池的大小的固定,不會動態(tài)地?cái)U(kuò)大,另外,keepAliveTime 設(shè)定為了 0,也就是說線程只要空閑下來,就會被移除線程池,敢于 LinkedBlockingQueue 下面會說。

幾種排隊(duì)的策略

  • 直接提交。緩沖隊(duì)列采用 SynchronousQueue,它將任務(wù)直接交給線程處理而不保持它們。如果不存在可用于立即運(yùn)行任務(wù)的線程(即線程池中的線程都在工作),則試圖把任務(wù)加入緩沖隊(duì)列將會失敗,因此會構(gòu)造一個新的線程來處理新添加的任務(wù),并將其加入到線程池中。直接提交通常要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新提交的任務(wù)。newCachedThreadPool 采用的便是這種策略。

  • 無界隊(duì)列。使用無界隊(duì)列(典型的便是采用預(yù)定義容量的 LinkedBlockingQueue,理論上是該緩沖隊(duì)列可以對無限多的任務(wù)排隊(duì))將導(dǎo)致在所有 corePoolSize 線程都工作的情況下將新任務(wù)加入到緩沖隊(duì)列中。這樣,創(chuàng)建的線程就不會超過 corePoolSize,也因此,maximumPoolSize 的值也就無效了。當(dāng)每個任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時,適合于使用無界隊(duì)列。newFixedThreadPool采用的便是這種策略。

  • 有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes 時,有界隊(duì)列(一般緩沖隊(duì)列使用 ArrayBlockingQueue,并制定隊(duì)列的最大長度)有助于防止資源耗盡,但是可能較難調(diào)整和控制,隊(duì)列大小和最大池大小需要相互折衷,需要設(shè)定合理的參數(shù)。