鍍金池/ 教程/ Java/ 基于 NIO 的 TCP 通信
Java TCP Socket 編程
深入剖析 Socket——TCP 套接字的生命周期
基于線程池的 TCP 服務(wù)器
構(gòu)建和解析自定義協(xié)議消息
Socket 通信中由 read 返回值造成的的死鎖問(wèn)題
基于 NIO 的 TCP 通信
深入剖析 Socket——數(shù)據(jù)傳輸?shù)牡讓訉?shí)現(xiàn)
Socket 簡(jiǎn)介
UDP Socket 編程
Java NIO Socket VS 標(biāo)準(zhǔn) IO Socket
深入剖析 Socket——TCP 通信中由于底層隊(duì)列填滿而造成的死鎖問(wèn)題
應(yīng)用程序協(xié)議中消息的成幀與解析

基于 NIO 的 TCP 通信

NIO 主要原理及使用

NIO 采取通道(Channel)和緩沖區(qū)(Buffer)來(lái)傳輸和保存數(shù)據(jù),它是非阻塞式的 I/O,即在等待連接、讀寫數(shù)據(jù)(這些都是在一線程以客戶端的程序中會(huì)阻塞線程的操作)的時(shí)候,程序也可以做其他事情,以實(shí)現(xiàn)線程的異步操作。

考慮一個(gè)即時(shí)消息服務(wù)器,可能有上千個(gè)客戶端同時(shí)連接到服務(wù)器,但是在任何時(shí)刻只有非常少量的消息需要讀取和分發(fā)(如果采用線程池或者一線程一客戶端方式,則會(huì)非常浪費(fèi)資源),這就需要一種方法能阻塞等待,直到有一個(gè)信道可以進(jìn)行 I/O 操作。NIO 的 Selector 選擇器就實(shí)現(xiàn)了這樣的功能,一個(gè) Selector 實(shí)例可以同時(shí)檢查一組信道的 I/O 狀態(tài),它就類似一個(gè)觀察者,只要我們把需要探知的 SocketChannel 告訴 Selector,我們接著做別的事情,當(dāng)有事件(比如,連接打開(kāi)、數(shù)據(jù)到達(dá)等)發(fā)生時(shí),它會(huì)通知我們,傳回一組 SelectionKey,我們讀取這些 Key,就會(huì)獲得我們剛剛注冊(cè)過(guò)的 SocketChannel,然后,我們從這個(gè) Channel 中讀取數(shù)據(jù),接著我們可以處理這些數(shù)據(jù)。

Selector 內(nèi)部原理實(shí)際是在做一個(gè)對(duì)所注冊(cè)的 Channel 的輪詢?cè)L問(wèn),不斷的輪詢(目前就這一個(gè)算法),一旦輪詢到一個(gè) Channel 有所注冊(cè)的事情發(fā)生,比如數(shù)據(jù)來(lái)了,它就會(huì)讀取 Channel 中的數(shù)據(jù),并對(duì)其進(jìn)行處理。

要使用選擇器,需要?jiǎng)?chuàng)建一個(gè) Selector 實(shí)例,并將其注冊(cè)到想要監(jiān)控的信道上(通過(guò) Channel 的方法實(shí)現(xiàn))。最后調(diào)用選擇器的 select()方法,該方法會(huì)阻塞等待,直到有一個(gè)或多個(gè)信道準(zhǔn)備好了 I/O 操作或等待超時(shí),或另一個(gè)線程調(diào)用了該選擇器的 wakeup()方法。現(xiàn)在,在一個(gè)單獨(dú)的線程中,通過(guò)調(diào)用 select()方法,就能檢查多個(gè)信道是否準(zhǔn)備好進(jìn)行 I/O 操作,由于非阻塞 I/O 的異步特性,在檢查的同時(shí),我們也可以執(zhí)行其他任務(wù)。

基于 NIO 的 TCP 連接的建立步驟

服務(wù)端

  • 傳建一個(gè) Selector 實(shí)例;

  • 將其注冊(cè)到各種信道,并指定每個(gè)信道上感興趣的I/O操作;

  • 重復(fù)執(zhí)行:

    • 調(diào)用一種 select()方法;

    • 獲取選取的鍵列表;

    • 對(duì)于已選鍵集中的每個(gè)鍵:

      • 獲取信道,并從鍵中獲取附件(如果為信道及其相關(guān)的 key 添加了附件的話);

      • 確定準(zhǔn)備就緒的操縱并執(zhí)行,如果是 accept 操作,將接收的信道設(shè)置為非阻塞模式,并注冊(cè)到選擇器;

      • 如果需要,修改鍵的興趣操作集;

      • 從已選鍵集中移除鍵。

客戶端

與基于多線程的 TCP 客戶端大致相同,只是這里是通過(guò)信道建立的連接,但在等待連接建立及讀寫時(shí),我們可以異步地執(zhí)行其他任務(wù)。

基于 NIO 的 TCP 通信 Demo

下面給出一個(gè)基于 NIO 的 TCP 通信的 Demo,客戶端發(fā)送一串字符串到服務(wù)端,服務(wù)端將該字符串原原本本地反饋給客戶端。

客戶端代碼及其詳細(xì)注釋如下:

import java.net.InetSocketAddress;  
import java.net.SocketException;  
import java.nio.ByteBuffer;  
import java.nio.channels.SocketChannel;  

public class TCPEchoClientNonblocking {  
    public static void main(String args[]) throws Exception{  
        if ((args.length < 2) || (args.length > 3))   
        throw new IllegalArgumentException("參數(shù)不正確");  
        //第一個(gè)參數(shù)作為要連接的服務(wù)端的主機(jī)名或IP  
        String server = args[0];   
        //第二個(gè)參數(shù)為要發(fā)送到服務(wù)端的字符串  
        byte[] argument = args[1].getBytes();  
        //如果有第三個(gè)參數(shù),則作為端口號(hào),如果沒(méi)有,則端口號(hào)設(shè)為7  
        int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7;  
        //創(chuàng)建一個(gè)信道,并設(shè)為非阻塞模式  
        SocketChannel clntChan = SocketChannel.open();  
        clntChan.configureBlocking(false);  
        //向服務(wù)端發(fā)起連接  
        if (!clntChan.connect(new InetSocketAddress(server, servPort))){  
            //不斷地輪詢連接狀態(tài),直到完成連接  
            while (!clntChan.finishConnect()){  
                //在等待連接的時(shí)間里,可以執(zhí)行其他任務(wù),以充分發(fā)揮非阻塞IO的異步特性  
                //這里為了演示該方法的使用,只是一直打印"."  
                System.out.print(".");    
            }  
        }  
        //為了與后面打印的"."區(qū)別開(kāi)來(lái),這里輸出換行符  
        System.out.print("\n");  
        //分別實(shí)例化用來(lái)讀寫的緩沖區(qū)  
        ByteBuffer writeBuf = ByteBuffer.wrap(argument);  
        ByteBuffer readBuf = ByteBuffer.allocate(argument.length);  
        //接收到的總的字節(jié)數(shù)  
        int totalBytesRcvd = 0;   
        //每一次調(diào)用read()方法接收到的字節(jié)數(shù)  
        int bytesRcvd;   
        //循環(huán)執(zhí)行,直到接收到的字節(jié)數(shù)與發(fā)送的字符串的字節(jié)數(shù)相等  
        while (totalBytesRcvd < argument.length){  
            //如果用來(lái)向通道中寫數(shù)據(jù)的緩沖區(qū)中還有剩余的字節(jié),則繼續(xù)將數(shù)據(jù)寫入信道  
            if (writeBuf.hasRemaining()){  
                clntChan.write(writeBuf);  
            }  
            //如果read()接收到-1,表明服務(wù)端關(guān)閉,拋出異常  
            if ((bytesRcvd = clntChan.read(readBuf)) == -1){  
                throw new SocketException("Connection closed prematurely");  
            }  
            //計(jì)算接收到的總字節(jié)數(shù)  
            totalBytesRcvd += bytesRcvd;  
            //在等待通信完成的過(guò)程中,程序可以執(zhí)行其他任務(wù),以體現(xiàn)非阻塞IO的異步特性  
            //這里為了演示該方法的使用,同樣只是一直打印"."  
            System.out.print(".");   
        }  
        //打印出接收到的數(shù)據(jù)  
        System.out.println("Received: " +  new String(readBuf.array(), 0, totalBytesRcvd));  
        //關(guān)閉信道  
        clntChan.close();  
    }  
}  

服務(wù)端用單個(gè)線程監(jiān)控一組信道,代碼如下:

import java.io.IOException;  
import java.net.InetSocketAddress;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.util.Iterator;  

public class TCPServerSelector{  
    //緩沖區(qū)的長(zhǎng)度  
    private static final int BUFSIZE = 256;   
    //select方法等待信道準(zhǔn)備好的最長(zhǎng)時(shí)間  
    private static final int TIMEOUT = 3000;   
    public static void main(String[] args) throws IOException {  
        if (args.length < 1){  
            throw new IllegalArgumentException("Parameter(s): <Port> ...");  
        }  
        //創(chuàng)建一個(gè)選擇器  
        Selector selector = Selector.open();  
        for (String arg : args){  
            //實(shí)例化一個(gè)信道  
            ServerSocketChannel listnChannel = ServerSocketChannel.open();  
            //將該信道綁定到指定端口  
            listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg)));  
            //配置信道為非阻塞模式  
            listnChannel.configureBlocking(false);  
            //將選擇器注冊(cè)到各個(gè)信道  
            listnChannel.register(selector, SelectionKey.OP_ACCEPT);  
        }  
        //創(chuàng)建一個(gè)實(shí)現(xiàn)了協(xié)議接口的對(duì)象  
        TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);  
        //不斷輪詢select方法,獲取準(zhǔn)備好的信道所關(guān)聯(lián)的Key集  
        while (true){  
            //一直等待,直至有信道準(zhǔn)備好了I/O操作  
            if (selector.select(TIMEOUT) == 0){  
                //在等待信道準(zhǔn)備的同時(shí),也可以異步地執(zhí)行其他任務(wù),  
                //這里只是簡(jiǎn)單地打印"."  
                System.out.print(".");  
                continue;  
            }  
            //獲取準(zhǔn)備好的信道所關(guān)聯(lián)的Key集合的iterator實(shí)例  
            Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();  
            //循環(huán)取得集合中的每個(gè)鍵值  
            while (keyIter.hasNext()){  
                SelectionKey key = keyIter.next();   
                //如果服務(wù)端信道感興趣的I/O操作為accept  
                if (key.isAcceptable()){  
                    protocol.handleAccept(key);  
                }  
                //如果客戶端信道感興趣的I/O操作為read  
                if (key.isReadable()){  
                    protocol.handleRead(key);  
                }  
                //如果該鍵值有效,并且其對(duì)應(yīng)的客戶端信道感興趣的I/O操作為write  
                if (key.isValid() && key.isWritable()) {  
                    protocol.handleWrite(key);  
                }  
                //這里需要手動(dòng)從鍵集中移除當(dāng)前的key  
                keyIter.remove();   
            }  
        }  
    }  
}  

這里為了使不同協(xié)議都能方便地使用這個(gè)基本的服務(wù)模式,我們把信道中與具體協(xié)議相關(guān)的處理各種 I/O 的操作分離了出來(lái),定義了一個(gè)接口,如下:

import java.nio.channels.SelectionKey;  
import java.io.IOException;  

/** 
*該接口定義了通用TCPSelectorServer類與特定協(xié)議之間的接口, 
*它把與具體協(xié)議相關(guān)的處理各種I/O的操作分離了出來(lái), 
*以使不同協(xié)議都能方便地使用這個(gè)基本的服務(wù)模式。 
*/  
public interface TCPProtocol{  
    //accept I/O形式  
    void handleAccept(SelectionKey key) throws IOException;  
    //read I/O形式  
    void handleRead(SelectionKey key) throws IOException;  
    //write I/O形式  
    void handleWrite(SelectionKey key) throws IOException;  
}  

接口的實(shí)現(xiàn)類代碼如下:

import java.nio.channels.SelectionKey;  
import java.nio.channels.SocketChannel;  
import java.nio.channels.ServerSocketChannel;  
import java.nio.ByteBuffer;  
import java.io.IOException;  

public class EchoSelectorProtocol implements TCPProtocol {  
    private int bufSize; // 緩沖區(qū)的長(zhǎng)度  
    public EchoSelectorProtocol(int bufSize){  
    this.bufSize = bufSize;  
    }  

    //服務(wù)端信道已經(jīng)準(zhǔn)備好了接收新的客戶端連接  
    public void handleAccept(SelectionKey key) throws IOException {  
        SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();  
        clntChan.configureBlocking(false);  
        //將選擇器注冊(cè)到連接到的客戶端信道,并指定該信道key值的屬性為OP_READ,同時(shí)為該信道指定關(guān)聯(lián)的附件  
        clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));  
    }  

    //客戶端信道已經(jīng)準(zhǔn)備好了從信道中讀取數(shù)據(jù)到緩沖區(qū)  
    public void handleRead(SelectionKey key) throws IOException{  
        SocketChannel clntChan = (SocketChannel) key.channel();  
        //獲取該信道所關(guān)聯(lián)的附件,這里為緩沖區(qū)  
        ByteBuffer buf = (ByteBuffer) key.attachment();  
        long bytesRead = clntChan.read(buf);  
        //如果read()方法返回-1,說(shuō)明客戶端關(guān)閉了連接,那么客戶端已經(jīng)接收到了與自己發(fā)送字節(jié)數(shù)相等的數(shù)據(jù),可以安全地關(guān)閉  
        if (bytesRead == -1){   
            clntChan.close();  
        }else if(bytesRead > 0){  
        //如果緩沖區(qū)總讀入了數(shù)據(jù),則將該信道感興趣的操作設(shè)置為為可讀可寫  
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);  
        }  
    }  

    //客戶端信道已經(jīng)準(zhǔn)備好了將數(shù)據(jù)從緩沖區(qū)寫入信道  
    public void handleWrite(SelectionKey key) throws IOException {  
    //獲取與該信道關(guān)聯(lián)的緩沖區(qū),里面有之前讀取到的數(shù)據(jù)  
    ByteBuffer buf = (ByteBuffer) key.attachment();  
    //重置緩沖區(qū),準(zhǔn)備將數(shù)據(jù)寫入信道  
    buf.flip();   
    SocketChannel clntChan = (SocketChannel) key.channel();  
    //將數(shù)據(jù)寫入到信道中  
    clntChan.write(buf);  
    if (!buf.hasRemaining()){   
    //如果緩沖區(qū)中的數(shù)據(jù)已經(jīng)全部寫入了信道,則將該信道感興趣的操作設(shè)置為可讀  
      key.interestOps(SelectionKey.OP_READ);  
    }  
    //為讀入更多的數(shù)據(jù)騰出空間  
    buf.compact();   
  }  

}  

執(zhí)行結(jié)果如下:

http://wiki.jikexueyuan.com/project/java-socket/images/nioresult.png" alt="" />

http://wiki.jikexueyuan.com/project/java-socket/images/nioresult1.png" alt="" />

說(shuō)明:以上的服務(wù)端程序,select()方法第一次能選擇出來(lái)的準(zhǔn)備好的信道都是服務(wù)端信道,其關(guān)聯(lián)鍵值的屬性都為 OP_ACCEPT,亦及有效操作都為 accept,在執(zhí)行 handleAccept 方法時(shí),為取得連接的客戶端信道也進(jìn)行了注冊(cè),屬性為 OP_READ,這樣下次輪詢調(diào)用 select()方法時(shí),便會(huì)檢查到對(duì) read 操作感興趣的客戶端信道(當(dāng)然也有可能有關(guān)聯(lián) accept 操作興趣集的信道),從而調(diào)用 handleRead 方法,在該方法中又注冊(cè)了 OP_WRITE 屬性,那么第三次調(diào)用 select()方法時(shí),便會(huì)檢測(cè)到對(duì) write 操作感興趣的客戶端信道(當(dāng)然也有可能有關(guān)聯(lián) read 操作興趣集的信道),從而調(diào)用 handleWrite 方法。

結(jié)果:從結(jié)果中很明顯地可以看出,服務(wù)器端在等待信道準(zhǔn)備好的時(shí)候,線程沒(méi)有阻塞,而是可以執(zhí)行其他任務(wù),這里只是簡(jiǎn)單的打印".",客戶端在等待連接和等待數(shù)據(jù)讀寫完成的時(shí)候,線程沒(méi)有阻塞,也可以執(zhí)行其他任務(wù),這里也正是簡(jiǎn)單的打印"."。

需要注意的地方

  1. 對(duì)于非阻塞 SocketChannel 來(lái)說(shuō),一旦已經(jīng)調(diào)用 connect()方法發(fā)起連接,底層套接字可能既不是已經(jīng)連接,也不是沒(méi)有連接,而是正在連接。由于底層協(xié)議的工作機(jī)制,套接字可能會(huì)在這個(gè)狀態(tài)一直保持下去,這時(shí)候就需要循環(huán)地調(diào)用 finishConnect()方法來(lái)檢查是否完成連接,在等待連接的同時(shí),線程也可以做其他事情,這便實(shí)現(xiàn)了線程的異步操作。

  2. write()方法的非阻塞調(diào)用哦只會(huì)寫出其能夠發(fā)送的數(shù)據(jù),而不會(huì)阻塞等待所有數(shù)據(jù),而后一起發(fā)送,因此在調(diào)用 write()方法將數(shù)據(jù)寫入信道時(shí),一般要用到 while 循環(huán),如:
while(buf.hasRemaining())
    channel.write(buf);
  1. 任何對(duì) key(信道)所關(guān)聯(lián)的興趣操作集的改變,都只在下次調(diào)用了 select()方法后才會(huì)生效。

  2. selectedKeys()方法返回的鍵集是可修改的,實(shí)際上在兩次調(diào)用 select()方法之間,都必須手動(dòng)將其清空,否則,它就會(huì)在下次調(diào)用 select()方法時(shí)仍然保留在集合中,而且可能會(huì)有無(wú)用的操作來(lái)調(diào)用它,換句話說(shuō),select()方法只會(huì)在已有的所選鍵集上添加鍵,它們不會(huì)創(chuàng)建新的建集。

  3. 對(duì)于 ServerSocketChannel 來(lái)說(shuō),accept 是唯一的有效操作,而對(duì)于 SocketChannel 來(lái)說(shuō),有效操作包括讀、寫和連接,另外,對(duì)于 DatagramChannle,只有讀寫操作是有效的。