首頁 > Java > java教程 > 主體

Getty-實作Java NIO框架設計的詳細解

黄舟
發布: 2017-03-24 10:46:01
原創
1955 人瀏覽過

前言

Getty是我為了學習 Java NIO 所寫的 NIO 框架,實作過程中參考了 Netty 的設計,同時使用 Groovy 來實作。雖然只是玩具,但是麻雀雖小,五臟俱全,在實現過程中,不僅熟悉了 NIO 的使用,還借鑒了很多 Netty 的設計思想,提升了自己的編碼和設計能力。

至於為什麼要用Groovy 來寫,因為我剛學了Groovy,剛好拿來練手,加上Groovy 是相容Java 的,所以只是語法上的差別,底層實作還是基於Java API的。

Getty 的核心程式碼行數不超過 500 行,一方面得益於 Groovy 簡潔的語法,另一方面是因為我只實現了核心的邏輯,最複雜的其實是解碼器實現。鷹架容易搭,摩天大樓哪有那麼容易蓋,但用來學習 NIO 足以。

線程模型

Getty 使用的是Reactor 多執行緒模型

Getty-實作Java NIO框架設計的詳細解

  1. 有專門一個NIO 執行緒- Acceptor 執行緒用於監聽服務端,接收客戶端的TCP 連線請求,然後將連線指派給工作執行緒,由工作執行緒來監聽寫入事件

  2. 網路 IO 操作-讀/寫等由多個工作執行緒負責,由這些工作執行緒負責訊息的讀取、解碼、編碼和發送。

  3. 1 個工作線程可以同時處理N條鏈路,但是 1 個鏈路只對應 1 個工作線程,防止並發操作問題。

事件驅動模型

整個服務端的流程處理,建立於事件機制上。在[接受連線->讀->業務處理->寫->關閉連線]這個過程中,觸發器將觸發對應事件,由事件處理器對對應事件分別回應,完成伺服器端的業務處理。

事件定義

  1. onRead:當客戶端發來數據,並已被工作執行緒正確讀取時,觸發該事件 。此事件通知各事件處理器可以對客戶端發送的資料進行實際處理了。

  2. onWrite:當客戶端可以開始接受服務端發送資料時觸發該事件,透過該事件,我們可以向客戶端發送回應資料。 (目前的實作中並未使用寫入事件)

  3. onClosed:當客戶端與伺服器斷開連線時觸發該事件。

事件回呼機制的實作

在這個模型中,事件採用廣播方式,也就是所有註冊的事件處理器都能獲得事件通知。這樣可以將不同性質的業務處理,分別用不同的處理器實現,使每個處理器的功能盡可能單一。

如下圖:整個事件模型由監聽器、事件適配器、事件觸發器(HandlerChain,PipeLine)、事件處理器組成。

Getty-實作Java NIO框架設計的詳細解

  • ServerListener:這是事件介面,定義需監聽的伺服器事件

    interface ServerListener extends Serializable{
        /**
         * 可读事件回调
         * @param request
         */
        void onRead(ctx)
        /**
         * 可写事件回调
         * @param request
         * @param response
         */
        void onWrite(ctx)
        /**
         * 连接关闭回调
         * @param request
         */
        void onClosed(ctx)
    }
    登入後複製
  • EventAdapter:對Serverlistener 介面實作一個適配器(EventAdapter),這樣的好處是最終的事件處理器可以只處理所關心的事件。

    class EventAdapter implements ServerListener {
        //下个处理器的引用
        protected next
        void onRead(Object ctx) {
        }
        void onWrite(Object ctx) {
        }
        void onClosed(Object ctx) {
        }
    }
    登入後複製
  • Not<a href="http://www.php.cn/wiki/109.html" target="_blank">if</a>ier:用於在適當的時候透過觸發伺服器事件,通知在冊的事件處理器對事件做出響應。

    interface Notifier extends Serializable{
        /**
         * 触发所有可读事件回调
         */
        void fireOnRead(ctx)
        /**
         * 触发所有可写事件回调
         */
        void fireOnWrite(ctx)
        /**
         * 触发所有连接关闭事件回调
         */
        void fireOnClosed(ctx)
    }
    登入後複製
  • HandlerChain:實作了Notifier接口,維持有序的事件處理器鏈條,每次從第一個處理器開始觸發。

    class HandlerChain implements Notifier{
        EventAdapter head
        EventAdapter tail
        /**
         * 添加处理器到执行链的最后
         * @param handler
         */
        void addLast(handler) {
            if (tail != null) {
                tail.next = handler
                tail = tail.next
            } else {
                head = handler
                tail = head
            }
        }
        void fireOnRead(ctx) {
            head.onRead(ctx)
        }
        void fireOnWrite(ctx) {
            head.onWrite(ctx)
        }
        void fireOnClosed(ctx) {
            head.onClosed(ctx)
        }
    }
    登入後複製
  • PipeLine:實作了Notifier接口,作為事件總線,維持一個事件鏈的清單。

    class PipeLine implements Notifier{
        static logger = LoggerFactory.getLogger(PipeLine.name)
        //监听器队列
        def listOfChain = []
        PipeLine(){}
        /**
         * 添加监听器到监听队列中
         * @param chain
         */
        void addChain(chain) {
            synchronized (listOfChain) {
                if (!listOfChain.contains(chain)) {
                    listOfChain.add(chain)
                }
            }
        }
        /**
         * 触发所有可读事件回调
         */
        void fireOnRead(ctx) {
            logger.debug("fireOnRead")
            listOfChain.each { chain ->
                chain.fireOnRead(ctx)
            }
        }
        /**
         * 触发所有可写事件回调
         */
        void fireOnWrite(ctx) {
            listOfChain.each { chain ->
                chain.fireOnWrite(ctx)
            }
        }
        /**
         * 触发所有连接关闭事件回调
         */
        void fireOnClosed(ctx) {
            listOfChain.each { chain ->
                chain.fireOnClosed(ctx)
            }
        }
    }
    登入後複製

事件處理流程

Getty-實作Java NIO框架設計的詳細解

#程式設計模型

#事件處理採用職責鏈模式,每個處理器處理完資料之後會決定是否繼續下一個處理器。如果處理器不將任務交給執行緒池處理,那麼整個處理流程就在同一個執行緒中處理。而且每個連線都有單獨的PipeLine,工作執行緒可以在多個連線上下文切換,但是一個連線上下文只會被一個執行緒處理。

核心类

ConnectionCtx

连接上下文ConnectionCtx

class ConnectionCtx {
    /**socket连接*/
    SocketChannel channel
    /**用于携带额外参数*/
    Object attachment
    /**处理当前连接的工作线程*/
    Worker worker
    /**连接超时时间*/
    Long timeout
    /**每个连接拥有自己的pipeline*/
    PipeLine pipeLine
}
登入後複製

NioServer

主线程负责监听端口,持有工作线程的引用(使用轮转法分配连接),每次有连接到来时,将连接放入工作线程的连接队列,并唤醒线程selector.wakeup()(线程可能阻塞在selector上)。

class NioServer extends Thread {
    /**服务端的套接字通道*/
    ServerSocketChannel ssc
    /**选择器*/
    Selector selector
    /**事件总线*/
    PipeLine pipeLine
    /**工作线程列表*/
    def workers = []
    /**当前工作线程索引*/
    int index
}
登入後複製

Worker

工作线程,负责注册server传递过来的socket连接。主要监听读事件,管理socket,处理写操作。

class Worker extends Thread {
    /**选择器*/
    Selector selector
    /**读缓冲区*/
    ByteBuffer buffer
    /**主线程分配的连接队列*/
    def queue = []
    /**存储按超时时间从小到大的连接*/
    TreeMap<Long, ConnectionCtx> ctxTreeMap

    void run() {
        while (true) {
            selector.select()
            //注册主线程发送过来的连接
            registerCtx()
            //关闭超时的连接
            closeTimeoutCtx()
            //处理事件
            dispatchEvent()
        }
    }
}
登入後複製

运行一个简单的 Web 服务器

我实现了一系列处理HTTP请求的处理器,具体实现看代码。

  • LineBasedDecoder:行解码器,按行解析数据

  • HttpRequestDecoder:HTTP请求解析,目前只支持GET请求

  • HttpRequestHandler:Http 请求处理器,目前只支持GET方法

  • HttpResponseHandler:Http响应处理器

下面是写在test中的例子

class WebServerTest {
    static void main(args) {
        def pipeLine = new PipeLine()

        def readChain = new HandlerChain()
        readChain.addLast(new LineBasedDecoder())
        readChain.addLast(new HttpRequestDecoder())
        readChain.addLast(new HttpRequestHandler())
        readChain.addLast(new HttpResponseHandler())

        def closeChain = new HandlerChain()
        closeChain.addLast(new ClosedHandler())

        pipeLine.addChain(readChain)
        pipeLine.addChain(closeChain)

        NioServer nioServer = new NioServer(pipeLine)
        nioServer.start()
    }
}
登入後複製

另外,还可以使用配置文件getty.properties设置程序的运行参数。

#用于拼接消息时使用的二进制数组的缓存区
common_buffer_size=1024
#工作线程读取tcp数据的缓存大小
worker_rcv_buffer_size=1024
#监听的端口
port=4399
#工作线程的数量
worker_num=1
#连接超时自动断开时间
timeout=900
#根目录
root=.
登入後複製

总结

Getty是我造的第二个小轮子,第一个是RedisHttpSession。都说不要重复造轮子。这话我是认同的,但是掌握一门技术最好的方法就是实践,在没有合适项目可以使用新技术的时候,造一个简单的轮子是不错的实践手段。

Getty 的缺点或者说还可以优化的点:

  1. 线程的使用直接用了Thread类,看起来有点low。等以后水平提升了再来抽象一下。

  2. 目前只有读事件是异步的,写事件是同步的。未来将写事件也改为异步的。

以上是Getty-實作Java NIO框架設計的詳細解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板