Einfaches Modell des Reactor-Musters mit Java NIO
Im Allgemeinen sieht das Reactor-Muster in NIO so aus: Ein Akzeptor (natürlich funktionieren auch mehrere, aber in üblichen Szenarien reicht einer aus) ist für das Accept-Ereignis verantwortlich und registriert den angenommenen Socket-Kanal bei einem Reaktor, der nach einem bestimmten Algorithmus aus dem Reaktor-Pool ausgewählt wird. Dort werden die Ereignisse für Lesen, Schreiben usw. registriert. Danach haben alle IO-Ereignisse dieses Socket-Kanals nichts mehr mit dem Akzeptor zu tun und werden ausschließlich von dem Reaktor verarbeitet, bei dem der Kanal registriert ist.
Jeder Akzeptor und jeder Reaktor enthält jeweils einen Selektor
Natürlich muss jeder Akzeptor und jeder Reaktor ein eigener Thread sein (zumindest logisch gesehen).
Eine einfache Implementierung mit drei Klassen: NioAcceptor, NioReactor und ReactorPool:
package cc.lixiaohui.demo.dp.reator; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Objects; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Acceptor负责处理SelectionKey.OP_ACCEPT事件, 将接收到的SocketChannel注册到Reactor上去 */ public class NioAcceptor { private int port; private String host; private Selector selector; // Java NIO Selector private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel private ReactorPool reactorPool; // NioReactor池 private Thread thread; // 工作线程 private volatile boolean stop = false; private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class); public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException { this.port = port; this.host = Objects.requireNonNull(host); this.reactorPool = new ReactorPool(reactorPoolSize); selector = Selector.open(); // 创建selector serverChannel = ServerSocketChannel.open(); // new server socket channel serverChannel.configureBlocking(false); // in non-blocking mode serverChannel.bind(new InetSocketAddress(host, port)); // bind serverChannel.register(selector, SelectionKey.OP_ACCEPT); // } public void stop() throws InterruptedException { stop = true; thread.join(); } public void start() { thread = new Thread(new AcceptTask(this)); thread.start(); } private static class AcceptTask implements Runnable { NioAcceptor acceptor; AcceptTask(NioAcceptor acceptor) { this.acceptor = acceptor; } public void run() { final Selector selector = acceptor.selector; Set<SelectionKey> keys = null; while (!acceptor.stop) { // 运行中 try { selector.select(1000L); // select, 最多等1秒 keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { // 可accept SocketChannel channel = acceptor.serverChannel.accept(); 
channel.configureBlocking(false); // 取下一个Reactor并把SocketChannel加入到Reactor的注册队列 acceptor.reactorPool.nextReactor().postRegistry(channel); } else { key.cancel(); } } } finally { keys.clear(); } } catch (IOException e) { logger.error("", e); } } } } }
/** * Reactor负责SelectionKey.OP_READ | SelectionKey.OP_WRITE等事件 */ public class NioReactor { /** 待注册的{@link SocketChannel} 队列 */ private Queue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>(); private Selector selector; private volatile boolean stop = false; private Thread thread; private static final Logger logger = LoggerFactory.getLogger(NioReactor.class); public NioReactor() throws IOException { selector = Selector.open(); } public void postRegistry(SocketChannel channel) { registerQueue.add(channel); selector.wakeup(); // 唤醒selector, 以便让其即时处理注册 } public NioReactor start() { thread = new Thread(new ReactTask(this)); thread.start(); return this; } public void stop() throws InterruptedException { stop = true; thread.join(); } /** * 处理队列里面的待注册的SocketChannel */ private void doRegister(Selector selector) { while (!registerQueue.isEmpty()) { SocketChannel channel = registerQueue.poll(); try { // 注册读事件, 写事件无需注册, 写事件是业务驱动的, 当往channel写入 数据未写完时再注册写事件 channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { logger.error("", e); } } } private void handleWrite(SelectionKey key) { // TODO 业务写 } private void handleRead(SelectionKey key) { // TODO 业务读 } private static class ReactTask implements Runnable { NioReactor reactor; ReactTask(NioReactor reactor) { this.reactor = reactor; } public void run() { Set<SelectionKey> keys = null; while (!reactor.stop) { final Selector selector = reactor.selector; try { selector.select(500L); reactor.doRegister(selector); // 处理注册 keys = selector.selectedKeys(); for (SelectionKey key : keys) { try { if (!key.isValid()) { // not valid key.cancel(); continue; } if (key.isReadable()) { // 可读 reactor.handleRead(key); } if (key.isWritable()) { // 可写 reactor.handleWrite(key); } } catch (Throwable t) { logger.error("", t); continue; } } } catch (IOException e) { logger.error("", e); } } } } }
ReactorPool dient zur Verwaltung der Reactor-Instanzen:
public class ReactorPool extends LinkedList<NioReactor>{ private static final long serialVersionUID = 6525233920805533099L; private final int capacity; public ReactorPool(int size) { this.capacity = size; } // 轮询算法取下一个Reactor public NioReactor nextReactor() throws IOException { // 新建或从头部拿一个Reactor NioReactor reactor = size() < capacity ? new NioReactor().start() : poll(); add(reactor);// 加到尾部 return reactor; } }