Ich habe mir kürzlich den Redis-Code angesehen und festgestellt, dass ich ihn in anderen Sprachen implementieren (kopieren) wollte. Dann habe ich darüber nachgedacht, ihn zu implementieren Gründe
Erstens: Die Kombination von Javas NIO und Nettys EventLoop kommt dem Redis-Netzwerkmodell sehr nahe. Sogar das Redis-Modell ist einfacher – es gibt nur einen EventLoop-Thread schreiben (kopieren)
Zweitens: Die Netty-Architektur ist ziemlich gut.
Wenn wir Redis Server aus einer sehr abstrakten (einfachen) Perspektive betrachten, ist es ein Programm, das auf 6379 lauscht, was im Wesentlichen eine Hashtabelle ist, die einzeilige Anfragen verarbeitet. Es ist auch sehr, sehr einfach. Es ist viel einfacher als das http-Protokoll dieses Protokoll:
Dies ist im Grunde eine sehr einfache Finite-State-Maschine.*<参数数量> CR LF $<参数 1 的字节数量> CR LF<参数 1 的数据> CR LF ... $<参数 N 的字节数量> CR LF<参数 N 的数据> CR LF
Also habe ich 3 Zustände für unseren Befehlsparser festgelegt.
Wir legen den Anfangszustand fest. NUMBER_OF_ARGS ist der grüne Zustand. Wenn Daten eintreffen, beurteilen wir ständig den Zustand des Programms und was wir tunpublic enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA }
Lassen Sie es uns gemäß unseren obigen Ideen umsetzen.
while(true){ switch (state()){ case NUMBER_OF_ARGS: //从当前数据中读取参数个数 break; case NUMBER_BYTE_OF_ARGS: //从数据中读取参数长度 break; case ARGS_DATA: //按参数长度读取参数 //判断参数个数.如果到了最后一个.则跳出,否则状态转回NUMBER_BYTE_OF_ARGS break; } }
Wenn Sie den obigen Code verstehen, werden Sie ein kleines Problem feststellen Aus Netzwerkgründen werden die Daten manchmal nicht vollständig empfangen. Und wir Der Code berücksichtigt diesen Aspekt überhaupt nicht?
package me.yunanw.redisinjava; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder;import java.util.List; /** * Created by yunanw on 2016/10/15. */ public class CommandDecoder extends ReplayingDecoder { public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA } static final char CR = '\r'; static final char LF = '\n'; public CommandDecoder(){ state(State.NUMBER_OF_ARGS); } protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list); if (frame != null){ list.add(frame); } } private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = null; int currentArgsLen = 0; int argsCount = 0; while(true){ switch (state()){ case NUMBER_OF_ARGS: if (byteBuf.readByte() != '*'){ throw new DecoderException("can not found *"); } argsCount = parseRedisNumber(byteBuf); frame = new RedisFrame(argsCount); checkpoint(State.NUMBER_BYTE_OF_ARGS); break; case NUMBER_BYTE_OF_ARGS: if (byteBuf.readByte() != '$'){ throw new DecoderException("can not found $"); } currentArgsLen = parseRedisNumber(byteBuf); checkpoint(State.ARGS_DATA);; break; case ARGS_DATA: frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array()); if (byteBuf.readByte() != CR || byteBuf.readByte() != LF) throw new DecoderException("can not found CR OR LF"); if ((--argsCount) = 0 && digit < 10) { result = (result * 10) + digit; } else { throw new DecoderException("Invalid character in integer"); } } while ((readByte = byteBuf.readByte()) != CR); if ((readByte = byteBuf.readByte()) != LF) { throw new DecoderException("can not found LF"); } return (negative? -result:result); } }
Die Signalwiedergabe ist eine in Netty definierte Fehlermeldung. Wenn wir einen Fehler lesen, wartet Netty, bis das nächste Mal Daten eintreffen, und versucht, die Decode-Methode erneut durchzuführen, um zu sehen, ob alles erfolgreich ist Die von uns gewünschten Daten wurden gelesen.
Aber Vorsicht: Die Dekodiermethode von replaydecoder wird wiederholt aufgerufen. Daher müssen wir in unserem Code darauf vorbereitet sein.
</p><pre class="brush:java;toolbar:false"> try { decode(ctx, replayable, out); //省略} catch (Signal replay) { replay.expect(REPLAY); //省略 // Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } break; }
Wir analysieren das Redis-Protokoll in die RedisFrame-Klasse
Okay, jetzt öffnen Sie Redis-cli und versuchen zu sehen, ob Sie eine Verbindung zu unserem herstellen können. gefälschter Redis"-Server. Das Interessante ist - --Sie öffnen Redis-cli. Es sendet automatisch einen „Befehl“-Befehl. Egal, was Sie antworten, es wird denken, dass es verbunden ist</p><pre class="brush:java;toolbar:false"> ServerBootstrap bootstrap = new ServerBootstrap(); final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1); try { bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(port) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new CommandDecoder()); p.addLast(new RedisServerHandler()); } }); // Start the server. ChannelFuture f = bootstrap.bind().sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. group.shutdownGracefully(); }