RESP プロトコル
RESP はクライアントとサーバー間で通信するためのプロトコルで、次の 5 つの形式があります:
通常応答: " " で始まり、最後まで文字列形式の「\ r\n」を含む
エラー応答: 「-」で始まり、文字列形式で終わる「\r\n」
整数: 「:」で始まり、「\r\n」で終わる文字列形式です。
複数行の文字列: 「$」で始まり、その後に実際に送信されたバイト数が続き、その後「\」で始まり「\」で終わります。 r\n"
$3\r\nabc\r\n
配列: 「*」で始まり、その後に数字が続きますメンバー数
SET キー値
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\ nvalue\r\n
クライアントとサーバーによって送信されるコマンドまたはデータは、常に \r\n (CRLF) を改行文字。
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n と入力すると$5\r\nvalue\r\nこのような一連のコマンドの場合、サーバーは次のコマンドを受け取ります:
*3\r\n
$3\r\n
SET \r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
インターフェース/ resp/conn.go
type Connection interface { Write([]byte) error GetDBIndex() int SelectDB(int) } interface/resp/reply.go type Reply interface { ToBytes() []byte }
Connection インターフェイス: Redis クライアントの接続
Write: クライアントへの返信
GetDBIndex: Redis には 16 の DB があります
応答インターフェイス: 応答インターフェイス
resp/ Reply/consts.go
type PongReply struct{} var pongBytes = []byte("+PONG\r\n") func (r *PongReply) ToBytes() []byte { return pongBytes } var thePongReply = new(PongReply) func MakePongReply() *PongReply { return thePongReply } type OkReply struct{} var okBytes = []byte("+OK\r\n") func (r *OkReply) ToBytes() []byte { return okBytes } var theOkReply = new(OkReply) func MakeOkReply() *OkReply { return theOkReply } var nullBulkBytes = []byte("$-1\r\n") type NullBulkReply struct{} func (r *NullBulkReply) ToBytes() []byte { return nullBulkBytes } func MakeNullBulkReply() *NullBulkReply { return &NullBulkReply{} } var emptyMultiBulkBytes = []byte("*0\r\n") type EmptyMultiBulkReply struct{} func (r *EmptyMultiBulkReply) ToBytes() []byte { return emptyMultiBulkBytes } type NoReply struct{} var noBytes = []byte("") func (r *NoReply) ToBytes() []byte { return noBytes }
5 種類の返信を定義します: Reply pong、ok、null、空の配列、empty
resp/reply/reply.go
type ErrorReply interface { Error() string ToBytes() []byte }
ErrorReply: エラー インターフェイスの定義
resp/reply/errors.go
type UnknownErrReply struct{} var unknownErrBytes = []byte("-Err unknown\r\n") func (r *UnknownErrReply) ToBytes() []byte { return unknownErrBytes } func (r *UnknownErrReply) Error() string { return "Err unknown" } type ArgNumErrReply struct { Cmd string } func (r *ArgNumErrReply) ToBytes() []byte { return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n") } func (r *ArgNumErrReply) Error() string { return "ERR wrong number of arguments for '" + r.Cmd + "' command" } func MakeArgNumErrReply(cmd string) *ArgNumErrReply { return &ArgNumErrReply{ Cmd: cmd, } } type SyntaxErrReply struct{} var syntaxErrBytes = []byte("-Err syntax error\r\n") var theSyntaxErrReply = &SyntaxErrReply{} func MakeSyntaxErrReply() *SyntaxErrReply { return theSyntaxErrReply } func (r *SyntaxErrReply) ToBytes() []byte { return syntaxErrBytes } func (r *SyntaxErrReply) Error() string { return "Err syntax error" } type WrongTypeErrReply struct{} var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n") func (r *WrongTypeErrReply) ToBytes() []byte { return wrongTypeErrBytes } func (r *WrongTypeErrReply) Error() string { return "WRONGTYPE Operation against a key holding the wrong kind of value" } type ProtocolErrReply struct { Msg string } func (r *ProtocolErrReply) ToBytes() []byte { return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n") } func (r *ProtocolErrReply) Error() string { return "ERR Protocol error: '" + r.Msg }
errors は 5 種類のエラーを定義します: UnknownErrReply Unknown error 、 ArgNumErrReply パラメータ番号エラー、SyntaxErrReply 構文エラー、WrongTypeErrReply データ型エラー、ProtocolErrReply プロトコル エラー
resp/reply/reply.go
var ( nullBulkReplyBytes = []byte("$-1") // 协议的结尾 CRLF = "\r\n" ) type BulkReply struct { Arg []byte } func MakeBulkReply(arg []byte) *BulkReply { return &BulkReply{ Arg: arg, } } func (r *BulkReply) ToBytes() []byte { if len(r.Arg) == 0 { return nullBulkReplyBytes } return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF) } type MultiBulkReply struct { Args [][]byte } func (r *MultiBulkReply) ToBytes() []byte { argLen := len(r.Args) var buf bytes.Buffer buf.WriteString("*" + strconv.Itoa(argLen) + CRLF) for _, arg := range r.Args { if arg == nil { buf.WriteString("$-1" + CRLF) } else { buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF) } } return buf.Bytes() } func MakeMultiBulkReply(args [][]byte) *MultiBulkReply { return &MultiBulkReply{ Args: args, } } type StatusReply struct { Status string } func MakeStatusReply(status string) *StatusReply { return &StatusReply{ Status: status, } } func (r *StatusReply) ToBytes() []byte { return []byte("+" + r.Status + CRLF) } type IntReply struct { Code int64 } func MakeIntReply(code int64) *IntReply { return &IntReply{ Code: code, } } func (r *IntReply) ToBytes() []byte { return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF) } type StandardErrReply struct { Status string } func (r *StandardErrReply) ToBytes() []byte { return []byte("-" + r.Status + CRLF) } func (r *StandardErrReply) Error() string { return r.Status } func MakeErrReply(status string) *StandardErrReply { return &StandardErrReply{ Status: status, } } func IsErrorReply(reply resp.Reply) bool { return reply.ToBytes()[0] == '-' }
IntReply: 数値応答
type Payload struct { Data resp.Reply Err error } type readState struct { readingMultiLine bool expectedArgsCount int msgType byte args [][]byte bulkLen int64 } func (s *readState) finished() bool { return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount } func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } func parse0(reader io.Reader, ch chan<- *Payload) { ...... }
ペイロード構造: カスタマー サービス側から送信されるデータReply: クライアントとサーバーが相互に送信するデータは Reply
readState 構造:
と呼ばれます。readingMultiLine: 単一行または複数行のデータを解析します
expectedArgsCount: 読み取る必要があるパラメーターの数
msgType: メッセージ タイプ
args: メッセージの内容
bulkLen: データ長
completed メソッド: 解析が完了したかどうかを判断します
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil }
readLine: 行ごとに読み取ります。 \n で区切られた通常の行を読み取ります。テキスト内に \r\n 文字を含む行を読み取る場合、state.bulkLen は改行文字 \r\n を追加します (state.bulkLen 2)
func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } }
parseMultiBulkHeader: 配列のヘッダーを解析し、予想される数値を設定します行と関連パラメータの数。
parseBulkHeader: 複数行の文字列のヘッダーを解析します。
func parseSingleLineReply(msg []byte) (resp.Reply, error) { str := strings.TrimSuffix(string(msg), "\r\n") var result resp.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) } return result, nil } func readBody(msg []byte, state *readState) error { line := msg[0 : len(msg)-2] var err error if line[0] == '$' { // bulk reply state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen <= 0 { // null bulk in multi bulks state.args = append(state.args, []byte{}) state.bulkLen = 0 } } else { state.args = append(state.args, line) } return nil }
parseSingleLineReply: 単一行のコマンドを解析します
readBody: 複数行のコマンドを読み取ります $ で始まる場合は、BulkLen を設定します 次の行を読み取るときに、これを使用します 2. If args
func parse0(reader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { logger.Error(string(debug.Stack())) } }() bufReader := bufio.NewReader(reader) var state readState var err error var msg []byte for { var ioErr bool msg, ioErr, err = readLine(bufReader, &state) if err != nil { if ioErr { ch <- &Payload{ Err: err, } close(ch) return } ch <- &Payload{ Err: err, } state = readState{} continue } if !state.readingMultiLine { if msg[0] == '*' { // multi bulk reply err = parseMultiBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} continue } if state.expectedArgsCount == 0 { ch <- &Payload{ Data: &reply.EmptyMultiBulkReply{}, } state = readState{} continue } } else if msg[0] == '$' { // bulk reply err = parseBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } if state.bulkLen == -1 { // null bulk reply ch <- &Payload{ Data: &reply.NullBulkReply{}, } state = readState{} // reset state continue } } else { // single line reply result, err := parseSingleLineReply(msg) ch <- &Payload{ Data: result, Err: err, } state = readState{} // reset state continue } } else { // read bulk reply err = readBody(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } // if sending finished if state.finished() { var result resp.Reply if state.msgType == '*' { result = reply.MakeMultiBulkReply(state.args) } else if state.msgType == '$' { result = reply.MakeBulkReply(state.args[0]) } ch <- &Payload{ Data: result, Err: err, } state = readState{} } } } }
parse0 に移動: コマンドを解析し、解析が完了したらチャネル経由で送信します
resp/connection/ conn.gotype Connection struct { conn net.Conn waitingReply wait.Wait mu sync.Mutex // 避免多个协程往客户端中写 selectedDB int } func NewConn(conn net.Conn) *Connection { return &Connection{ conn: conn, } } func (c *Connection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } func (c *Connection) Close() error { c.waitingReply.WaitWithTimeout(10 * time.Second) _ = c.conn.Close() return nil } func (c *Connection) Write(b []byte) error { if len(b) == 0 { return nil } c.mu.Lock() c.waitingReply.Add(1) defer func() { c.waitingReply.Done() c.mu.Unlock() }() _, err := c.conn.Write(b) return err } func (c *Connection) GetDBIndex() int { return c.selectedDB } func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum }
以前に書きましたが、EchoHandler はユーザー入力を受け取り、それを変更せずに返すために使用されます。ここで、EchoHandler を置き換える RespHandler を作成し、パーサーに解析を実行させる必要があります。クライアント接続を管理する Connection 構造体が RespHandler に存在する必要があります。 Connection: プロトコル層のハンドラーで使用されるクライアント接続
resp/handler/handler.govar ( unknownErrReplyBytes = []byte("-ERR unknown\r\n") ) type RespHandler struct { activeConn sync.Map db databaseface.Database closing atomic.Boolean } func MakeHandler() *RespHandler { var db databaseface.Database db = database.NewEchoDatabase() return &RespHandler{ db: db, } } func (h *RespHandler) closeClient(client *connection.Connection) { _ = client.Close() h.db.AfterClientClose(client) h.activeConn.Delete(client) } func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection _ = conn.Close() } client := connection.NewConn(conn) h.activeConn.Store(client, 1) ch := parser.ParseStream(conn) for payload := range ch { if payload.Err != nil { if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") { // connection closed h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } // protocol err errReply := reply.MakeErrReply(payload.Err.Error()) err := client.Write(errReply.ToBytes()) if err != nil { h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } continue } if payload.Data == nil { logger.Error("empty payload") continue } r, ok := payload.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } result := h.db.Exec(client, r.Args) if result != nil { _ = client.Write(result.ToBytes()) } else { _ = client.Write(unknownErrReplyBytes) } } } func (h *RespHandler) Close() error { logger.Info("handler shutting down...") h.closing.Set(true) // TODO: concurrent wait h.activeConn.Range(func(key interface{}, val interface{}) bool { client := key.(*connection.Connection) _ = client.Close() return true }) h.db.Close() return nil }
RespHandler: および前のecho も同様ですが、コア層の db.exec 実行解析命令が追加されています
interface/database/database.gotype CmdLine = [][]byte type Database interface { Exec(client resp.Connection, args [][]byte) resp.Reply AfterClientClose(c resp.Connection) Close() } type DataEntity struct { Data interface{} }
Exec: コア層の実行AfterClientClose: 閉じた後のアフターケア メソッド
CmdLine: 2 次元バイト配列のコマンド エイリアス
DataEntity: 文字列、リスト、セットなどを含む Redis データを表します。
database/echo_database.gotype EchoDatabase struct { } func NewEchoDatabase() *EchoDatabase { return &EchoDatabase{} } func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply { return reply.MakeMultiBulkReply(args) } func (e EchoDatabase) AfterClientClose(c resp.Connection) { logger.Info("EchoDatabase AfterClientClose") } func (e EchoDatabase) Close() { logger.Info("EchoDatabase Close") }
echo_database: テスト プロトコル層Exec: 命令が解析された後、MakeMultiBulkReply を使用してラップして返します
main.goerr := tcp.ListenAndServeWithSignal( &tcp.Config{ Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), }, handler.MakeHandler()) if err != nil { logger.Error(err) }
main を今書いた内容に変更します: handler.MakeHandler()
以上がGolang に基づいて Redis プロトコル パーサーを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。