Golang に基づいて Redis プロトコル パーサーを実装する方法
RESP プロトコル
RESP はクライアントとサーバー間で通信するためのプロトコルで、次の 5 つの形式があります:
通常応答: " " で始まり、最後まで文字列形式の「\ r\n」を含む
エラー応答: 「-」で始まり、文字列形式で終わる「\r\n」
整数: 「:」で始まり、「\r\n」で終わる文字列形式です。
複数行の文字列: 「$」で始まり、その後に実際に送信されたバイト数が続き、その後「\」で始まり「\」で終わります。 r\n"
$3\r\nabc\r\n
配列: 「*」で始まり、その後に数字が続きますメンバー数
SET キー値
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\ nvalue\r\n
クライアントとサーバーによって送信されるコマンドまたはデータは、常に \r\n (CRLF) を改行文字。
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n と入力すると$5\r\nvalue\r\nこのような一連のコマンドの場合、サーバーは次のコマンドを受け取ります:
*3\r\n
$3\r\n
SET \r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
インターフェース/ resp/conn.go
type Connection interface { Write([]byte) error GetDBIndex() int SelectDB(int) } interface/resp/reply.go type Reply interface { ToBytes() []byte }
Connection インターフェイス: Redis クライアントの接続
Write: クライアントへの返信
GetDBIndex: Redis には 16 の DB があります
応答インターフェイス: 応答インターフェイス
resp/ Reply/consts.go
type PongReply struct{} var pongBytes = []byte("+PONG\r\n") func (r *PongReply) ToBytes() []byte { return pongBytes } var thePongReply = new(PongReply) func MakePongReply() *PongReply { return thePongReply } type OkReply struct{} var okBytes = []byte("+OK\r\n") func (r *OkReply) ToBytes() []byte { return okBytes } var theOkReply = new(OkReply) func MakeOkReply() *OkReply { return theOkReply } var nullBulkBytes = []byte("$-1\r\n") type NullBulkReply struct{} func (r *NullBulkReply) ToBytes() []byte { return nullBulkBytes } func MakeNullBulkReply() *NullBulkReply { return &NullBulkReply{} } var emptyMultiBulkBytes = []byte("*0\r\n") type EmptyMultiBulkReply struct{} func (r *EmptyMultiBulkReply) ToBytes() []byte { return emptyMultiBulkBytes } type NoReply struct{} var noBytes = []byte("") func (r *NoReply) ToBytes() []byte { return noBytes }
5 種類の返信を定義します: Reply pong、ok、null、空の配列、empty
resp/reply/reply.go
type ErrorReply interface { Error() string ToBytes() []byte }
ErrorReply: エラー インターフェイスの定義
resp/reply/errors.go
type UnknownErrReply struct{} var unknownErrBytes = []byte("-Err unknown\r\n") func (r *UnknownErrReply) ToBytes() []byte { return unknownErrBytes } func (r *UnknownErrReply) Error() string { return "Err unknown" } type ArgNumErrReply struct { Cmd string } func (r *ArgNumErrReply) ToBytes() []byte { return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n") } func (r *ArgNumErrReply) Error() string { return "ERR wrong number of arguments for '" + r.Cmd + "' command" } func MakeArgNumErrReply(cmd string) *ArgNumErrReply { return &ArgNumErrReply{ Cmd: cmd, } } type SyntaxErrReply struct{} var syntaxErrBytes = []byte("-Err syntax error\r\n") var theSyntaxErrReply = &SyntaxErrReply{} func MakeSyntaxErrReply() *SyntaxErrReply { return theSyntaxErrReply } func (r *SyntaxErrReply) ToBytes() []byte { return syntaxErrBytes } func (r *SyntaxErrReply) Error() string { return "Err syntax error" } type WrongTypeErrReply struct{} var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n") func (r *WrongTypeErrReply) ToBytes() []byte { return wrongTypeErrBytes } func (r *WrongTypeErrReply) Error() string { return "WRONGTYPE Operation against a key holding the wrong kind of value" } type ProtocolErrReply struct { Msg string } func (r *ProtocolErrReply) ToBytes() []byte { return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n") } func (r *ProtocolErrReply) Error() string { return "ERR Protocol error: '" + r.Msg }
errors は 5 種類のエラーを定義します: UnknownErrReply Unknown error 、 ArgNumErrReply パラメータ番号エラー、SyntaxErrReply 構文エラー、WrongTypeErrReply データ型エラー、ProtocolErrReply プロトコル エラー
resp/reply/reply.go
var ( nullBulkReplyBytes = []byte("$-1") // 协议的结尾 CRLF = "\r\n" ) type BulkReply struct { Arg []byte } func MakeBulkReply(arg []byte) *BulkReply { return &BulkReply{ Arg: arg, } } func (r *BulkReply) ToBytes() []byte { if len(r.Arg) == 0 { return nullBulkReplyBytes } return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF) } type MultiBulkReply struct { Args [][]byte } func (r *MultiBulkReply) ToBytes() []byte { argLen := len(r.Args) var buf bytes.Buffer buf.WriteString("*" + strconv.Itoa(argLen) + CRLF) for _, arg := range r.Args { if arg == nil { buf.WriteString("$-1" + CRLF) } else { buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF) } } return buf.Bytes() } func MakeMultiBulkReply(args [][]byte) *MultiBulkReply { return &MultiBulkReply{ Args: args, } } type StatusReply struct { Status string } func MakeStatusReply(status string) *StatusReply { return &StatusReply{ Status: status, } } func (r *StatusReply) ToBytes() []byte { return []byte("+" + r.Status + CRLF) } type IntReply struct { Code int64 } func MakeIntReply(code int64) *IntReply { return &IntReply{ Code: code, } } func (r *IntReply) ToBytes() []byte { return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF) } type StandardErrReply struct { Status string } func (r *StandardErrReply) ToBytes() []byte { return []byte("-" + r.Status + CRLF) } func (r *StandardErrReply) Error() string { return r.Status } func MakeErrReply(status string) *StandardErrReply { return &StandardErrReply{ Status: status, } } func IsErrorReply(reply resp.Reply) bool { return reply.ToBytes()[0] == '-' }
- MultiBulkReply: 文字列の配列に返信
- StatusReply: ステータスの返信
- StandardErrReply: 標準エラー応答
- IsErrorReply: エラー応答かどうかを判断します
- ToBytes: 文字列を RESP プロトコルで指定された形式に変換します
- ##resp/parser/parser.go
type Payload struct { Data resp.Reply Err error } type readState struct { readingMultiLine bool expectedArgsCount int msgType byte args [][]byte bulkLen int64 } func (s *readState) finished() bool { return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount } func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } func parse0(reader io.Reader, ch chan<- *Payload) { ...... }
ペイロード構造: カスタマー サービス側から送信されるデータReply: クライアントとサーバーが相互に送信するデータは Reply
readState 構造:
と呼ばれます。readingMultiLine: 単一行または複数行のデータを解析します
expectedArgsCount: 読み取る必要があるパラメーターの数
msgType: メッセージ タイプ
args: メッセージの内容
bulkLen: データ長
completed メソッド: 解析が完了したかどうかを判断します
ParseStream メソッド: データを非同期的に解析してパイプラインに入れ、パイプライン データを返します。
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil }
readLine: 行ごとに読み取ります。 \n で区切られた通常の行を読み取ります。テキスト内に \r\n 文字を含む行を読み取る場合、state.bulkLen は改行文字 \r\n を追加します (state.bulkLen 2)
func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } }
parseMultiBulkHeader: 配列のヘッダーを解析し、予想される数値を設定します行と関連パラメータの数。
parseBulkHeader: 複数行の文字列のヘッダーを解析します。
func parseSingleLineReply(msg []byte) (resp.Reply, error) { str := strings.TrimSuffix(string(msg), "\r\n") var result resp.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) } return result, nil } func readBody(msg []byte, state *readState) error { line := msg[0 : len(msg)-2] var err error if line[0] == '$' { // bulk reply state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen <= 0 { // null bulk in multi bulks state.args = append(state.args, []byte{}) state.bulkLen = 0 } } else { state.args = append(state.args, line) } return nil }
parseSingleLineReply: 単一行のコマンドを解析します
readBody: 複数行のコマンドを読み取ります $ で始まる場合は、BulkLen を設定します 次の行を読み取るときに、これを使用します 2. If args
func parse0(reader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { logger.Error(string(debug.Stack())) } }() bufReader := bufio.NewReader(reader) var state readState var err error var msg []byte for { var ioErr bool msg, ioErr, err = readLine(bufReader, &state) if err != nil { if ioErr { ch <- &Payload{ Err: err, } close(ch) return } ch <- &Payload{ Err: err, } state = readState{} continue } if !state.readingMultiLine { if msg[0] == '*' { // multi bulk reply err = parseMultiBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} continue } if state.expectedArgsCount == 0 { ch <- &Payload{ Data: &reply.EmptyMultiBulkReply{}, } state = readState{} continue } } else if msg[0] == '$' { // bulk reply err = parseBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } if state.bulkLen == -1 { // null bulk reply ch <- &Payload{ Data: &reply.NullBulkReply{}, } state = readState{} // reset state continue } } else { // single line reply result, err := parseSingleLineReply(msg) ch <- &Payload{ Data: result, Err: err, } state = readState{} // reset state continue } } else { // read bulk reply err = readBody(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } // if sending finished if state.finished() { var result resp.Reply if state.msgType == '*' { result = reply.MakeMultiBulkReply(state.args) } else if state.msgType == '$' { result = reply.MakeBulkReply(state.args[0]) } ch <- &Payload{ Data: result, Err: err, } state = readState{} } } } }
parse0 に移動: コマンドを解析し、解析が完了したらチャネル経由で送信します
resp/connection/ conn.gotype Connection struct { conn net.Conn waitingReply wait.Wait mu sync.Mutex // 避免多个协程往客户端中写 selectedDB int } func NewConn(conn net.Conn) *Connection { return &Connection{ conn: conn, } } func (c *Connection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } func (c *Connection) Close() error { c.waitingReply.WaitWithTimeout(10 * time.Second) _ = c.conn.Close() return nil } func (c *Connection) Write(b []byte) error { if len(b) == 0 { return nil } c.mu.Lock() c.waitingReply.Add(1) defer func() { c.waitingReply.Done() c.mu.Unlock() }() _, err := c.conn.Write(b) return err } func (c *Connection) GetDBIndex() int { return c.selectedDB } func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum }
以前に書きましたが、EchoHandler はユーザー入力を受け取り、それを変更せずに返すために使用されます。ここで、EchoHandler を置き換える RespHandler を作成し、パーサーに解析を実行させる必要があります。クライアント接続を管理する Connection 構造体が RespHandler に存在する必要があります。 Connection: プロトコル層のハンドラーで使用されるクライアント接続
resp/handler/handler.govar ( unknownErrReplyBytes = []byte("-ERR unknown\r\n") ) type RespHandler struct { activeConn sync.Map db databaseface.Database closing atomic.Boolean } func MakeHandler() *RespHandler { var db databaseface.Database db = database.NewEchoDatabase() return &RespHandler{ db: db, } } func (h *RespHandler) closeClient(client *connection.Connection) { _ = client.Close() h.db.AfterClientClose(client) h.activeConn.Delete(client) } func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection _ = conn.Close() } client := connection.NewConn(conn) h.activeConn.Store(client, 1) ch := parser.ParseStream(conn) for payload := range ch { if payload.Err != nil { if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") { // connection closed h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } // protocol err errReply := reply.MakeErrReply(payload.Err.Error()) err := client.Write(errReply.ToBytes()) if err != nil { h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } continue } if payload.Data == nil { logger.Error("empty payload") continue } r, ok := payload.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } result := h.db.Exec(client, r.Args) if result != nil { _ = client.Write(result.ToBytes()) } else { _ = client.Write(unknownErrReplyBytes) } } } func (h *RespHandler) Close() error { logger.Info("handler shutting down...") h.closing.Set(true) // TODO: concurrent wait h.activeConn.Range(func(key interface{}, val interface{}) bool { client := key.(*connection.Connection) _ = client.Close() return true }) h.db.Close() return nil }
RespHandler: および前のecho も同様ですが、コア層の db.exec 実行解析命令が追加されています
interface/database/database.gotype CmdLine = [][]byte type Database interface { Exec(client resp.Connection, args [][]byte) resp.Reply AfterClientClose(c resp.Connection) Close() } type DataEntity struct { Data interface{} }
Exec: コア層の実行AfterClientClose: 閉じた後のアフターケア メソッド
CmdLine: 2 次元バイト配列のコマンド エイリアス
DataEntity: 文字列、リスト、セットなどを含む Redis データを表します。
database/echo_database.gotype EchoDatabase struct { } func NewEchoDatabase() *EchoDatabase { return &EchoDatabase{} } func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply { return reply.MakeMultiBulkReply(args) } func (e EchoDatabase) AfterClientClose(c resp.Connection) { logger.Info("EchoDatabase AfterClientClose") } func (e EchoDatabase) Close() { logger.Info("EchoDatabase Close") }
echo_database: テスト プロトコル層Exec: 命令が解析された後、MakeMultiBulkReply を使用してラップして返します
main.goerr := tcp.ListenAndServeWithSignal( &tcp.Config{ Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), }, handler.MakeHandler()) if err != nil { logger.Error(err) }
main を今書いた内容に変更します: handler.MakeHandler()
以上がGolang に基づいて Redis プロトコル パーサーを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

Video Face Swap
完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック











Redisクラスターモードは、シャードを介してRedisインスタンスを複数のサーバーに展開し、スケーラビリティと可用性を向上させます。構造の手順は次のとおりです。異なるポートで奇妙なRedisインスタンスを作成します。 3つのセンチネルインスタンスを作成し、Redisインスタンスを監視し、フェールオーバーを監視します。 Sentinel構成ファイルを構成し、Redisインスタンス情報とフェールオーバー設定の監視を追加します。 Redisインスタンス構成ファイルを構成し、クラスターモードを有効にし、クラスター情報ファイルパスを指定します。各Redisインスタンスの情報を含むnodes.confファイルを作成します。クラスターを起動し、CREATEコマンドを実行してクラスターを作成し、レプリカの数を指定します。クラスターにログインしてクラスター情報コマンドを実行して、クラスターステータスを確認します。作る

Redisデータをクリアする方法:Flushallコマンドを使用して、すべての重要な値をクリアします。 FlushDBコマンドを使用して、現在選択されているデータベースのキー値をクリアします。 [選択]を使用してデータベースを切り替え、FlushDBを使用して複数のデータベースをクリアします。 DELコマンドを使用して、特定のキーを削除します。 Redis-CLIツールを使用してデータをクリアします。

Redisのキューを読むには、キュー名を取得し、LPOPコマンドを使用して要素を読み、空のキューを処理する必要があります。特定の手順は次のとおりです。キュー名を取得します:「キュー:キュー」などの「キュー:」のプレフィックスで名前を付けます。 LPOPコマンドを使用します。キューのヘッドから要素を排出し、LPOP Queue:My-Queueなどの値を返します。空のキューの処理:キューが空の場合、LPOPはnilを返し、要素を読む前にキューが存在するかどうかを確認できます。

Centosシステムでは、Redis構成ファイルを変更するか、Redisコマンドを使用して悪意のあるスクリプトがあまりにも多くのリソースを消費しないようにすることにより、LUAスクリプトの実行時間を制限できます。方法1:Redis構成ファイルを変更し、Redis構成ファイルを見つけます:Redis構成ファイルは通常/etc/redis/redis.confにあります。構成ファイルの編集:テキストエディター(VIやNANOなど)を使用して構成ファイルを開きます:sudovi/etc/redis/redis.conf luaスクリプト実行時間制限を設定します。

Redisコマンドラインツール(Redis-Cli)を使用して、次の手順を使用してRedisを管理および操作します。サーバーに接続し、アドレスとポートを指定します。コマンド名とパラメーターを使用して、コマンドをサーバーに送信します。ヘルプコマンドを使用して、特定のコマンドのヘルプ情報を表示します。 QUITコマンドを使用して、コマンドラインツールを終了します。

Debian Systemsでは、Directoryコンテンツを読み取るためにReadDirシステム呼び出しが使用されます。パフォーマンスが良くない場合は、次の最適化戦略を試してください。ディレクトリファイルの数を簡素化します。大きなディレクトリをできる限り複数の小さなディレクトリに分割し、Readdirコールごとに処理されたアイテムの数を減らします。ディレクトリコンテンツのキャッシュを有効にする:キャッシュメカニズムを構築し、定期的にキャッシュを更新するか、ディレクトリコンテンツが変更されたときに、頻繁な呼び出しをreaddirに削減します。メモリキャッシュ(memcachedやredisなど)またはローカルキャッシュ(ファイルやデータベースなど)を考慮することができます。効率的なデータ構造を採用する:ディレクトリトラバーサルを自分で実装する場合、より効率的なデータ構造(線形検索の代わりにハッシュテーブルなど)を選択してディレクトリ情報を保存およびアクセスする

Redisデータの有効期間戦略には2つのタイプがあります。周期削除:期限切れのキーを削除する定期的なスキャン。これは、期限切れの時間帯-remove-countおよび期限切れの時間帯-remove-delayパラメーターを介して設定できます。怠zyな削除:キーが読み取られたり書かれたりした場合にのみ、削除の有効期限が切れたキーを確認してください。それらは、レイジーフリーレイジーエビクション、レイジーフリーレイジーエクスピア、レイジーフリーラジーユーザーのパラメーターを介して設定できます。

Debian SystemsのPostgreSQLデータベースのパフォーマンスを改善するには、ハードウェア、構成、インデックス、クエリ、その他の側面を包括的に検討する必要があります。次の戦略は、データベースのパフォーマンスを効果的に最適化できます。1。ハードウェアリソース最適化メモリ拡張:適切なメモリは、データとインデックスをキャッシュするために重要です。高速ストレージ:SSD SSDドライブを使用すると、I/Oパフォーマンスが大幅に向上する可能性があります。マルチコアプロセッサ:マルチコアプロセッサを最大限に活用して、並列クエリ処理を実装します。 2。データベースパラメーターチューニングShared_Buffers:システムメモリサイズの設定によると、システムメモリの25%〜40%に設定することをお勧めします。 work_mem:ソートとハッシュ操作のメモリを制御します。通常は64MBから256mに設定されています
