首頁 类库下载 java类库 實作一個 Java 版的 Redis

實作一個 Java 版的 Redis

May 30, 2018 pm 01:56 PM

最近看了Redis 的程式碼,感覺還是挺簡單的.有衝動想用其它語言實現(抄)一個.原來想用Python 實現來著.後來想想試試Netty.原因有二

    第一:Java的NIO 和Netty 的EventLoop 配合起來和Redis 的網路模型很接近.都是Ractor 模型.甚至 Redis的模型更簡單--只有一個EventLoop 線程.寫(抄)起來更方便

   第二:Netty 架構挺不錯.藉這個機會學習一下.

如果我們從一個很抽象(簡單)的角度看Redis Server.就是一個監聽在6379的程序, 本質上是一個處理單線線請求的Hashtable. 而Redis 的協議也是非常非常的簡單.比http 協定可簡單多了.

以下是這個協定的一般形式:

*<参数数量> CR LF
$<参数 1 的字节数量> CR LF<参数 1 的数据> CR LF
...
$<参数 N 的字节数量> CR LF<参数 N 的数据> CR LF
登入後複製

這基本上就是一個很簡單的有限狀態機.

實作一個 Java 版的 Redis

所以我給我們的命令解析器設定333個狀態.

public enum State {
    NUMBER_OF_ARGS,
    NUMBER_BYTE_OF_ARGS,
    ARGS_DATA
}
登入後複製

我們將初始狀態設定NUMBER_OF_ARGS 也就是開始那個綠色的狀態.當有資料到達時.我們不停的判斷程式的狀態.是哪個狀態,我們做啥.

while(true){    switch (state()){        case NUMBER_OF_ARGS:
            //从当前数据中读取参数个数
            break;        case NUMBER_BYTE_OF_ARGS:
            //从数据中读取参数长度
            break;        case ARGS_DATA:
            //按参数长度读取参数
            //判断参数个数.如果到了最后一个.则跳出,否则状态转回NUMBER_BYTE_OF_ARGS
            break;
    }
}
登入後複製

下面我們按著我們上面思路實現一下.

package me.yunanw.redisinjava;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;import java.util.List;
/**
 * Created by yunanw on 2016/10/15.
 */
 public class CommandDecoder extends ReplayingDecoder {    
 public enum State {
        NUMBER_OF_ARGS,
        NUMBER_BYTE_OF_ARGS,
        ARGS_DATA
    }    
    static final char CR = &#39;\r&#39;;    
    static final char LF = &#39;\n&#39;;    
    public CommandDecoder(){

        state(State.NUMBER_OF_ARGS);
    }    
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
        RedisFrame frame = doDecode(channelHandlerContext,byteBuf,list);        
        if (frame != null){
            list.add(frame);
        }
    }    
    private RedisFrame doDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
        RedisFrame frame = null;        
        int currentArgsLen = 0;        
        int argsCount = 0;        
        while(true){            
        switch (state()){                
        case NUMBER_OF_ARGS:                    
        if (byteBuf.readByte() != &#39;*&#39;){                        
        throw new DecoderException("can not found *");
                    }
                    argsCount = parseRedisNumber(byteBuf);
                    frame = new RedisFrame(argsCount);
                    checkpoint(State.NUMBER_BYTE_OF_ARGS);                    
                    break;                
                    case NUMBER_BYTE_OF_ARGS:                    
                    if (byteBuf.readByte() != &#39;$&#39;){                        
                    throw new DecoderException("can not found $");
                    }
                    currentArgsLen = parseRedisNumber(byteBuf);
                    checkpoint(State.ARGS_DATA);;                    
                    break;                
                    case ARGS_DATA:
                    frame.AppendArgs(byteBuf.readBytes(currentArgsLen).array());                    
                    if (byteBuf.readByte() != CR || byteBuf.readByte() != LF)                        
                    throw new DecoderException("can not found CR OR LF");                    
                    if ((--argsCount) = 0 && digit < 10)
                     {
                result = (result * 10) + digit;
            } else {                
            throw new DecoderException("Invalid character in integer");
            }
        } while ((readByte = byteBuf.readByte()) != CR);        
        if ((readByte = byteBuf.readByte()) != LF)
        {            
        throw new DecoderException("can not found LF");
        }        
        return (negative? -result:result);
    }

}
登入後複製

寫到這裡有一個小問題,如果你上面代碼看懂了,你就會發現一個小問題.如果由於網絡原因,有時數據可以並沒有接收完全.而我們的代碼完全沒有做這方面的考慮? 而Checkpoint 這是又什麼鬼?

第一個問題:

    事實上我們有考慮這個問題.所以我們繼承了一個相對比較特別Decoder--ReplayingDecoder.我們看一下ReplayingDecoder的CallDecode 方法.(這個名字起的非常的直白.你一定明白他是乾啥的)

</p><pre class="brush:java;toolbar:false">
try {
    decode(ctx, replayable, out);
    //省略} catch (Signal replay) {
    replay.expect(REPLAY);     //省略
    // Return to the checkpoint (or oldPosition) and retry.
    int checkpoint = this.checkpoint;    
    if (checkpoint >= 0) {        
    in.readerIndex(checkpoint);
    } else {        
    // Called by cleanup() - no need to maintain the readerIndex
        // anymore because the buffer has been released already.
    }    
    break;
}
登入後複製

Signal replay 是Netty 中定義的一個錯誤.當我們讀取錯誤時,Netty 會再等到下次有資料到達時,再試一次Decode 方法.看看能再解析成功.所以我們就可以假設定我們要的資料都已經讀取了.

但是要注意: replaydecoder 的decode 方法會被反覆調用..所以我們的程式碼中要做好這樣的準備.

二: CheckPoint 就是為了防止如果每次反复調用Decode 時從頭執行,而設置的一個狀態.讓我們這個decode 方法有狀態.

好了.現在我們建立監部分的程式碼.這都是套數,直接抄下來就行了

</p><pre class="brush:java;toolbar:false">
ServerBootstrap bootstrap = new ServerBootstrap();
final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1);
try {
    bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .localAddress(port)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer() {
                @Override                
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new CommandDecoder());
                    p.addLast(new RedisServerHandler());
                }
            });    
            // Start the server.
    ChannelFuture f = bootstrap.bind().sync();    
    // Wait until the server socket is closed.
    f.channel().closeFuture().sync();
} finally {    
// Shut down all event loops to terminate all threads.
    group.shutdownGracefully();
}
登入後複製

我們把Redis 的協定解析為RedisFrame 類別

</p><pre class="brush:java;toolbar:false">
package me.yunanw.redisinjava;import java.util.ArrayList;import java.util.List;
/**
 * Created by yunanw on 2016/10/17.
 */
 public class RedisFrame {    
 private int argsCount = 0;
    List ArgsData = null;    
    public RedisFrame(int argsCount){        
    this.argsCount = argsCount;        
    this.ArgsData = new ArrayList(argsCount);

    }    public void AppendArgs(byte[] args){        
    this.ArgsData.add(new String(args));
    }    public int getCommandCount(){        
    return ArgsData.size();
    }    public String GetFristCommand(){        
    if (ArgsData.size() > 0){            
    return ArgsData.get(0);
        }        
        return null;
    }    
    public String GetCommand(int index){        
    if (ArgsData.size() > index){            
    return ArgsData.get(index);
        }       
        return null;
    }
}
登入後複製

好了.這時你打開Redis-cli 試試看是不是可以連上我們的"假Redis" Server.有意的是---你打開Redis-cli.他會自動發一個"Command" 命令.而你不管回復什麼,它都認為連上了

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

<🎜>:泡泡膠模擬器無窮大 - 如何獲取和使用皇家鑰匙
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
北端:融合系統,解釋
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
Mandragora:巫婆樹的耳語 - 如何解鎖抓鉤
3 週前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

熱門話題

Java教學
1665
14
CakePHP 教程
1424
52
Laravel 教程
1321
25
PHP教程
1269
29
C# 教程
1249
24