ホームページ > Java > &#&チュートリアル > Java NIO の原理分析と基本的な使用法

Java NIO の原理分析と基本的な使用法

零下一度
リリース: 2017-06-27 10:18:37
オリジナル
1733 人が閲覧しました

Java NIO 原理分析

この記事では、Java NIO の基本的な使用法から、Linux での NIO API の導入、Java Selector の基礎となる実装原理まで、主に Java NIO に焦点を当てています。 Selector其底层的实现原理。

  • Java NIO基本使用

  • Linux下的NIO系统调用介绍

  • Selector原理

  • Channel和Buffer之间的堆外内存

Java NIO基本使用

从JDK NIO文档里面可以发现,Java将其划分成了三大块:ChannelBuffer以及多路复用Selector。Channel的存在,封装了对什么实体的连接通道(如网络/文件);Buffer封装了对数据的缓冲存储,最后对于Selector则是提供了一种可以以单线程非阻塞的方式,来处理多个连接。

基本应用示例

NIO的基本步骤是,创建Selector和ServerSocketChannel,然后注册channel的ACCEPT事件,调用select方法,等待连接的到来,以及接收连接后将其注册到Selector中。下面的为Echo Server的示例:

public class SelectorDemo {

    public static void main(String[] args) throws IOException {


        Selector selector = Selector.open();
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        socketChannel.bind(new InetSocketAddress(8080));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            int ready = selector.select();
            if (ready == 0) {
                continue;
            } else if (ready < 0) {
                break;
            }

            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {

                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {

                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel accept = channel.accept();
                    if (accept == null) {
                        continue;
                    }
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 读事件
                    deal((SocketChannel) key.channel(), key);
                } else if (key.isWritable()) {
                    // 写事件
                    resp((SocketChannel) key.channel(), key);
                }
                // 注:处理完成后要从中移除掉
                iterator.remove();
            }
        }
        selector.close();
        socketChannel.close();
    }

    private static void deal(SocketChannel channel, SelectionKey key) throws IOException {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024);

        int read = channel.read(buffer);

        if (read > 0) {
            buffer.flip();
            responseBuffer.put(buffer);
        } else if (read == -1) {
            System.out.println("socket close");
            channel.close();
            return;
        }

        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        key.attach(responseBuffer);
    }

    private static void resp(SocketChannel channel, SelectionKey key) throws IOException {

        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();

        channel.write(buffer);
        if (!buffer.hasRemaining()) {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}
ログイン後にコピー

Linux下的NIO系统调用介绍

在Linux环境下,提供了几种方式可以实现NIO,如epoll,poll,select等。对于select/poll,每次调用,都是从外部传入FD和监听事件,这就导致每次调用的时候,都需要将这些数据从用户态复制到内核态,就导致了每次调用代价比较大,而且每次从select/poll返回回来,都是全量的数据,需要自行去遍历检查哪些是READY的。对于epoll,则为增量式的,系统内部维护了所需要的FD和监听事件,要注册的时候,调用epoll_ctl即可,而每次调用,不再需要传入了,返回的时候,只返回READY的监听事件和FD。下面作个简单的伪代码:
具体的可以看以前的文章:

// 1. 创建server socket
// 2. 绑定地址
// 3. 监听端口
// 4. 创建epoll
int epollFd = epoll_create(1024);
// 5. 注册监听事件
struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
event.data.fd = serverFd;
epoll_ctl(epollFd, EPOLL_CTL_ADD, serverFd, &event);

while(true) {
    readyNums = epoll_wait( epollFd, events, 1024, -1 );
    
    if ( readyNums < 0 )
     {
         printf("epoll_wait error\n");
         exit(-1);
     }

     for ( i = 0; i <  readyNums; ++i)
     {
         if ( events[i].data.fd == serverFd )
         {
             clientFd = accept( serverFd, NULL, NULL );
             // 注册监听事件
             ...
         }else if ( events[i].events & EPOLLIN )
         {
            // 处理读事件
         }else if ( events[i].events & EPOLLRDHUP )
         {
            // 关闭连接事件
            close( events[i].data.fd );
         }
}
ログイン後にコピー

Selector原理

SelectionKey

从Java顶层使用者角度来看,channel通过注册,返回SelectionKey,而Selector.select方法,也是通过返回SelectionKey来使用。那么这里为什么会需要这个类呢?这个类有什么作用?无论是任何语言,其实都脱离不了系统底层的支持,通过上述Linux下的基本应用,可以知道,通过系统调用,向其传递和返回的都是FD以及事件这些参数,那么站在设计角度来看,就需要有一个映射关系,使得可以关联起来,这里有Channel封装的是通过,如果将READY事件这些参数放在里面,不太合适,这个时候,SelectionKey出现了,在SelectionKey内部,保存Channel的引用以及一些事件信息,然后Selector通过FD找到SelectionKey来进行关联。在底层EP里面,就有一个属性:Map<Integer,SelectionKeyImpl> fdToKey。<h3>EPollSelectorImpl</h3><p>在Linux 2.6+版本,Java NIO采用的epoll(即<code>EPollSelectorImpl类),对于2.4.x的,则使用poll(即PollSelectorImpl类),这里以epoll为例。

select方法

顶层Selector,通过调用select方法,最终会调用到EPollSelectorImpl.doSelect方法,通过该方法,可以看到,其首先会处理一些不再注册的事件,调用pollWrapper.poll(timeout);

  • Java NIO の基本的な使用法

  • Linux での NIO システムコールの紹介

  • セレクターの原則


  • チャネルとバッファ間のオフヒープメモリ
Java NIO の基本的な使用方法

JDK NIO ドキュメントから、Java が 3 つの主要なブロックに分割されていることがわかります: チャネルバッファ、および多重化セレクター。 Channel の存在により、任意のエンティティ (ネットワーク/ファイルなど) への接続チャネルがカプセル化されます。最後に、Selector は、複数の接続を処理するためのシングルスレッドのノンブロッキング方法を提供します。

基本的なアプリケーション例🎜🎜 NIO の基本的な手順は、Selector と ServerSocketChannel を作成し、チャネルの ACCEPT イベントを登録し、select メソッドを呼び出し、接続が到着するのを待ち、接続を受信した後、Selector に登録します。繋がり。以下は Echo Server の例です: 🎜
protected int doSelect(long timeout)
    throws IOException
{
    if (closed)
        throw new ClosedSelectorException();
    // 处理一些不再注册的事件
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    // 再进行一次清理
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}


private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        // 获取FD
        int nextFD = pollWrapper.getDescriptor(i);
        // 根据FD找到对应的SelectionKey
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            // 找到该FD的READY事件
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                // 将底层的事件转换为Java封装的事件,SelectionKey.OP_READ等
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                // 没有在原有的SelectedKey里面,说明是在等待过程中加入的
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    // 需要更新selectedKeys集合
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    // 返回Ready的Channel个数
    return numKeysUpdated;
}
ログイン後にコピー
🎜Linux での NIO システム コールの紹介🎜🎜 Linux 環境では、epoll、poll、select など、NIO を実装するためのいくつかの方法が提供されています。 select/poll の場合、呼び出されるたびに FD および監視イベントが外部から渡されるため、呼び出されるたびにこれらのデータをユーザー状態からカーネル状態にコピーする必要があり、コストの比較が行われます。各呼び出しのデータは大きく、select/poll から返されるたびに全量のデータを調べて、どれが READY であるかを確認する必要があります。 epoll の場合、システムは必要な FD と監視イベントを内部で保持します。呼び出すたびに epoll_ctl を呼び出すだけで済みます。戻るときは READY のみを返します。リスニングイベントやFDなど。以下に簡単な疑似コードを示します。 🎜詳細については、前の記事を参照してください: 🎜
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout,
                             int epfd) throws IOException;
ログイン後にコピー
🎜Selector の原理🎜🎜SelectionKey🎜🎜 Java トップレベル ユーザーの観点から、チャネルは登録を通じて SelectionKey を返し、Selector.select メソッドを返します。また、SelectionKey を返すことによっても使用されます。では、なぜここでこのクラスが必要なのでしょうか?このクラスは何をするのですか?言語に関係なく、Linux 上の上記の基本的なアプリケーションを通じて、システム コールを通じて FD やイベントなどのパラメーターが設計から受け渡され、返されることがわかります。ここで READY イベントのパラメータを渡すには、関連付けられるようにマッピング関係が必要です。このとき、SelectionKey の中に表示されます。 、チャネルへの参照といくつかのイベント情報を保存すると、セレクターは関連付けのために FD を介して SelectionKey を見つけます。基礎となる EP には、Map<Integer,SelectionKeyImpl> fdToKey という属性があります。 🎜🎜EPollSelectorImpl🎜🎜 Linux 2.6 以降のバージョンでは、Java NIO は epoll (つまり、EPollSelectorImpl クラス) を使用します。2.4.x の場合は、poll (つまり、PollSelectorImpl クラス) を使用します。 epollを例に挙げます。 🎜

select メソッド

🎜 最上位の Selector は、select メソッドを呼び出すことにより、最終的に EPollSelectorImpl.doSelect メソッドを呼び出します。このメソッドを通じて、最初に、すでに存在しないいくつかのイベントが処理されることがわかります。最後に、マッピング関係を処理する必要があることがわかります🎜
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_init(JNIEnv *env, jclass this)
{
    epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create");
    epoll_ctl_func    = (epoll_ctl_t)    dlsym(RTLD_DEFAULT, "epoll_ctl");
    epoll_wait_func   = (epoll_wait_t)   dlsym(RTLD_DEFAULT, "epoll_wait");

    if ((epoll_create_func == NULL) || (epoll_ctl_func == NULL) ||
        (epoll_wait_func == NULL)) {
        JNU_ThrowInternalError(env, "unable to get address of epoll functions, pre-2.6 kernel?");
    }
}
ログイン後にコピー
🎜EPollArrayWrapper🎜🎜EpollArrayWrapper は、基になる呼び出しをカプセル化します。 🎜
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4, Object var5) throws IOException {
    if(var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    } else if(var1 instanceof DirectBuffer) {
        // 为堆外内存,则直接读取
        return readIntoNativeBuffer(var0, var1, var2, var4, var5);
    } else {
        // 为堆内内存,先获取临时堆外内存
        ByteBuffer var6 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var8;
        try {
            // 读取到堆外内存
            int var7 = readIntoNativeBuffer(var0, var6, var2, var4, var5);
            var6.flip();
            if(var7 > 0) {
                // 复制到堆内
                var1.put(var6);
            }

            var8 = var7;
        } finally {
            // 释放临时堆外内存
            Util.offerFirstTemporaryDirectBuffer(var6);
        }

        return var8;
    }
}
ログイン後にコピー
ログイン後にコピー
🎜 対応する実装 EPollArrayWrapper.c は、openjdk のネイティブ ディレクトリ (native/sun/nio/ch) にあります。 🎜(ちなみに、ネイティブメソッドを実装するには、クラス内のメソッドにnativeキーワードを追加し、それをクラスファイルにコンパイルし、出力.hに変換します。ヘッダファイルの実装方法は、 c/c++ の一番下は so Library にコンパイルされるので、対応するディレクトリに置くだけです) 🎜初期化ファイルのメソッドでは、動的解析によってロードされ、最後に epoll_create やその他のメソッドが呼び出されていることがわかります。 🎜
// 仅仅作了对齐以及将长度放在数组前方就返回了
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  void* x = os::malloc(sz);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END
ログイン後にコピー
🎜Channel と Buffer 間のオフヒープ メモリ🎜🎜 オフヒープ メモリはリークしやすいという話をよく聞きますが、Netty フレームワークはコピーを削減し、パフォーマンスを向上させるためにオフヒープ メモリを使用します。では、ここでのオフヒープ メモリとは何を指すのでしょうか? 興味を持って、読み取りメソッドを通じて、最終的に IOUtil の読み取りメソッドを呼び出す SocketChannelImpl の読み取りメソッドにたどり着きました。まず、受信バッファーが DirectBuffer であるかどうかを判断し、そうでない場合 (HeapByteBuffer である)、一時的な DirectBuffer が作成され、ヒープにコピーされます。 IOUtil.read メソッド: 🎜
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4, Object var5) throws IOException {
    if(var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    } else if(var1 instanceof DirectBuffer) {
        // 为堆外内存,则直接读取
        return readIntoNativeBuffer(var0, var1, var2, var4, var5);
    } else {
        // 为堆内内存,先获取临时堆外内存
        ByteBuffer var6 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var8;
        try {
            // 读取到堆外内存
            int var7 = readIntoNativeBuffer(var0, var6, var2, var4, var5);
            var6.flip();
            if(var7 > 0) {
                // 复制到堆内
                var1.put(var6);
            }

            var8 = var7;
        } finally {
            // 释放临时堆外内存
            Util.offerFirstTemporaryDirectBuffer(var6);
        }

        return var8;
    }
}
ログイン後にコピー
ログイン後にコピー

这里有一个问题就是,为什么会需要DirectBuffer以及堆外内存?通过对DirectByteBuffer的创建来分析,可以知道,通过unsafe.allocateMemory(size);来分配内存的,而对于该方法来说,可以说是直接调用malloc返回,这一块内存是不受GC管理的,也就是所说的:堆外内存容易泄漏。但是对于使用DirectByteBuffer来说,会创建一个Deallocator,注册到Cleaner里面,当对象被回收的时候,则会被直接,从而释放掉内存,减少内存泄漏。要用堆外内存,从上面的创建来看,堆外内存创建后,以long型地址保存的,而堆内内存会受到GC影响,对象会被移动,如果采用堆内内存,进行系统调用的时候,那么GC就需要停止,否则就会有问题,基于这一点,采用了堆外内存(这一块参考了R大的理解:)。

注:堆外内存的创建(unsafe.cpp):

// 仅仅作了对齐以及将长度放在数组前方就返回了
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  void* x = os::malloc(sz);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END
ログイン後にコピー

以上がJava NIO の原理分析と基本的な使用法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート