Java NIO 反应堆模式简单模型
一般NIO里反应堆模式都是这样:一个Acceptor(当然多个也行,不过一般场景一个够了)负责accept事件,把接收到Socket CHannel注册到按某种算法从Reactor池中取出的一个Reactor上,注册的事件为读,写等,之后这个Socket Channel的所有IO事件都和Acceptor没关系,都由被注册到的那个Reactor来负责。
每个Acceptor和每个Reactor都各自持有一个Selector
当然每个Acceptor和Reactor都得是一个线程(起码在逻辑上得是线程)
简单实现,三个类NioAcceptor、NioReactor和ReactorPool:
package cc.lixiaohui.demo.dp.reator; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Objects; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去 */ public class NioAcceptor { private int port; private String host; private Selector selector; // Java NIO Selector private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel private ReactorPool reactorPool; // NioReactor池 private Thread thread; // 工作线程 private volatile boolean stop = false; private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class); public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException { this.port = port; this.host = Objects.requireNonNull(host); this.reactorPool = new ReactorPool(reactorPoolSize); selector = Selector.open(); // 创建selector serverChannel = ServerSocketChannel.open(); // new server socket channel serverChannel.configureBlocking(false); // in non-blocking mode serverChannel.bind(new InetSocketAddress(host, port)); // bind serverChannel.register(selector, SelectionKey.OP_ACCEPT); // } public void stop() throws InterruptedException { stop = true; thread.join(); } public void start() { thread = new Thread(new AcceptTask(this)); thread.start(); } private static class AcceptTask implements Runnable { NioAcceptor acceptor; AcceptTask(NioAcceptor acceptor) { this.acceptor = acceptor; } public void run() { final Selector selector = acceptor.selector; Set<SelectionKey> keys = null; while (!acceptor.stop) { // 运行中 try { selector.select(1000L); // select, 最多等1秒 keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { // 可accept SocketChannel channel = acceptor.serverChannel.accept(); channel.configureBlocking(false); // 取下一个Reactor并把SocketChannel加入到Reactor的注册队列 acceptor.reactorPool.nextReactor().postRegistry(channel); } else { key.cancel(); } } } finally { keys.clear(); } } catch (IOException e) { logger.error("", e); } } } } }
/** * Reactor负责SelectionKey.OP_READ | SelectionKey.OP_WRITE等事件 */ public class NioReactor { /** 待注册的{@link SocketChannel} 队列 */ private Queue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>(); private Selector selector; private volatile boolean stop = false; private Thread thread; private static final Logger logger = LoggerFactory.getLogger(NioReactor.class); public NioReactor() throws IOException { selector = Selector.open(); } public void postRegistry(SocketChannel channel) { registerQueue.add(channel); selector.wakeup(); // 唤醒selector, 以便让其即时处理注册 } public NioReactor start() { thread = new Thread(new ReactTask(this)); thread.start(); return this; } public void stop() throws InterruptedException { stop = true; thread.join(); } /** * 处理队列里面的待注册的SocketChannel */ private void doRegister(Selector selector) { while (!registerQueue.isEmpty()) { SocketChannel channel = registerQueue.poll(); try { // 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件 channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { logger.error("", e); } } } private void handleWrite(SelectionKey key) { // TODO 业务写 } private void handleRead(SelectionKey key) { // TODO 业务读 } private static class ReactTask implements Runnable { NioReactor reactor; ReactTask(NioReactor reactor) { this.reactor = reactor; } public void run() { Set<SelectionKey> keys = null; while (!reactor.stop) { final Selector selector = reactor.selector; try { selector.select(500L); reactor.doRegister(selector); // 处理注册 keys = selector.selectedKeys(); for (SelectionKey key : keys) { try { if (!key.isValid()) { // not valid key.cancel(); continue; } if (key.isReadable()) { // 可读 reactor.handleRead(key); } if (key.isWritable()) { // 可写 reactor.handleWrite(key); } } catch (Throwable t) { logger.error("", t); continue; } } } catch (IOException e) { logger.error("", e); } } } } }
ReactorPool用来管理Reactor:
public class ReactorPool extends LinkedList<NioReactor>{ private static final long serialVersionUID = 6525233920805533099L; private final int capacity; public ReactorPool(int size) { this.capacity = size; } // 轮询算法取下一个Reactor public NioReactor nextReactor() throws IOException { // 新建或从头部拿一个Reactor NioReactor reactor = size() < capacity ? new NioReactor().start() : poll(); add(reactor);// 加到尾部 return reactor; } }