I recently read through the Redis source code, and it seemed quite simple. I felt the urge to implement (copy) it in another language. Originally I wanted to use Python, but then I thought I would try Netty instead. There are two reasons:
First: the combination of Java NIO and Netty's EventLoop is very close to the Redis network model. Both follow the Reactor pattern. The Redis model is even simpler: it has only one EventLoop thread, which makes it more convenient to write (copy).
Second: Netty's architecture is quite good, so this is a good opportunity to learn it.
Viewed from a very abstract (simplified) perspective, a Redis server is a program that listens on port 6379. It is essentially a hash table that processes single-line requests, and the Redis protocol is also very, very simple, much simpler than HTTP.
The general form of the protocol is:
    *<number of arguments> CR LF
    $<number of bytes in argument 1> CR LF
    <argument 1 data> CR LF
    ...
    $<number of bytes in argument N> CR LF
    <argument N data> CR LF
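To make the format concrete, here is a minimal sketch that encodes a command such as SET key value into the byte layout described above. The class name RespRequestEncoder and its encode method are hypothetical helpers for illustration, not part of the original project, and the sketch assumes arguments are encoded as UTF-8.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: encodes a command in the request format shown above. */
final class RespRequestEncoder {
    private static final byte[] CRLF = {'\r', '\n'};

    static byte[] encode(String... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // *<number of arguments> CR LF
        writeAscii(out, "*" + args.length);
        out.write(CRLF, 0, CRLF.length);
        for (String arg : args) {
            // Assumption: arguments are UTF-8 text; the length counts bytes, not chars.
            byte[] data = arg.getBytes(StandardCharsets.UTF_8);
            // $<number of bytes in argument> CR LF
            writeAscii(out, "$" + data.length);
            out.write(CRLF, 0, CRLF.length);
            // <argument data> CR LF
            out.write(data, 0, data.length);
            out.write(CRLF, 0, CRLF.length);
        }
        return out.toByteArray();
    }

    private static void writeAscii(ByteArrayOutputStream out, String text) {
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        out.write(bytes, 0, bytes.length);
    }
}
```

For example, RespRequestEncoder.encode("SET", "key", "value") yields the bytes of `*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n`: one count line, then a length line and a data line per argument.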
This is basically a very simple finite state machine.
So I defined three states for our command parser.
    public enum State {
        NUMBER_OF_ARGS,
        NUMBER_BYTE_OF_ARGS,
        ARGS_DATA
    }
We set the initial state to NUMBER_OF_ARGS, which is the green state. When data arrives, we repeatedly check which state the parser is in and decide what to do accordingly.
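    while (true) {
        switch (state()) {
            case NUMBER_OF_ARGS:
                // Read the number of arguments from the current data
                break;
            case NUMBER_BYTE_OF_ARGS:
                // Read the length of the next argument from the data
                break;
            case ARGS_DATA:
                // Read the argument according to its length.
                // Check the argument count: if this was the last one, exit;
                // otherwise switch the state back to NUMBER_BYTE_OF_ARGS
                break;
        }
    }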
Next, let's implement the idea above.
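    package me.yunanw.redisinjava;

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.DecoderException;
    import io.netty.handler.codec.ReplayingDecoder;

    import java.util.List;

    /**
     * Created by yunanw on 2016/10/15.
     */
    public class CommandDecoder extends ReplayingDecoder<CommandDecoder.State> {

        public enum State {
            NUMBER_OF_ARGS,
            NUMBER_BYTE_OF_ARGS,
            ARGS_DATA
        }

        static final char CR = '\r';
        static final char LF = '\n';

        // Parsing progress lives in fields rather than locals: decode() is
        // re-invoked after every replay, and locals would be reset each time.
        private RedisFrame frame;
        private int argsCount;
        private int currentArgsLen;

        public CommandDecoder() {
            state(State.NUMBER_OF_ARGS);
        }

        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            RedisFrame completed = doDecode(channelHandlerContext, byteBuf, list);
            if (completed != null) {
                list.add(completed);
            }
        }

        private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            while (true) {
                switch (state()) {
                    case NUMBER_OF_ARGS:
                        if (byteBuf.readByte() != '*') {
                            throw new DecoderException("can not found *");
                        }
                        argsCount = parseRedisNumber(byteBuf);
                        frame = new RedisFrame(argsCount);
                        checkpoint(State.NUMBER_BYTE_OF_ARGS);
                        break;
                    case NUMBER_BYTE_OF_ARGS:
                        if (byteBuf.readByte() != '$') {
                            throw new DecoderException("can not found $");
                        }
                        currentArgsLen = parseRedisNumber(byteBuf);
                        if (currentArgsLen < 0) {
                            throw new DecoderException("Invalid argument length");
                        }
                        checkpoint(State.ARGS_DATA);
                        break;
                    case ARGS_DATA:
                        // Finish every read before touching the frame, so a replay
                        // in the middle of this case cannot append the argument twice.
                        byte[] data = new byte[currentArgsLen];
                        byteBuf.readBytes(data);
                        if (byteBuf.readByte() != CR || byteBuf.readByte() != LF) {
                            throw new DecoderException("can not found CR OR LF");
                        }
                        frame.AppendArgs(data);
                        if ((--argsCount) <= 0) {
                            // Last argument: emit the frame and get ready for the next command.
                            RedisFrame completed = frame;
                            frame = null;
                            checkpoint(State.NUMBER_OF_ARGS);
                            return completed;
                        }
                        checkpoint(State.NUMBER_BYTE_OF_ARGS);
                        break;
                    default:
                        throw new DecoderException("Unknown state: " + state());
                }
            }
        }

        // Reads a decimal integer terminated by CR LF.
        private int parseRedisNumber(ByteBuf byteBuf) {
            byte readByte = byteBuf.readByte();
            boolean negative = readByte == '-';
            if (negative) {
                readByte = byteBuf.readByte();
            }
            int result = 0;
            do {
                int digit = readByte - '0';
                if (digit >= 0 && digit < 10) {
                    result = (result * 10) + digit;
                } else {
                    throw new DecoderException("Invalid character in integer");
                }
            } while ((readByte = byteBuf.readByte()) != CR);

            if ((readByte = byteBuf.readByte()) != LF) {
                throw new DecoderException("can not found LF");
            }
            return (negative ? -result : result);
        }
    }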
If you read the code above carefully, you will notice a small problem: because of network conditions, the data may sometimes arrive incomplete, and our code does not seem to consider this at all. And what on earth is checkpoint?
First question:
In fact, we have considered this issue, which is why we extend a rather special decoder, ReplayingDecoder. Let's take a look at ReplayingDecoder's callDecode method. (The name is very direct; you can surely tell what it does.)
    try {
        decode(ctx, replayable, out);
        // omitted
    } catch (Signal replay) {
        replay.expect(REPLAY);
        // omitted

        // Return to the checkpoint (or oldPosition) and retry.
        int checkpoint = this.checkpoint;
        if (checkpoint >= 0) {
            in.readerIndex(checkpoint);
        } else {
            // Called by cleanup() - no need to maintain the readerIndex
            // anymore because the buffer has been released already.
        }
        break;
    }
The Signal named replay is an error defined by Netty. When we try to read more data than is currently available, this error is thrown; Netty catches it, waits until more data arrives, and then calls decode again to see whether parsing succeeds this time. So inside decode we can write the code as if all the data we want had already been received.
But be careful: the decode method of a ReplayingDecoder will be called repeatedly, so our code must be prepared for that.
Second question: checkpoint saves the current state (together with the current read position) so that decode does not have to start from the beginning every time it is called again. It gives our decode method a state.
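The replay-and-checkpoint idea can be sketched without Netty. The toy class below is an illustration only, not Netty's implementation: ReplaySketch, NeedMoreData, and the one-length-byte message format are all made up for this example. A read past the available data throws, the reader rewinds to the last checkpoint, and the next feed call resumes from there.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of replay plus checkpoint; NOT Netty's implementation. */
final class ReplaySketch {
    enum State { READ_LENGTH, READ_PAYLOAD }

    /** Stands in for Netty's replay Signal: "ran out of data". */
    static final class NeedMoreData extends RuntimeException {}

    private byte[] buffer = new byte[0];
    private int readerIndex = 0;
    private int checkpointIndex = 0;
    // State that must survive a replay is kept in fields.
    private State state = State.READ_LENGTH;
    private int payloadLength = 0;
    private final List<String> frames = new ArrayList<>();

    List<String> frames() {
        return frames;
    }

    /** Appends newly arrived bytes and decodes as far as possible. */
    void feed(byte[] chunk) {
        int oldLength = buffer.length;
        buffer = Arrays.copyOf(buffer, oldLength + chunk.length);
        System.arraycopy(chunk, 0, buffer, oldLength, chunk.length);
        try {
            while (true) {
                decodeStep();
            }
        } catch (NeedMoreData replay) {
            // Rewind to the last checkpoint; the next feed() retries from there.
            readerIndex = checkpointIndex;
        }
    }

    // Hypothetical wire format: one length byte followed by that many ASCII bytes.
    private void decodeStep() {
        switch (state) {
            case READ_LENGTH:
                payloadLength = readByte() & 0xFF;
                checkpoint(State.READ_PAYLOAD);
                break;
            case READ_PAYLOAD:
                byte[] payload = new byte[payloadLength];
                for (int i = 0; i < payloadLength; i++) {
                    payload[i] = readByte(); // may throw halfway through
                }
                frames.add(new String(payload, StandardCharsets.US_ASCII));
                checkpoint(State.READ_LENGTH);
                break;
        }
    }

    private byte readByte() {
        if (readerIndex >= buffer.length) {
            throw new NeedMoreData();
        }
        return buffer[readerIndex++];
    }

    /** Remembers both the read position and the next state. */
    private void checkpoint(State next) {
        checkpointIndex = readerIndex;
        state = next;
    }
}
```

Feeding the bytes 5, 'h', 'e' produces no frame yet: the length is consumed and checkpointed, the payload read runs out of data, and the reader rewinds to just after the length byte. Once the remaining bytes 'l', 'l', 'o' arrive, the payload step is retried from the checkpoint and the frame "hello" comes out, without the length being parsed a second time.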
Okay. Now let's write the code for the listening part. It is all boilerplate, so just copy it down.
    ServerBootstrap bootstrap = new ServerBootstrap();
    // The boss group accepts connections; the worker group handles their I/O.
    final EventLoopGroup bossGroup = new NioEventLoopGroup();
    final EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .localAddress(port)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new CommandDecoder());
                        p.addLast(new RedisServerHandler());
                    }
                });

        // Start the server.
        ChannelFuture f = bootstrap.bind().sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
We parse the Redis protocol into the RedisFrame class
    package me.yunanw.redisinjava;

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    /**
     * Created by yunanw on 2016/10/17.
     */
    public class RedisFrame {

        private int argsCount = 0;
        List<String> ArgsData = null;

        public RedisFrame(int argsCount) {
            this.argsCount = argsCount;
            this.ArgsData = new ArrayList<String>(argsCount);
        }

        // Stores one argument, decoding its bytes as UTF-8 text.
        public void AppendArgs(byte[] args) {
            this.ArgsData.add(new String(args, StandardCharsets.UTF_8));
        }

        public int getCommandCount() {
            return ArgsData.size();
        }

        // Returns the command name (the first argument), or null if the frame is empty.
        public String GetFristCommand() {
            if (ArgsData.size() > 0) {
                return ArgsData.get(0);
            }
            return null;
        }

        // Returns the argument at the given index, or null if it does not exist.
        public String GetCommand(int index) {
            if (ArgsData.size() > index) {
                return ArgsData.get(index);
            }
            return null;
        }
    }
Okay. Now open redis-cli and see whether it can connect to our "fake Redis" server. Interestingly, when redis-cli starts, it automatically sends a COMMAND command, and no matter what you reply, it considers itself connected.
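The reply side is not shown in this post, so here is a rough sketch of what the "hash table" part could look like once a command has been parsed into its arguments. Everything in it is an assumption for illustration: the class FakeRedisCommands, the choice of commands, and answering COMMAND with an empty array are not taken from the original project. The reply strings follow the standard Redis reply formats: `+OK` for a simple string, `-ERR ...` for an error, `$<length>` for a bulk string, and `$-1` for a missing value.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical command table: maps parsed arguments to a protocol reply. */
final class FakeRedisCommands {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    String execute(List<String> args) {
        if (args.isEmpty()) {
            return error("empty command");
        }
        String name = args.get(0).toUpperCase(Locale.ROOT);
        switch (name) {
            case "COMMAND":
                // Assumption: an empty array is enough to satisfy the client at startup.
                return "*0\r\n";
            case "PING":
                return "+PONG\r\n";
            case "SET":
                // Simplification: only the plain "SET key value" form is handled.
                if (args.size() != 3) {
                    return error("wrong number of arguments for 'set' command");
                }
                store.put(args.get(1), args.get(2));
                return "+OK\r\n";
            case "GET":
                if (args.size() != 2) {
                    return error("wrong number of arguments for 'get' command");
                }
                String value = store.get(args.get(1));
                return value == null ? "$-1\r\n" : bulk(value);
            default:
                return error("unknown command '" + args.get(0) + "'");
        }
    }

    // Bulk string reply: the length is counted in bytes, not characters.
    private static String bulk(String value) {
        int byteLength = value.getBytes(StandardCharsets.UTF_8).length;
        return "$" + byteLength + "\r\n" + value + "\r\n";
    }

    private static String error(String message) {
        return "-ERR " + message + "\r\n";
    }
}
```

A handler at the end of the pipeline could pass the arguments of each decoded frame to execute and write the returned string back to the channel; how that is wired into Netty is left out here.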