최근 Redis 코드를 살펴보니 매우 간단하다는 것을 알았습니다. 하나를 다른 언어로 구현(복사)하고 싶은 충동이 들었습니다. 원래는 Python을 사용하여 구현하려고 했습니다. 이유
첫째: Java의 NIO와 Netty의 EventLoop의 조합은 Redis 네트워크 모델과 매우 유사합니다. Redis 모델도 더 간단합니다. EventLoop 스레드가 하나만 있기 때문입니다. 쓰기(복사)
둘째: Netty 아키텍처는 꽤 좋습니다.
Redis Server를 매우 추상적인(간단한) 관점에서 보면, 기본적으로 단일 라인 요청을 처리하는 해시테이블인 6379를 수신하는 프로그램입니다. Redis 프로토콜은 http 프로토콜보다 훨씬 간단합니다.
다음은 일반적인 형식입니다. 이 프로토콜은 다음과 같습니다.
*<参数数量> CR LF $<参数 1 的字节数量> CR LF<参数 1 的数据> CR LF ... $<参数 N 的字节数量> CR LF<参数 N 的数据> CR LF
이것은 기본적으로 매우 간단한 유한 상태 머신입니다.
그래서 명령 구문 분석기에 3가지 상태를 설정했습니다.
public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA }
초기 상태를 설정하겠습니다. NUMBER_OF_ARGS 설정은 처음에는 녹색 상태입니다. 데이터가 도착하면 프로그램의 상태가 무엇인지, 무엇을 하는지 지속적으로 판단합니다.
while(true){ switch (state()){ case NUMBER_OF_ARGS: //从当前数据中读取参数个数 break; case NUMBER_BYTE_OF_ARGS: //从数据中读取参数长度 break; case ARGS_DATA: //按参数长度读取参数 //判断参数个数.如果到了最后一个.则跳出,否则状态转回NUMBER_BYTE_OF_ARGS break; } }
package me.yunanw.redisinjava; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder;import java.util.List; /** * Created by yunanw on 2016/10/15. */ public class CommandDecoder extends ReplayingDecoder { public enum State { NUMBER_OF_ARGS, NUMBER_BYTE_OF_ARGS, ARGS_DATA } static final char CR = '\r'; static final char LF = '\n'; public CommandDecoder(){ state(State.NUMBER_OF_ARGS); } protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list); if (frame != null){ list.add(frame); } } private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { RedisFrame frame = null; int currentArgsLen = 0; int argsCount = 0; while(true){ switch (state()){ case NUMBER_OF_ARGS: if (byteBuf.readByte() != '*'){ throw new DecoderException("can not found *"); } argsCount = parseRedisNumber(byteBuf); frame = new RedisFrame(argsCount); checkpoint(State.NUMBER_BYTE_OF_ARGS); break; case NUMBER_BYTE_OF_ARGS: if (byteBuf.readByte() != '$'){ throw new DecoderException("can not found $"); } currentArgsLen = parseRedisNumber(byteBuf); checkpoint(State.ARGS_DATA);; break; case ARGS_DATA: frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array()); if (byteBuf.readByte() != CR || byteBuf.readByte() != LF) throw new DecoderException("can not found CR OR LF"); if ((--argsCount) = 0 && digit < 10) { result = (result * 10) + digit; } else { throw new DecoderException("Invalid character in integer"); } } while ((readByte = byteBuf.readByte()) != CR); if ((readByte = byteBuf.readByte()) != LF) { throw new DecoderException("can not found LF"); } return (negative? -result:result); } }
</p><pre class="brush:java;toolbar:false"> try { decode(ctx, replayable, out); //省略} catch (Signal replay) { replay.expect(REPLAY); //省略 // Return to the checkpoint (or oldPosition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint); } else { // Called by cleanup() - no need to maintain the readerIndex // anymore because the buffer has been released already. } break; }
</p><pre class="brush:java;toolbar:false"> ServerBootstrap bootstrap = new ServerBootstrap(); final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1); try { bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(port) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new CommandDecoder()); p.addLast(new RedisServerHandler()); } }); // Start the server. ChannelFuture f = bootstrap.bind().sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. group.shutdownGracefully(); }
</p><pre class="brush:java;toolbar:false"> package me.yunanw.redisinjava;import java.util.ArrayList;import java.util.List; /** * Created by yunanw on 2016/10/17. */ public class RedisFrame { private int argsCount = 0; List ArgsData = null; public RedisFrame(int argsCount){ this.argsCount = argsCount; this.ArgsData = new ArrayList(argsCount); } public void AppendArgs(byte[] args){ this.ArgsData.add(new String(args)); } public int getCommandCount(){ return ArgsData.size(); } public String GetFristCommand(){ if (ArgsData.size() > 0){ return ArgsData.get(0); } return null; } public String GetCommand(int index){ if (ArgsData.size() > index){ return ArgsData.get(index); } return null; } }