目錄
非阻塞通訊的流程
範例程式碼:
實作非阻塞通訊的方式
緩衝區
通道
Selector类:
SelectionKey类
首頁 Java java教程 Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

Mar 04, 2017 am 10:10 AM

本文詳細介紹組成非阻塞通訊的幾大類別:BufferChannelSelectorSelectionKey

非阻塞通訊的流程

  1. ServerSocketChannel透過open方法取得ServerSocketChannel,透過ServerSocketChannel設定為非阻塞模式,再透過ServerSocketChannel取得socket,綁定定服務進程監聽埠。服務啟動成功。

  2. 然後就是非阻塞通訊的精髓了,Selector透過靜態的open()方法取得到Selector,然後ServerSocketChannel註冊Selection.OP_ACCEPT事件到Selector。

  3. Selector就會監控事件發生,Selector透過select()監控已發生的SelectionKey物件的數目,透過selectKeys()方法傳回對應的selectionKey物件集合。遍歷此集合得到對應的selectionKey對象,透過該物件的channel()方法取得關聯的ServerSocketChannel對象, 透過selector()方法就可以取得關聯的Selector物件。

  4. 透過上面取得的ServerSocketChannel執行accept()方法取得SocketChannel,再透過SocketChannel設定為非阻塞模式,在將SocketChannel註冊到上面建立的Selector上,註冊SelectionKey .OP_READ |SelectionKey.OP_WRITE 事件。

  5. Selector將在監控對應上面綁定的事件,監控到對應的事件的話執行讀取和寫入的操作。

範例程式碼:

上面描述了服務端非阻塞方式通訊的一個流程,以下透過具體程式碼實作:

/**
 * 非阻塞模式
 * 
 */public class EchoServer2 {
    private Selector selector = null;    
    private ServerSocketChannel serverSocketChannel = null;    
    private int port = 8001;    
    private Charset charset = Charset.forName("UTF-8");    
    public EchoServer2() throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();        //服务器重启的时候,重用端口
        serverSocketChannel.socket().setReuseAddress(true);        //设置非阻塞模式
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动成功");
    }    /**
     * 服务方法
     */
    public void service() throws IOException {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        
        while (selector.select() > 0) {
            Set readyKes = selector.selectedKeys();
            Iterator it = readyKes.iterator();            
            while (it.hasNext()) {
                SelectionKey key = null;                
                try {
                    key = (SelectionKey) it.next();
                    it.remove();                    
                    if (key.isAcceptable()) {
                        System.out.println("连接事件");                        
                        //连接事件
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = ssc.accept();
                        System.out.println("接收到客户连接,来自:" + socketChannel.socket().getInetAddress() +                                " : " + socketChannel.socket().getPort());
                        socketChannel.configureBlocking(false);
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        socketChannel.register(selector, SelectionKey.OP_READ |
                                SelectionKey.OP_WRITE, buffer);
                    } else if (key.isReadable()) {                        
                    //接收数据
                        receive(key);
                    } else if (key.isWritable()) {                        
                    //发送数据
                        send(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();                    
                    try {                        
                    if (key != null) {
                            key.cancel();
                            key.channel().close();
                        }
                    }catch (IOException ex){
                        ex.printStackTrace();
                    }
                }
            }
        }
    }    private void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();

        SocketChannel channel = (SocketChannel) key.channel();
        buffer.flip(); //把极限设置为位置,把位置设置为0
        String data = decode(buffer);        
        if (data.indexOf("\r\n") == -1) {            
        return;
        }
        String outputData = data.substring(0, data.indexOf("\n") + 1);
        System.out.println("请求数据:" + outputData);

        ByteBuffer outputBuffer = encode("echo:" + outputData);        
        while (outputBuffer.hasRemaining()) {
            channel.write(outputBuffer);
        }
        ByteBuffer temp = encode(outputData);
        buffer.position(temp.limit());
        buffer.compact();        if (outputData.equals("bye\r\n")) {
            key.cancel();
            channel.close();
            System.out.println("关闭与客户的连接");
        }
    }    private String decode(ByteBuffer buffer) {
        CharBuffer charBuffer = charset.decode(buffer);        return charBuffer.toString();
    }    private ByteBuffer encode(String s) {        return charset.encode(s);
    }    private void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();

        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer readBuff = ByteBuffer.allocate(32);
        socketChannel.read(readBuff);
        readBuff.flip();

        buffer.limit(buffer.capacity());
        buffer.put(readBuff);
    }    public static void main(String[] args) throws IOException {        new EchoServer2().service();
    }
}/**
 * 创建非阻塞客户端
 * 
 */public class EchoClient2 {

    private SocketChannel socketChannel;    private int port = 8001;    private Selector selector;    private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);    private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);    private Charset charset = Charset.forName("UTF-8");    public EchoClient2() throws IOException {
        socketChannel = SocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
        socketChannel.connect(inetSocketAddress);//
        socketChannel.configureBlocking(false);//设置为非阻塞模式
        System.out.println("与服务器连接成功");
        selector = Selector.open();
    }    public static void main(String[] args) throws IOException {        final EchoClient2 client = new EchoClient2();
        Thread receiver = new Thread(new Runnable() {            @Override
            public void run() {
                client.receiveFromUser();
            }
        });
        receiver.start();
        client.talk();
    }    private void receiveFromUser() {        try {
            System.out.println("请输入数据:");
            BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
            String msg = null;            while ((msg = localReader.readLine()) != null) {
                System.out.println("用户输入的数据:" + msg);                synchronized (sendBuffer) {
                    sendBuffer.put(encode(msg + "\r\n"));
                }                if (msg.equalsIgnoreCase("bye")) {                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }    private ByteBuffer encode(String s) {        return charset.encode(s);
    }    private void talk() throws IOException {
        socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);        while (selector.select() > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();            while (it.hasNext()) {
                SelectionKey key = null;                try {
                    key = it.next();
                    it.remove();                    if (key.isReadable()) {                        //System.out.println("读事件");
                        //读事件
                        receive(key);
                    }                    if (key.isWritable()) {                       // System.out.println("写事件");
                        //写事件
                        send(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();                    if (key != null) {
                        key.cancel();
                        key.channel().close();
                    }
                }
            }
        }
    }    private void send(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();        synchronized (sendBuffer) {
            sendBuffer.flip();//把极限设为位置,把位置设为零
            channel.write(sendBuffer);
            sendBuffer.compact();//删除已经发送的数据。
        }
    }    private void receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.read(receiveBuffer);
        receiveBuffer.flip();//将limit的值设置为position的值,将position的值设置为0
        String receiveData = decode(receiveBuffer);        if (receiveData.indexOf("\n") == -1) {            return;
        }
        String outputData = receiveData.substring(0, receiveData.indexOf("\n") + 1);
        System.out.println("响应数据:" + outputData);        if (outputData.equalsIgnoreCase("echo:bye\r\n")) {
            key.cancel();
            socketChannel.close();
            ;
            System.out.println("关闭与服务器的连接");
            selector.close();
            System.exit(0);
        }

        ByteBuffer temp = encode(outputData);
        receiveBuffer.position(temp.limit());
        receiveBuffer.compact();//删除已经打印的数据
    }    private String decode(ByteBuffer receiveBuffer) {
        CharBuffer buffer = charset.decode(receiveBuffer);        return buffer.toString();
    }
}
登入後複製

實作非阻塞通訊的方式

  • 緩衝區

  • 通道

  • #Selector

    緩衝區

#作用:減少物理讀取和寫入次數,減少記憶體建立和銷毀次數。 緩衝區的屬性:capacity(最大容量)、limit(實際容量)、position(目前位置)。 PS:其他地方是翻譯成capacity(容量)、limit(極限)、position位置),我個人覺得翻譯成上面的更好理解,為啥透過下面的方法解析和圖解就可明白。當然最好透過英文表達這樣最清楚。
三個屬性的關係為:capacity≥limit≥position≥0

圖解關係如下:
Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解
緩衝區類別結構:
java.nio.ByteBuffer類是一個抽象類,不能被實例化。但是提供了8個具體的實現類,其中最基本的緩衝區是ByteBuffer,它存放的資料單元是位元組。
Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

常用方法:

clear():把limit設定為capacity,再把位置設為0  
#   flip():把limit設定為position,再把位置設為0。
 rewind():不改變limit,把位置設為0。  
 allocate():建立一個緩衝區中,方法參數指定緩衝區大小
 compact():將緩衝區的目前位置和界限之間的位元組(如果有)複製到緩衝區的開始處。

測試上述方法:
測試clear()方法

    @Test
    public void testClear() {        //创建一个10chars大小的缓冲区,默认情况下limit和capacity是相等的
        CharBuffer buffer = CharBuffer.allocate(10);
        System.out.println("创建默认情况");
        printBufferInfo(buffer);
        buffer.limit(8);//修改limit的值
        System.out.println("修改limit后");
        printBufferInfo(buffer);        // clear():把limit设置为capacity,再把位置设为0
        buffer.clear();
        System.out.println("执行clear()方法后");
        printBufferInfo(buffer);
    }
登入後複製

執行結果如下:
Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

測試flip()方法:

    @Test
    public void testFlip() {
        CharBuffer buffer = CharBuffer.allocate(10);
        System.out.println("创建默认情况");
        printBufferInfo(buffer);        //put的方法会修改position的值
        buffer.put(&#39;H&#39;);
        buffer.put(&#39;E&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;O&#39;);
        System.out.println("调用put方法后:");
        printBufferInfo(buffer);        //flip():把limit设置为position,再把位置设置为0。
        buffer.flip();
        System.out.println("调用flip方法后:");
        printBufferInfo(buffer);
    }
登入後複製

執行結果如下:
Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

#測試rewind()方法

        @Test
    public void testRewind() {
        CharBuffer buffer = CharBuffer.allocate(10);
        System.out.println("创建默认情况");
        printBufferInfo(buffer);        //put的方法会修改position的值
        buffer.put(&#39;H&#39;);
        buffer.put(&#39;E&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;O&#39;);
        buffer.limit(8);
        System.out.println("调用put、limit方法后:");
        printBufferInfo(buffer);        //rewind():不改变limit,把位置设为0。
        buffer.rewind();
        System.out.println("调用rewind方法后:");
        printBufferInfo(buffer);
    }
登入後複製

執行結果如下:
Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

測試compact()方法

    @Test
    public void testCompact(){
        CharBuffer buffer = CharBuffer.allocate(10);
        System.out.println("创建默认情况");
        printBufferInfo(buffer);        //put的方法会修改position的值
        buffer.put(&#39;H&#39;);
        buffer.put(&#39;E&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;L&#39;);
        buffer.put(&#39;O&#39;);
        buffer.limit(8);//修改limit的值
        System.out.println("调用put和limit方法后:");
        printBufferInfo(buffer);
        System.out.println("调用compact方法后:");        //将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。
        buffer.compact();
        printBufferInfo(buffer);
    }
登入後複製

Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

##這是JDK中介紹此方法的作用:

將緩衝區的目前位置和界限之間的位元組(如果有)複製到緩衝區的開始處。即將索引p = position() 處的位元組複製到索引0 處,將索引p + 1 處的位元組複製到索引1 處,依此類推,直到將索引limit() - 1 處的位元組複製到索引n = limit() - 1 - p 處。然後將緩衝區的位置設為 n+1,並將其界限設定為其容量。如果已定義了標記,則丟棄它。

官方表示的太難理解了:#

将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。并将limit(实际容量)设置为 capacity(最大容量)。执行compact()方法前,limit的值是:8,position的值是:5。按照上面描述的执行完compact()后,position的值计算方式是:n+1;n=limit-1-p;所有n=8-1-5=2,最后position的值为:2+1=3。和程序运行的结果一致。
可以在这种情况:从缓冲区写入数据之后调用此方法,以防写入不完整。

buf.clear();          // Prepare buffer for use
  while (in.read(buf) >= 0 || buf.position != 0) {
     buf.flip();
     out.write(buf);
     buf.compact();    // In case of partial write
 }
登入後複製

如果out.write()方法没有将缓存中的数据读取完,这个时候的position位置指向的是剩余数据的位置。达到防止写入不完整。

通道

作用:  连接缓冲区与数据源或数据目的地。

常用类:

Channel
 接口有下面两个子接口ReadableByteChannel和WritableByteChannel和一个抽象实现类SelectableChannel。
 在ReadableByteChannel接口中申明了read(ByteBuffer
 dst)方法。在WritableByteChannel接口中申明了write(ByteBuffer[]
 srcs):方法。SelectableChannel抽象类中主要方法,configureBlocking(boolean
 block)、register();方法。 ByteChannel
 接口继承了ReadableChannel和WritableChannel。所以ByteChannel具有读和写的功能。

ServerSocketChannel继承了SelectableChannel类抽象类,所以SocketChannel具有设置是否是阻塞模式、向selector注册事件功能。

SocketChannel也继承了SelectableChannel类还实现ByteChannel接口,所以SocketChannel具有设置是否是阻塞模式、向selector注册事件、从缓冲区读写数据的功能。

通过类图展现:
Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

Selector类:

作用:只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监听这些事件的发生。

流程:
 Selector通过静态的open()方法创建一个Selector对象,SelectableChannel类向Selector注册了特定的事件。Selector就会监控这些事件发生,Selector通过select()监控已发生的SelectionKey对象的数目,通过selectKeys()方法返回对应的selectionKey对象集合。遍历该集合得到相应的selectionKey对象,通过该对象的channel()方法获取关联的SelectableChannel对象,
 通过selector()方法就可以获取关联的Selector对象。

Note:
当Selector的select()方法还有一个重载方式:select(long timeout)。并且该方法采用阻塞的工作方式,如果相关事件的selectionKey对象的数目一个也没有,就进入阻塞状态。知道出现以下情况之一,才从select()方法中返回。

  • 至少有一个SelectionKey的相关事件已经发生。

  • 其他线程调用了Selector的wakeup()方法,导致执行select()方法的线程立即返回。

  • 当前执行的select()方法的线程被中断。

  • 超出了等待时间。仅限调用select(long timeout)方法时出现。如果没有设置超时时间,则永远不会超时。

Selector类有两个非常重要的方法: 静态方法open(),这是Selector的静态工厂方法,创建一个Selector对象。
 selectedKeys()方法返回被Selector捕获的SelectionKey的集合。

SelectionKey类

作用:
 ServerSocketChannel或SocketChannel通过register()方法向Selector注册事件时,register()方法会创建一个SelectionKey对象,该对象是用来跟踪注册事件的句柄。在SelectionKey对象的有效期间,Selector会一直监控与SelectionKey对象相关的事件,如果事件发生,就会把SelectionKey对象添加到Selected-keys集合中。

SelectionKey中定义的事件: 定义了4种事件:
 1、SelectionKey.OP_ACCEPT:接收连接就绪事件,表示服务器监听到了客户连接,服务器可以接收这个连接了。常量值为16.
 2、SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功。常量值为8.
 3、SelectionKey.OP_READ:读就绪事件,表示通道中已经有了可读数据可以执行读操作。常量值为1.
 4、SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了。常量值为4.

常用方法:
 channel()方法:傳回與它關聯的SelectedChannel(包括ServerSocketChannel和SocketChannel)。
 selector()方法:傳回與它關聯的Selector物件。
它們之間的關係如下:
Java網路程式設計由淺入深三 一文了解非阻塞通訊的圖文程式碼範例詳解

 以上就是Java網路程式設計由淺入深三一文了解非阻塞通訊的圖文程式碼範例詳解的內容,更多相關內容請關注PHP中文網(www.php.cn)!


#
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

Java Spring 面試題 Java Spring 面試題 Aug 30, 2024 pm 04:29 PM

在本文中,我們保留了最常被問到的 Java Spring 面試問題及其詳細答案。這樣你就可以順利通過面試。

突破或從Java 8流返回? 突破或從Java 8流返回? Feb 07, 2025 pm 12:09 PM

Java 8引入了Stream API,提供了一種強大且表達力豐富的處理數據集合的方式。然而,使用Stream時,一個常見問題是:如何從forEach操作中中斷或返回? 傳統循環允許提前中斷或返回,但Stream的forEach方法並不直接支持這種方式。本文將解釋原因,並探討在Stream處理系統中實現提前終止的替代方法。 延伸閱讀: Java Stream API改進 理解Stream forEach forEach方法是一個終端操作,它對Stream中的每個元素執行一個操作。它的設計意圖是處

手機上如何將XML轉換成PDF? 手機上如何將XML轉換成PDF? Apr 02, 2025 pm 10:18 PM

直接在手機上將XML轉換為PDF並不容易,但可以藉助雲端服務實現。推薦使用輕量級手機App上傳XML文件並接收生成的PDF,配合雲端API進行轉換。雲端API使用無服務器計算服務,選擇合適的平台至關重要。處理XML解析和PDF生成時需要考慮複雜性、錯誤處理、安全性和優化策略。整個過程需要前端App與後端API協同工作,需要對多種技術有所了解。

Java程序查找膠囊的體積 Java程序查找膠囊的體積 Feb 07, 2025 am 11:37 AM

膠囊是一種三維幾何圖形,由一個圓柱體和兩端各一個半球體組成。膠囊的體積可以通過將圓柱體的體積和兩端半球體的體積相加來計算。本教程將討論如何使用不同的方法在Java中計算給定膠囊的體積。 膠囊體積公式 膠囊體積的公式如下: 膠囊體積 = 圓柱體體積 兩個半球體體積 其中, r: 半球體的半徑。 h: 圓柱體的高度(不包括半球體)。 例子 1 輸入 半徑 = 5 單位 高度 = 10 單位 輸出 體積 = 1570.8 立方單位 解釋 使用公式計算體積: 體積 = π × r2 × h (4

PHP與Python:了解差異 PHP與Python:了解差異 Apr 11, 2025 am 12:15 AM

PHP和Python各有優勢,選擇應基於項目需求。 1.PHP適合web開發,語法簡單,執行效率高。 2.Python適用於數據科學和機器學習,語法簡潔,庫豐富。

PHP:網絡開發的關鍵語言 PHP:網絡開發的關鍵語言 Apr 13, 2025 am 12:08 AM

PHP是一種廣泛應用於服務器端的腳本語言,特別適合web開發。 1.PHP可以嵌入HTML,處理HTTP請求和響應,支持多種數據庫。 2.PHP用於生成動態網頁內容,處理表單數據,訪問數據庫等,具有強大的社區支持和開源資源。 3.PHP是解釋型語言,執行過程包括詞法分析、語法分析、編譯和執行。 4.PHP可以與MySQL結合用於用戶註冊系統等高級應用。 5.調試PHP時,可使用error_reporting()和var_dump()等函數。 6.優化PHP代碼可通過緩存機制、優化數據庫查詢和使用內置函數。 7

創造未來:零基礎的 Java 編程 創造未來:零基礎的 Java 編程 Oct 13, 2024 pm 01:32 PM

Java是熱門程式語言,適合初學者和經驗豐富的開發者學習。本教學從基礎概念出發,逐步深入解說進階主題。安裝Java開發工具包後,可透過建立簡單的「Hello,World!」程式來實踐程式設計。理解程式碼後,使用命令提示字元編譯並執行程序,控制台上將輸出「Hello,World!」。學習Java開啟了程式設計之旅,隨著掌握程度加深,可創建更複雜的應用程式。

如何在Spring Tool Suite中運行第一個春季啟動應用程序? 如何在Spring Tool Suite中運行第一個春季啟動應用程序? Feb 07, 2025 pm 12:11 PM

Spring Boot簡化了可靠,可擴展和生產就緒的Java應用的創建,從而徹底改變了Java開發。 它的“慣例慣例”方法(春季生態系統固有的慣例),最小化手動設置

See all articles