NIO 采取通道(Channel)和緩沖區(qū)(Buffer)來(lái)傳輸和保存數(shù)據(jù),它是非阻塞式的 I/O,即在等待連接、讀寫數(shù)據(jù)(這些都是在一線程以客戶端的程序中會(huì)阻塞線程的操作)的時(shí)候,程序也可以做其他事情,以實(shí)現(xiàn)線程的異步操作。
考慮一個(gè)即時(shí)消息服務(wù)器,可能有上千個(gè)客戶端同時(shí)連接到服務(wù)器,但是在任何時(shí)刻只有非常少量的消息需要讀取和分發(fā)(如果采用線程池或者一線程一客戶端方式,則會(huì)非常浪費(fèi)資源),這就需要一種方法能阻塞等待,直到有一個(gè)信道可以進(jìn)行 I/O 操作。NIO 的 Selector 選擇器就實(shí)現(xiàn)了這樣的功能,一個(gè) Selector 實(shí)例可以同時(shí)檢查一組信道的 I/O 狀態(tài),它就類似一個(gè)觀察者,只要我們把需要探知的 SocketChannel 告訴 Selector,我們接著做別的事情,當(dāng)有事件(比如,連接打開(kāi)、數(shù)據(jù)到達(dá)等)發(fā)生時(shí),它會(huì)通知我們,傳回一組 SelectionKey,我們讀取這些 Key,就會(huì)獲得我們剛剛注冊(cè)過(guò)的 SocketChannel,然后,我們從這個(gè) Channel 中讀取數(shù)據(jù),接著我們可以處理這些數(shù)據(jù)。
Selector 內(nèi)部原理實(shí)際是在做一個(gè)對(duì)所注冊(cè)的 Channel 的輪詢?cè)L問(wèn),不斷的輪詢(目前就這一個(gè)算法),一旦輪詢到一個(gè) Channel 有所注冊(cè)的事情發(fā)生,比如數(shù)據(jù)來(lái)了,它就會(huì)讀取 Channel 中的數(shù)據(jù),并對(duì)其進(jìn)行處理。
要使用選擇器,需要?jiǎng)?chuàng)建一個(gè) Selector 實(shí)例,并將其注冊(cè)到想要監(jiān)控的信道上(通過(guò) Channel 的方法實(shí)現(xiàn))。最后調(diào)用選擇器的 select()方法,該方法會(huì)阻塞等待,直到有一個(gè)或多個(gè)信道準(zhǔn)備好了 I/O 操作或等待超時(shí),或另一個(gè)線程調(diào)用了該選擇器的 wakeup()方法。現(xiàn)在,在一個(gè)單獨(dú)的線程中,通過(guò)調(diào)用 select()方法,就能檢查多個(gè)信道是否準(zhǔn)備好進(jìn)行 I/O 操作,由于非阻塞 I/O 的異步特性,在檢查的同時(shí),我們也可以執(zhí)行其他任務(wù)。
傳建一個(gè) Selector 實(shí)例;
將其注冊(cè)到各種信道,并指定每個(gè)信道上感興趣的I/O操作;
重復(fù)執(zhí)行:
調(diào)用一種 select()方法;
獲取選取的鍵列表;
對(duì)于已選鍵集中的每個(gè)鍵:
獲取信道,并從鍵中獲取附件(如果為信道及其相關(guān)的 key 添加了附件的話);
確定準(zhǔn)備就緒的操縱并執(zhí)行,如果是 accept 操作,將接收的信道設(shè)置為非阻塞模式,并注冊(cè)到選擇器;
如果需要,修改鍵的興趣操作集;
與基于多線程的 TCP 客戶端大致相同,只是這里是通過(guò)信道建立的連接,但在等待連接建立及讀寫時(shí),我們可以異步地執(zhí)行其他任務(wù)。
下面給出一個(gè)基于 NIO 的 TCP 通信的 Demo,客戶端發(fā)送一串字符串到服務(wù)端,服務(wù)端將該字符串原原本本地反饋給客戶端。
客戶端代碼及其詳細(xì)注釋如下:
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class TCPEchoClientNonblocking {
public static void main(String args[]) throws Exception{
if ((args.length < 2) || (args.length > 3))
throw new IllegalArgumentException("參數(shù)不正確");
//第一個(gè)參數(shù)作為要連接的服務(wù)端的主機(jī)名或IP
String server = args[0];
//第二個(gè)參數(shù)為要發(fā)送到服務(wù)端的字符串
byte[] argument = args[1].getBytes();
//如果有第三個(gè)參數(shù),則作為端口號(hào),如果沒(méi)有,則端口號(hào)設(shè)為7
int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7;
//創(chuàng)建一個(gè)信道,并設(shè)為非阻塞模式
SocketChannel clntChan = SocketChannel.open();
clntChan.configureBlocking(false);
//向服務(wù)端發(fā)起連接
if (!clntChan.connect(new InetSocketAddress(server, servPort))){
//不斷地輪詢連接狀態(tài),直到完成連接
while (!clntChan.finishConnect()){
//在等待連接的時(shí)間里,可以執(zhí)行其他任務(wù),以充分發(fā)揮非阻塞IO的異步特性
//這里為了演示該方法的使用,只是一直打印"."
System.out.print(".");
}
}
//為了與后面打印的"."區(qū)別開(kāi)來(lái),這里輸出換行符
System.out.print("\n");
//分別實(shí)例化用來(lái)讀寫的緩沖區(qū)
ByteBuffer writeBuf = ByteBuffer.wrap(argument);
ByteBuffer readBuf = ByteBuffer.allocate(argument.length);
//接收到的總的字節(jié)數(shù)
int totalBytesRcvd = 0;
//每一次調(diào)用read()方法接收到的字節(jié)數(shù)
int bytesRcvd;
//循環(huán)執(zhí)行,直到接收到的字節(jié)數(shù)與發(fā)送的字符串的字節(jié)數(shù)相等
while (totalBytesRcvd < argument.length){
//如果用來(lái)向通道中寫數(shù)據(jù)的緩沖區(qū)中還有剩余的字節(jié),則繼續(xù)將數(shù)據(jù)寫入信道
if (writeBuf.hasRemaining()){
clntChan.write(writeBuf);
}
//如果read()接收到-1,表明服務(wù)端關(guān)閉,拋出異常
if ((bytesRcvd = clntChan.read(readBuf)) == -1){
throw new SocketException("Connection closed prematurely");
}
//計(jì)算接收到的總字節(jié)數(shù)
totalBytesRcvd += bytesRcvd;
//在等待通信完成的過(guò)程中,程序可以執(zhí)行其他任務(wù),以體現(xiàn)非阻塞IO的異步特性
//這里為了演示該方法的使用,同樣只是一直打印"."
System.out.print(".");
}
//打印出接收到的數(shù)據(jù)
System.out.println("Received: " + new String(readBuf.array(), 0, totalBytesRcvd));
//關(guān)閉信道
clntChan.close();
}
}
服務(wù)端用單個(gè)線程監(jiān)控一組信道,代碼如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
public class TCPServerSelector{
//緩沖區(qū)的長(zhǎng)度
private static final int BUFSIZE = 256;
//select方法等待信道準(zhǔn)備好的最長(zhǎng)時(shí)間
private static final int TIMEOUT = 3000;
public static void main(String[] args) throws IOException {
if (args.length < 1){
throw new IllegalArgumentException("Parameter(s): <Port> ...");
}
//創(chuàng)建一個(gè)選擇器
Selector selector = Selector.open();
for (String arg : args){
//實(shí)例化一個(gè)信道
ServerSocketChannel listnChannel = ServerSocketChannel.open();
//將該信道綁定到指定端口
listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg)));
//配置信道為非阻塞模式
listnChannel.configureBlocking(false);
//將選擇器注冊(cè)到各個(gè)信道
listnChannel.register(selector, SelectionKey.OP_ACCEPT);
}
//創(chuàng)建一個(gè)實(shí)現(xiàn)了協(xié)議接口的對(duì)象
TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);
//不斷輪詢select方法,獲取準(zhǔn)備好的信道所關(guān)聯(lián)的Key集
while (true){
//一直等待,直至有信道準(zhǔn)備好了I/O操作
if (selector.select(TIMEOUT) == 0){
//在等待信道準(zhǔn)備的同時(shí),也可以異步地執(zhí)行其他任務(wù),
//這里只是簡(jiǎn)單地打印"."
System.out.print(".");
continue;
}
//獲取準(zhǔn)備好的信道所關(guān)聯(lián)的Key集合的iterator實(shí)例
Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
//循環(huán)取得集合中的每個(gè)鍵值
while (keyIter.hasNext()){
SelectionKey key = keyIter.next();
//如果服務(wù)端信道感興趣的I/O操作為accept
if (key.isAcceptable()){
protocol.handleAccept(key);
}
//如果客戶端信道感興趣的I/O操作為read
if (key.isReadable()){
protocol.handleRead(key);
}
//如果該鍵值有效,并且其對(duì)應(yīng)的客戶端信道感興趣的I/O操作為write
if (key.isValid() && key.isWritable()) {
protocol.handleWrite(key);
}
//這里需要手動(dòng)從鍵集中移除當(dāng)前的key
keyIter.remove();
}
}
}
}
這里為了使不同協(xié)議都能方便地使用這個(gè)基本的服務(wù)模式,我們把信道中與具體協(xié)議相關(guān)的處理各種 I/O 的操作分離了出來(lái),定義了一個(gè)接口,如下:
import java.nio.channels.SelectionKey;
import java.io.IOException;
/**
*該接口定義了通用TCPSelectorServer類與特定協(xié)議之間的接口,
*它把與具體協(xié)議相關(guān)的處理各種I/O的操作分離了出來(lái),
*以使不同協(xié)議都能方便地使用這個(gè)基本的服務(wù)模式。
*/
public interface TCPProtocol{
//accept I/O形式
void handleAccept(SelectionKey key) throws IOException;
//read I/O形式
void handleRead(SelectionKey key) throws IOException;
//write I/O形式
void handleWrite(SelectionKey key) throws IOException;
}
接口的實(shí)現(xiàn)類代碼如下:
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.ByteBuffer;
import java.io.IOException;
public class EchoSelectorProtocol implements TCPProtocol {
private int bufSize; // 緩沖區(qū)的長(zhǎng)度
public EchoSelectorProtocol(int bufSize){
this.bufSize = bufSize;
}
//服務(wù)端信道已經(jīng)準(zhǔn)備好了接收新的客戶端連接
public void handleAccept(SelectionKey key) throws IOException {
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking(false);
//將選擇器注冊(cè)到連接到的客戶端信道,并指定該信道key值的屬性為OP_READ,同時(shí)為該信道指定關(guān)聯(lián)的附件
clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
}
//客戶端信道已經(jīng)準(zhǔn)備好了從信道中讀取數(shù)據(jù)到緩沖區(qū)
public void handleRead(SelectionKey key) throws IOException{
SocketChannel clntChan = (SocketChannel) key.channel();
//獲取該信道所關(guān)聯(lián)的附件,這里為緩沖區(qū)
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = clntChan.read(buf);
//如果read()方法返回-1,說(shuō)明客戶端關(guān)閉了連接,那么客戶端已經(jīng)接收到了與自己發(fā)送字節(jié)數(shù)相等的數(shù)據(jù),可以安全地關(guān)閉
if (bytesRead == -1){
clntChan.close();
}else if(bytesRead > 0){
//如果緩沖區(qū)總讀入了數(shù)據(jù),則將該信道感興趣的操作設(shè)置為為可讀可寫
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
//客戶端信道已經(jīng)準(zhǔn)備好了將數(shù)據(jù)從緩沖區(qū)寫入信道
public void handleWrite(SelectionKey key) throws IOException {
//獲取與該信道關(guān)聯(lián)的緩沖區(qū),里面有之前讀取到的數(shù)據(jù)
ByteBuffer buf = (ByteBuffer) key.attachment();
//重置緩沖區(qū),準(zhǔn)備將數(shù)據(jù)寫入信道
buf.flip();
SocketChannel clntChan = (SocketChannel) key.channel();
//將數(shù)據(jù)寫入到信道中
clntChan.write(buf);
if (!buf.hasRemaining()){
//如果緩沖區(qū)中的數(shù)據(jù)已經(jīng)全部寫入了信道,則將該信道感興趣的操作設(shè)置為可讀
key.interestOps(SelectionKey.OP_READ);
}
//為讀入更多的數(shù)據(jù)騰出空間
buf.compact();
}
}
執(zhí)行結(jié)果如下:
http://wiki.jikexueyuan.com/project/java-socket/images/nioresult.png" alt="" />
http://wiki.jikexueyuan.com/project/java-socket/images/nioresult1.png" alt="" />
說(shuō)明:以上的服務(wù)端程序,select()方法第一次能選擇出來(lái)的準(zhǔn)備好的信道都是服務(wù)端信道,其關(guān)聯(lián)鍵值的屬性都為 OP_ACCEPT,亦及有效操作都為 accept,在執(zhí)行 handleAccept 方法時(shí),為取得連接的客戶端信道也進(jìn)行了注冊(cè),屬性為 OP_READ,這樣下次輪詢調(diào)用 select()方法時(shí),便會(huì)檢查到對(duì) read 操作感興趣的客戶端信道(當(dāng)然也有可能有關(guān)聯(lián) accept 操作興趣集的信道),從而調(diào)用 handleRead 方法,在該方法中又注冊(cè)了 OP_WRITE 屬性,那么第三次調(diào)用 select()方法時(shí),便會(huì)檢測(cè)到對(duì) write 操作感興趣的客戶端信道(當(dāng)然也有可能有關(guān)聯(lián) read 操作興趣集的信道),從而調(diào)用 handleWrite 方法。
結(jié)果:從結(jié)果中很明顯地可以看出,服務(wù)器端在等待信道準(zhǔn)備好的時(shí)候,線程沒(méi)有阻塞,而是可以執(zhí)行其他任務(wù),這里只是簡(jiǎn)單的打印".",客戶端在等待連接和等待數(shù)據(jù)讀寫完成的時(shí)候,線程沒(méi)有阻塞,也可以執(zhí)行其他任務(wù),這里也正是簡(jiǎn)單的打印"."。
對(duì)于非阻塞 SocketChannel 來(lái)說(shuō),一旦已經(jīng)調(diào)用 connect()方法發(fā)起連接,底層套接字可能既不是已經(jīng)連接,也不是沒(méi)有連接,而是正在連接。由于底層協(xié)議的工作機(jī)制,套接字可能會(huì)在這個(gè)狀態(tài)一直保持下去,這時(shí)候就需要循環(huán)地調(diào)用 finishConnect()方法來(lái)檢查是否完成連接,在等待連接的同時(shí),線程也可以做其他事情,這便實(shí)現(xiàn)了線程的異步操作。
while(buf.hasRemaining())
channel.write(buf);
任何對(duì) key(信道)所關(guān)聯(lián)的興趣操作集的改變,都只在下次調(diào)用了 select()方法后才會(huì)生效。
selectedKeys()方法返回的鍵集是可修改的,實(shí)際上在兩次調(diào)用 select()方法之間,都必須手動(dòng)將其清空,否則,它就會(huì)在下次調(diào)用 select()方法時(shí)仍然保留在集合中,而且可能會(huì)有無(wú)用的操作來(lái)調(diào)用它,換句話說(shuō),select()方法只會(huì)在已有的所選鍵集上添加鍵,它們不會(huì)創(chuàng)建新的建集。