Semaphore(信號量)是一個線程同步結(jié)構(gòu),用于在線程間傳遞信號,以避免出現(xiàn)信號丟失(譯者注:下文會具體介紹),或者像鎖一樣用于保護一個關(guān)鍵區(qū)域。自從 5.0 開始,jdk 在 java.util.concurrent 包里提供了 Semaphore 的官方實現(xiàn),因此大家不需要自己去實現(xiàn) Semaphore。但是還是很有必要去熟悉如何使用 Semaphore 及其背后的原理
本文的涉及的主題如下:
下面是一個信號量的簡單實現(xiàn):
public class Semaphore {
private boolean signal = false;
public synchronized void take() {
this.signal = true;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(!this.signal) wait();
this.signal = false;
}
}
Take 方法發(fā)出一個被存放在 Semaphore 內(nèi)部的信號,而 Release 方法則等待一個信號,當(dāng)其接收到信號后,標(biāo)記位 signal 被清空,然后該方法終止。
使用這個 semaphore 可以避免錯失某些信號通知。用 take 方法來代替 notify,release 方法來代替 wait。如果某線程在調(diào)用 release 等待之前調(diào)用 take 方法,那么調(diào)用 release 方法的線程仍然知道 take 方法已經(jīng)被某個線程調(diào)用過了,因為該 Semaphore 內(nèi)部保存了 take 方法發(fā)出的信號。而 wait 和 notify 方法就沒有這樣的功能。
當(dāng)用 semaphore 來產(chǎn)生信號時,take 和 release 這兩個方法名看起來有點奇怪。這兩個名字來源于后面把 semaphore 當(dāng)做鎖的例子,后面會詳細介紹這個例子,在該例子中,take 和 release 這兩個名字會變得很合理。
下面的例子中,兩個線程通過 Semaphore 發(fā)出的信號來通知對方
Semaphore semaphore = new Semaphore();
SendingThread sender = new SendingThread(semaphore);
ReceivingThread receiver = new ReceivingThread(semaphore);
receiver.start();
sender.start();
public class SendingThread {
Semaphore semaphore = null;
public SendingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
public void run(){
while(true){
//do something, then signal
this.semaphore.take();
}
}
}
public class RecevingThread {
Semaphore semaphore = null;
public ReceivingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
public void run(){
while(true){
this.semaphore.release();
//receive signal, then do something...
}
}
}
上面提到的 Semaphore 的簡單實現(xiàn)并沒有計算通過調(diào)用 take 方法所產(chǎn)生信號的數(shù)量??梢园阉脑斐删哂杏嫈?shù)功能的 Semaphore。下面是一個可計數(shù)的 Semaphore 的簡單實現(xiàn)。
public class CountingSemaphore {
private int signals = 0;
public synchronized void take() {
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
}
}
上面的 CountingSemaphore 并沒有限制信號的數(shù)量。下面的代碼將 CountingSemaphore 改造成一個信號數(shù)量有上限的 BoundedSemaphore。
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
this.notify();
}
}
在 BoundedSemaphore 中,當(dāng)已經(jīng)產(chǎn)生的信號數(shù)量達到了上限,take 方法將阻塞新的信號產(chǎn)生請求,直到某個線程調(diào)用 release 方法后,被阻塞于 take 方法的線程才能傳遞自己的信號。
當(dāng)信號量的數(shù)量上限是 1 時,Semaphore 可以被當(dāng)做鎖來使用。通過 take 和 release 方法來保護關(guān)鍵區(qū)域。請看下面的例子:
BoundedSemaphore semaphore = new BoundedSemaphore(1);
...
semaphore.take();
try{
//critical section
} finally {
semaphore.release();
}
在前面的例子中,Semaphore 被用來在多個線程之間傳遞信號,這種情況下,take 和 release 分別被不同的線程調(diào)用。但是在鎖這個例子中,take 和 release 方法將被同一線程調(diào)用,因為只允許一個線程來獲取信號(允許進入關(guān)鍵區(qū)域的信號),其它調(diào)用 take 方法獲取信號的線程將被阻塞,知道第一個調(diào)用 take 方法的線程調(diào)用 release 方法來釋放信號。對 release 方法的調(diào)用永遠不會被阻塞,這是因為任何一個線程都是先調(diào)用 take 方法,然后再調(diào)用 release。
通過有上限的 Semaphore 可以限制進入某代碼塊的線程數(shù)量。設(shè)想一下,在上面的例子中,如果 BoundedSemaphore 上限設(shè)為 5 將會發(fā)生什么?意味著允許 5 個線程同時訪問關(guān)鍵區(qū)域,但是你必須保證,這個 5 個線程不會互相沖突。否則你的應(yīng)用程序?qū)⒉荒苷_\行。
必須注意,release 方法應(yīng)當(dāng)在 finally 塊中被執(zhí)行。這樣可以保在關(guān)鍵區(qū)域的代碼拋出異常的情況下,信號也一定會被釋放。