Java NIO 리액터(Reactor) 패턴의 단순 모델
일반적으로 NIO의 리액터 패턴은 다음과 같습니다. 하나의 Acceptor(물론 둘 이상도 가능하지만 일반적인 시나리오에서는 하나로 충분함)가 accept 이벤트를 담당하며, 수락한 SocketChannel을 특정 알고리즘에 따라 Reactor 풀에서 꺼낸 Reactor에 등록합니다. 이때 등록하는 이벤트는 읽기·쓰기 등입니다. 이후 이 SocketChannel의 모든 IO 이벤트는 Acceptor와 관련이 없으며, 모두 해당 채널이 등록된 Reactor에 의해 처리됩니다.
각 Acceptor와 각 Reactor는 각각 Selector를 보유합니다
물론 각 Acceptor와 Reactor는 스레드여야 합니다(적어도 논리적으로는 스레드여야 함)
간단한 구현은 NioAcceptor, NioReactor, ReactorPool 세 가지 클래스로 이루어집니다:
package cc.lixiaohui.demo.dp.reator; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Objects; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去 */ public class NioAcceptor { private int port; private String host; private Selector selector; // Java NIO Selector private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel private ReactorPool reactorPool; // NioReactor池 private Thread thread; // 工作线程 private volatile boolean stop = false; private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class); public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException { this.port = port; this.host = Objects.requireNonNull(host); this.reactorPool = new ReactorPool(reactorPoolSize); selector = Selector.open(); // 创建selector serverChannel = ServerSocketChannel.open(); // new server socket channel serverChannel.configureBlocking(false); // in non-blocking mode serverChannel.bind(new InetSocketAddress(host, port)); // bind serverChannel.register(selector, SelectionKey.OP_ACCEPT); // } public void stop() throws InterruptedException { stop = true; thread.join(); } public void start() { thread = new Thread(new AcceptTask(this)); thread.start(); } private static class AcceptTask implements Runnable { NioAcceptor acceptor; AcceptTask(NioAcceptor acceptor) { this.acceptor = acceptor; } public void run() { final Selector selector = acceptor.selector; Set<SelectionKey> keys = null; while (!acceptor.stop) { // 运行中 try { selector.select(1000L); // select, 最多等1秒 keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { // 可accept SocketChannel channel = acceptor.serverChannel.accept(); 
channel.configureBlocking(false); // 取下一个Reactor并把SocketChannel加入到Reactor的注册队列 acceptor.reactorPool.nextReactor().postRegistry(channel); } else { key.cancel(); } } } finally { keys.clear(); } } catch (IOException e) { logger.error("", e); } } } } }
/** * Reactor负责SelectionKey.OP_READ | SelectionKey.OP_WRITE等事件 */ public class NioReactor { /** 待注册的{@link SocketChannel} 队列 */ private Queue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>(); private Selector selector; private volatile boolean stop = false; private Thread thread; private static final Logger logger = LoggerFactory.getLogger(NioReactor.class); public NioReactor() throws IOException { selector = Selector.open(); } public void postRegistry(SocketChannel channel) { registerQueue.add(channel); selector.wakeup(); // 唤醒selector, 以便让其即时处理注册 } public NioReactor start() { thread = new Thread(new ReactTask(this)); thread.start(); return this; } public void stop() throws InterruptedException { stop = true; thread.join(); } /** * 处理队列里面的待注册的SocketChannel */ private void doRegister(Selector selector) { while (!registerQueue.isEmpty()) { SocketChannel channel = registerQueue.poll(); try { // 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件 channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { logger.error("", e); } } } private void handleWrite(SelectionKey key) { // TODO 业务写 } private void handleRead(SelectionKey key) { // TODO 业务读 } private static class ReactTask implements Runnable { NioReactor reactor; ReactTask(NioReactor reactor) { this.reactor = reactor; } public void run() { Set<SelectionKey> keys = null; while (!reactor.stop) { final Selector selector = reactor.selector; try { selector.select(500L); reactor.doRegister(selector); // 处理注册 keys = selector.selectedKeys(); for (SelectionKey key : keys) { try { if (!key.isValid()) { // not valid key.cancel(); continue; } if (key.isReadable()) { // 可读 reactor.handleRead(key); } if (key.isWritable()) { // 可写 reactor.handleWrite(key); } } catch (Throwable t) { logger.error("", t); continue; } } } catch (IOException e) { logger.error("", e); } } } } }
ReactorPool은 Reactor를 관리하는 데 사용됩니다:
public class ReactorPool extends LinkedList<NioReactor>{ private static final long serialVersionUID = 6525233920805533099L; private final int capacity; public ReactorPool(int size) { this.capacity = size; } // 轮询算法取下一个Reactor public NioReactor nextReactor() throws IOException { // 新建或从头部拿一个Reactor NioReactor reactor = size() < capacity ? new NioReactor().start() : poll(); add(reactor);// 加到尾部 return reactor; } }