鍍金池/ 教程/ Java/ 生產(chǎn)者—消費(fèi)者模型
并發(fā)新特性—信號量 Semaphore
線程間協(xié)作:wait、notify、notifyAll
notify 通知的遺漏
notifyAll 造成的早期通知問題
多線程的實現(xiàn)方法
深入 Java 內(nèi)存模型(1)
多線程環(huán)境下安全使用集合 API
并發(fā)新特性—Lock 鎖與條件變量
生產(chǎn)者—消費(fèi)者模型
深入 Java 內(nèi)存模型(2)
線程中斷
Volatile 關(guān)鍵字(上)
并發(fā)新特性—阻塞隊列與阻塞棧
可重入內(nèi)置鎖
守護(hù)線程與線程阻塞
并發(fā)新特性—障礙器 CyclicBarrier
Volatile 關(guān)鍵字(下)
synchronized 關(guān)鍵字
synchronized 的另個一重要作用:內(nèi)存可見性
并發(fā)新特性—Executor 框架與線程池
并發(fā)性與多線程介紹
死鎖
實現(xiàn)內(nèi)存可見性的兩種方法比較:synchronized 和 Volatile
線程掛起、恢復(fù)與終止

生產(chǎn)者—消費(fèi)者模型

生產(chǎn)者消費(fèi)者問題是線程模型中的經(jīng)典問題:生產(chǎn)者和消費(fèi)者在同一時間段內(nèi)共用同一存儲空間,生產(chǎn)者向空間里生產(chǎn)數(shù)據(jù),而消費(fèi)者取走數(shù)據(jù)。

這里實現(xiàn)如下情況的生產(chǎn)--消費(fèi)模型:

生產(chǎn)者不斷交替地生產(chǎn)兩組數(shù)據(jù)“姓名--1 --> 內(nèi)容--1”,“姓名--2--> 內(nèi)容--2”,消費(fèi)者不斷交替地取得這兩組數(shù)據(jù),這里的“姓名--1”和“姓名--2”模擬為數(shù)據(jù)的名稱,“內(nèi)容--1 ”和“內(nèi)容--2 ”模擬為數(shù)據(jù)的內(nèi)容。

由于本程序中牽扯到線程運(yùn)行的不確定性,因此可能會出現(xiàn)以下問題:

  • 假設(shè)生產(chǎn)者線程剛向數(shù)據(jù)存儲空間添加了數(shù)據(jù)的名稱,還沒有加入該信息的內(nèi)容,程序就切換到了消費(fèi)者線程,消費(fèi)者線程將把信息的名稱和上一個信息的內(nèi)容聯(lián)系在一起;
  • 生產(chǎn)者生產(chǎn)了若干次數(shù)據(jù),消費(fèi)者才開始取數(shù)據(jù),或者是,消費(fèi)者取完一次數(shù)據(jù)后,還沒等生產(chǎn)者放入新的數(shù)據(jù),又重復(fù)取出了已取過的數(shù)據(jù)。

問題 1 很明顯要靠同步來解決,問題 2 則需要線程間通信,生產(chǎn)者線程放入數(shù)據(jù)后,通知消費(fèi)者線程取出數(shù)據(jù),消費(fèi)者線程取出數(shù)據(jù)后,通知生產(chǎn)者線程生產(chǎn)數(shù)據(jù),這里用 wait/notify 機(jī)制來實現(xiàn)。

詳細(xì)的實現(xiàn)代碼如下:

class Info{ // 定義信息類  
    private String name = "name";//定義name屬性,為了與下面set的name屬性區(qū)別開  
    private String content = "content" ;// 定義content屬性,為了與下面set的content屬性區(qū)別開  
    private boolean flag = true ;   // 設(shè)置標(biāo)志位,初始時先生產(chǎn)  
    public synchronized void set(String name,String content){  
        while(!flag){  
            try{  
                super.wait() ;  
            }catch(InterruptedException e){  
                e.printStackTrace() ;  
            }  
        }  
        this.setName(name) ;    // 設(shè)置名稱  
        try{  
            Thread.sleep(300) ;  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }  
        this.setContent(content) ;  // 設(shè)置內(nèi)容  
        flag  = false ; // 改變標(biāo)志位,表示可以取走  
        super.notify();  
    }  
    public synchronized void get(){  
        while(flag){  
            try{  
                super.wait() ;  
            }catch(InterruptedException e){  
                e.printStackTrace() ;  
            }  
        }  
        try{  
            Thread.sleep(300) ;  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }  
        System.out.println(this.getName() +   
            " --> " + this.getContent()) ;  
        flag  = true ;  // 改變標(biāo)志位,表示可以生產(chǎn)  
        super.notify();  
    }  
    public void setName(String name){  
        this.name = name ;  
    }  
    public void setContent(String content){  
        this.content = content ;  
    }  
    public String getName(){  
        return this.name ;  
    }  
    public String getContent(){  
        return this.content ;  
    }  
}  
class Producer implements Runnable{ // 通過Runnable實現(xiàn)多線程  
    private Info info = null ;      // 保存Info引用  
    public Producer(Info info){  
        this.info = info ;  
    }  
    public void run(){  
        boolean flag = true ;   // 定義標(biāo)記位  
        for(int i=0;i<10;i++){  
            if(flag){  
                this.info.set("姓名--1","內(nèi)容--1") ;    // 設(shè)置名稱  
                flag = false ;  
            }else{  
                this.info.set("姓名--2","內(nèi)容--2") ;    // 設(shè)置名稱  
                flag = true ;  
            }  
        }  
    }  
}  
class Consumer implements Runnable{  
    private Info info = null ;  
    public Consumer(Info info){  
        this.info = info ;  
    }  
    public void run(){  
        for(int i=0;i<10;i++){  
            this.info.get() ;  
        }  
    }  
}  
public class ThreadCaseDemo03{  
    public static void main(String args[]){  
        Info info = new Info(); // 實例化Info對象  
        Producer pro = new Producer(info) ; // 生產(chǎn)者  
        Consumer con = new Consumer(info) ; // 消費(fèi)者  
        new Thread(pro).start() ;  
        //啟動了生產(chǎn)者線程后,再啟動消費(fèi)者線程  
        try{  
            Thread.sleep(500) ;  
        }catch(InterruptedException e){  
            e.printStackTrace() ;  
        }  

        new Thread(con).start() ;  
    }  
}  

執(zhí)行結(jié)果如下:

http://wiki.jikexueyuan.com/project/java-concurrency/images/pc.jpg" alt="" />

另外,在 run 方法中,二者循環(huán)的次數(shù)要相同,否則,當(dāng)一方的循環(huán)結(jié)束時,另一方的循環(huán)依然繼續(xù),它會阻塞在 wait()方法處,而等不到對方的 notify 通知。