So implementieren Sie den Redis-Protokollparser basierend auf Golang
RESP-Protokoll
RESP ist ein Protokoll für die Kommunikation zwischen dem Client und dem Server. Es gibt fünf Formate:
Normale Antwort : mit String-Format, das mit „+“ beginnt und mit „rn“ endet
Fehlerantwort: String-Format, das mit „-“ beginnt und mit „rn“ endet
Ganzzahl: A Zeichenfolgenformat, beginnend mit „:“ und endend mit „rn“
Mehrzeilige Zeichenfolge: beginnend mit „$“, gefolgt von der tatsächlichen Anzahl der gesendeten Bytes und dann beginnend und endend mit „rn“ # 🎜🎜#
$3rnArray: beginnt mit „*“, gefolgt von der Anzahl der Mitglieder #🎜🎜 #SET-Schlüsselwertabcrn
*3rn$3rnWenn wir *3rn$3rnSET
rn$3rnkeyrn$5rn#🎜🎜 #value# 🎜🎜#rnDie vom Client und Server gesendeten Befehle oder Daten müssen rn (CRLF) als Zeilenumbruchzeichen verwenden.
SET
rn$3rnkeyrn$5rnvalue#🎜 eingeben 🎜#rn Bei einer solchen Befehlsfolge erhält der Server den folgenden Befehl: *3rn$3rnSETrn$3rnkeyrn
$5rn
valuern
interface/resp/conn.gotype Connection interface { Write([]byte) error GetDBIndex() int SelectDB(int) } interface/resp/reply.go type Reply interface { ToBytes() []byte }Nach dem Login kopieren
#🎜🎜 #Verbindung Schnittstelle: eine Verbindung des Redis-Clients hat 16 DB
- Antwortschnittstelle: Antwortschnittstelle
- resp/reply/ consts. go
type PongReply struct{} var pongBytes = []byte("+PONG\r\n") func (r *PongReply) ToBytes() []byte { return pongBytes } var thePongReply = new(PongReply) func MakePongReply() *PongReply { return thePongReply } type OkReply struct{} var okBytes = []byte("+OK\r\n") func (r *OkReply) ToBytes() []byte { return okBytes } var theOkReply = new(OkReply) func MakeOkReply() *OkReply { return theOkReply } var nullBulkBytes = []byte("$-1\r\n") type NullBulkReply struct{} func (r *NullBulkReply) ToBytes() []byte { return nullBulkBytes } func MakeNullBulkReply() *NullBulkReply { return &NullBulkReply{} } var emptyMultiBulkBytes = []byte("*0\r\n") type EmptyMultiBulkReply struct{} func (r *EmptyMultiBulkReply) ToBytes() []byte { return emptyMultiBulkBytes } type NoReply struct{} var noBytes = []byte("") func (r *NoReply) ToBytes() []byte { return noBytes }
Nach dem Login kopieren Definieren Sie fünf Antworten: Antwort pong, ok, null, leeres Array, leer - resp/reply/reply 🎜#
- ErrorReply: Fehlerschnittstelle definieren resp/reply/errors.go
type ErrorReply interface { Error() string ToBytes() []byte }
type UnknownErrReply struct{} var unknownErrBytes = []byte("-Err unknown\r\n") func (r *UnknownErrReply) ToBytes() []byte { return unknownErrBytes } func (r *UnknownErrReply) Error() string { return "Err unknown" } type ArgNumErrReply struct { Cmd string } func (r *ArgNumErrReply) ToBytes() []byte { return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n") } func (r *ArgNumErrReply) Error() string { return "ERR wrong number of arguments for '" + r.Cmd + "' command" } func MakeArgNumErrReply(cmd string) *ArgNumErrReply { return &ArgNumErrReply{ Cmd: cmd, } } type SyntaxErrReply struct{} var syntaxErrBytes = []byte("-Err syntax error\r\n") var theSyntaxErrReply = &SyntaxErrReply{} func MakeSyntaxErrReply() *SyntaxErrReply { return theSyntaxErrReply } func (r *SyntaxErrReply) ToBytes() []byte { return syntaxErrBytes } func (r *SyntaxErrReply) Error() string { return "Err syntax error" } type WrongTypeErrReply struct{} var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n") func (r *WrongTypeErrReply) ToBytes() []byte { return wrongTypeErrBytes } func (r *WrongTypeErrReply) Error() string { return "WRONGTYPE Operation against a key holding the wrong kind of value" } type ProtocolErrReply struct { Msg string } func (r *ProtocolErrReply) ToBytes() []byte { return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n") } func (r *ProtocolErrReply) Error() string { return "ERR Protocol error: '" + r.Msg }
var ( nullBulkReplyBytes = []byte("$-1") // 协议的结尾 CRLF = "\r\n" ) type BulkReply struct { Arg []byte } func MakeBulkReply(arg []byte) *BulkReply { return &BulkReply{ Arg: arg, } } func (r *BulkReply) ToBytes() []byte { if len(r.Arg) == 0 { return nullBulkReplyBytes } return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF) } type MultiBulkReply struct { Args [][]byte } func (r *MultiBulkReply) ToBytes() []byte { argLen := len(r.Args) var buf bytes.Buffer buf.WriteString("*" + strconv.Itoa(argLen) + CRLF) for _, arg := range r.Args { if arg == nil { buf.WriteString("$-1" + CRLF) } else { buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF) } } return buf.Bytes() } func MakeMultiBulkReply(args [][]byte) *MultiBulkReply { return &MultiBulkReply{ Args: args, } } type StatusReply struct { Status string } func MakeStatusReply(status string) *StatusReply { return &StatusReply{ Status: status, } } func (r *StatusReply) ToBytes() []byte { return []byte("+" + r.Status + CRLF) } type IntReply struct { Code int64 } func MakeIntReply(code int64) *IntReply { return &IntReply{ Code: code, } } func (r *IntReply) ToBytes() []byte { return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF) } type StandardErrReply struct { Status string } func (r *StandardErrReply) ToBytes() []byte { return []byte("-" + r.Status + CRLF) } func (r *StandardErrReply) Error() string { return r.Status } func MakeErrReply(status string) *StandardErrReply { return &StandardErrReply{ Status: status, } } func IsErrorReply(reply resp.Reply) bool { return reply.ToBytes()[0] == '-' }
BulkReply: Auf eine Zeichenfolge antworten
MultiBulkReply: Auf eine Zeichenfolge antworten #🎜🎜 #
StatusReply: Statusantwort
IntReply: Numerische Antwort# 🎜 🎜#StandardErrReply: Standard-Fehlerantwort
IsErrorReply: Bestimmen Sie, ob es sich um eine Fehlerantwort handelt
ToBytes: Konvertieren Sie die Zeichenfolge in das vom RESP-Protokoll angegebene Format #Payload-Struktur: Die vom Kundendienst an uns gesendeten Daten
Antwort: Die vom Client und vom Server aneinander gesendeten Daten werden als Antwort bezeichnetreadState-Struktur: # 🎜🎜## 🎜🎜#
readingMultiLine: Einzeilige oder mehrzeilige Daten analysieren- expectedArgsCount: die Anzahl der Parameter, die sein sollten read
- #🎜 🎜#msgType: Nachrichtentyp
- args: Nachrichteninhalt
- # 🎜🎜#
bulkLen: Datenlänge
fertige Methode: Bestimmen Sie, ob die Analyse abgeschlossen istParseStream-Methode: Analysieren Sie die Daten asynchron und in die Pipeline einfügen und die Pipeline-Daten zurückgeben# 🎜🎜#
type Payload struct { Data resp.Reply Err error } type readState struct { readingMultiLine bool expectedArgsCount int msgType byte args [][]byte bulkLen int64 } func (s *readState) finished() bool { return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount } func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } func parse0(reader io.Reader, ch chan<- *Payload) { ...... }
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil }
- parse0: Parse-Befehl, nach Abschluss der Analyse wird er über den Kanal gesendet resp/connection/conn.go#🎜🎜 #
func parseSingleLineReply(msg []byte) (resp.Reply, error) { str := strings.TrimSuffix(string(msg), "\r\n") var result resp.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) } return result, nil } func readBody(msg []byte, state *readState) error { line := msg[0 : len(msg)-2] var err error if line[0] == '$' { // bulk reply state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen <= 0 { // null bulk in multi bulks state.args = append(state.args, []byte{}) state.bulkLen = 0 } } else { state.args = append(state.args, line) } return nil }
Nach dem Login kopierenDer zuvor geschriebene EchoHandler wird verwendet, um die Eingaben des Benutzers zu empfangen und intakt zurückzugeben. Jetzt müssen Sie einen RespHandler schreiben, der den EchoHandler ersetzt, und den Parser das Parsen durchführen lassen. Im RespHandler muss eine Verbindungsstruktur vorhanden sein, die Clientverbindungen verwaltet.
Verbindung: Client-Verbindung, die im Handler der Protokollschicht verwendet wird - resp/handler/handler.go #🎜🎜 #
- RespHandler: Ähnlich dem vorherigen Echo, mit der Hinzufügung der Kernschicht db.exec zum Ausführen von Parsing-Anweisungen interface/database/database.go
- #🎜 🎜#
type Connection struct { conn net.Conn waitingReply wait.Wait mu sync.Mutex // 避免多个协程往客户端中写 selectedDB int } func NewConn(conn net.Conn) *Connection { return &Connection{ conn: conn, } } func (c *Connection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } func (c *Connection) Close() error { c.waitingReply.WaitWithTimeout(10 * time.Second) _ = c.conn.Close() return nil } func (c *Connection) Write(b []byte) error { if len(b) == 0 { return nil } c.mu.Lock() c.waitingReply.Add(1) defer func() { c.waitingReply.Done() c.mu.Unlock() }() _, err := c.conn.Write(b) return err } func (c *Connection) GetDBIndex() int { return c.selectedDB } func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum }
Nach dem Login kopierenExec: Kernschichtausführung
AfterClientClose: Aftermath-Methode nach dem Schließen
func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } }
func parse0(reader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { logger.Error(string(debug.Stack())) } }() bufReader := bufio.NewReader(reader) var state readState var err error var msg []byte for { var ioErr bool msg, ioErr, err = readLine(bufReader, &state) if err != nil { if ioErr { ch <- &Payload{ Err: err, } close(ch) return } ch <- &Payload{ Err: err, } state = readState{} continue } if !state.readingMultiLine { if msg[0] == '*' { // multi bulk reply err = parseMultiBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} continue } if state.expectedArgsCount == 0 { ch <- &Payload{ Data: &reply.EmptyMultiBulkReply{}, } state = readState{} continue } } else if msg[0] == '$' { // bulk reply err = parseBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } if state.bulkLen == -1 { // null bulk reply ch <- &Payload{ Data: &reply.NullBulkReply{}, } state = readState{} // reset state continue } } else { // single line reply result, err := parseSingleLineReply(msg) ch <- &Payload{ Data: result, Err: err, } state = readState{} // reset state continue } } else { // read bulk reply err = readBody(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } // if sending finished if state.finished() { var result resp.Reply if state.msgType == '*' { result = reply.MakeMultiBulkReply(state.args) } else if state.msgType == '$' { result = reply.MakeBulkReply(state.args[0]) } ch <- &Payload{ Data: result, Err: err, } state = readState{} } } } }
CmdLine: Befehlsalias für zweidimensionales Byte-Array
DataEntity: Stellt Redis-Daten dar, einschließlich Zeichenfolge, Liste, Satz usw.
database/echo_database.govar ( unknownErrReplyBytes = []byte("-ERR unknown\r\n") ) type RespHandler struct { activeConn sync.Map db databaseface.Database closing atomic.Boolean } func MakeHandler() *RespHandler { var db databaseface.Database db = database.NewEchoDatabase() return &RespHandler{ db: db, } } func (h *RespHandler) closeClient(client *connection.Connection) { _ = client.Close() h.db.AfterClientClose(client) h.activeConn.Delete(client) } func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection _ = conn.Close() } client := connection.NewConn(conn) h.activeConn.Store(client, 1) ch := parser.ParseStream(conn) for payload := range ch { if payload.Err != nil { if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") { // connection closed h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } // protocol err errReply := reply.MakeErrReply(payload.Err.Error()) err := client.Write(errReply.ToBytes()) if err != nil { h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } continue } if payload.Data == nil { logger.Error("empty payload") continue } r, ok := payload.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } result := h.db.Exec(client, r.Args) if result != nil { _ = client.Write(result.ToBytes()) } else { _ = client.Write(unknownErrReplyBytes) } } } func (h *RespHandler) Close() error { logger.Info("handler shutting down...") h.closing.Set(true) // TODO: concurrent wait h.activeConn.Range(func(key interface{}, val interface{}) bool { client := key.(*connection.Connection) _ = client.Close() return true }) h.db.Close() return nil }
echo_database : Protokollschicht testen
Exec: Nachdem die Anweisung analysiert wurde, verwenden Sie MakeMultiBulkReply, um sie zu verpacken und zurückzugeben
main.go#🎜🎜 #
type CmdLine = [][]byte type Database interface { Exec(client resp.Connection, args [][]byte) resp.Reply AfterClientClose(c resp.Connection) Close() } type DataEntity struct { Data interface{} }
Das obige ist der detaillierte Inhalt vonSo implementieren Sie den Redis-Protokollparser basierend auf Golang. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Heiße KI -Werkzeuge

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool
Ausziehbilder kostenlos

Clothoff.io
KI-Kleiderentferner

AI Hentai Generator
Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

Heiße Werkzeuge

Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6
Visuelle Webentwicklungstools

SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Heiße Themen



Der Redis -Cluster -Modus bietet Redis -Instanzen durch Sharding, die Skalierbarkeit und Verfügbarkeit verbessert. Die Bauschritte sind wie folgt: Erstellen Sie ungerade Redis -Instanzen mit verschiedenen Ports; Erstellen Sie 3 Sentinel -Instanzen, Monitor -Redis -Instanzen und Failover; Konfigurieren von Sentinel -Konfigurationsdateien, Informationen zur Überwachung von Redis -Instanzinformationen und Failover -Einstellungen hinzufügen. Konfigurieren von Redis -Instanzkonfigurationsdateien, aktivieren Sie den Cluster -Modus und geben Sie den Cluster -Informationsdateipfad an. Erstellen Sie die Datei nodes.conf, die Informationen zu jeder Redis -Instanz enthält. Starten Sie den Cluster, führen Sie den Befehl erstellen aus, um einen Cluster zu erstellen und die Anzahl der Replikate anzugeben. Melden Sie sich im Cluster an, um den Befehl cluster info auszuführen, um den Clusterstatus zu überprüfen. machen

So löschen Sie Redis -Daten: Verwenden Sie den Befehl Flushall, um alle Schlüsselwerte zu löschen. Verwenden Sie den Befehl flushdb, um den Schlüsselwert der aktuell ausgewählten Datenbank zu löschen. Verwenden Sie SELECT, um Datenbanken zu wechseln, und löschen Sie dann FlushDB, um mehrere Datenbanken zu löschen. Verwenden Sie den Befehl del, um einen bestimmten Schlüssel zu löschen. Verwenden Sie das Redis-Cli-Tool, um die Daten zu löschen.

Die Verwendung der REDIS -Anweisung erfordert die folgenden Schritte: Öffnen Sie den Redis -Client. Geben Sie den Befehl ein (Verbschlüsselwert). Bietet die erforderlichen Parameter (variiert von der Anweisung bis zur Anweisung). Drücken Sie die Eingabetaste, um den Befehl auszuführen. Redis gibt eine Antwort zurück, die das Ergebnis der Operation anzeigt (normalerweise in Ordnung oder -err).

Um eine Warteschlange aus Redis zu lesen, müssen Sie den Warteschlangenname erhalten, die Elemente mit dem Befehl LPOP lesen und die leere Warteschlange verarbeiten. Die spezifischen Schritte sind wie folgt: Holen Sie sich den Warteschlangenname: Nennen Sie ihn mit dem Präfix von "Warteschlange:" wie "Warteschlangen: My-Queue". Verwenden Sie den Befehl LPOP: Wischen Sie das Element aus dem Kopf der Warteschlange aus und geben Sie seinen Wert zurück, z. B. die LPOP-Warteschlange: my-queue. Verarbeitung leerer Warteschlangen: Wenn die Warteschlange leer ist, gibt LPOP NIL zurück, und Sie können überprüfen, ob die Warteschlange existiert, bevor Sie das Element lesen.

Um die Operationen zu sperren, muss die Sperre durch den Befehl setNX erfasst werden und dann den Befehl Ablauf verwenden, um die Ablaufzeit festzulegen. Die spezifischen Schritte sind: (1) Verwenden Sie den Befehl setNX, um zu versuchen, ein Schlüsselwertpaar festzulegen; (2) Verwenden Sie den Befehl Ablauf, um die Ablaufzeit für die Sperre festzulegen. (3) Verwenden Sie den Befehl Del, um die Sperre zu löschen, wenn die Sperre nicht mehr benötigt wird.

Verwenden Sie das Redis-Befehlszeilen-Tool (REDIS-CLI), um Redis in folgenden Schritten zu verwalten und zu betreiben: Stellen Sie die Adresse und den Port an, um die Adresse und den Port zu stellen. Senden Sie Befehle mit dem Befehlsnamen und den Parametern an den Server. Verwenden Sie den Befehl Hilfe, um Hilfeinformationen für einen bestimmten Befehl anzuzeigen. Verwenden Sie den Befehl zum Beenden, um das Befehlszeilenwerkzeug zu beenden.

Es gibt zwei Arten von RETIS-Datenverlaufstrategien: regelmäßige Löschung: periodischer Scan zum Löschen des abgelaufenen Schlüssels, der über abgelaufene Cap-Remove-Count- und Ablauf-Cap-Remove-Delay-Parameter festgelegt werden kann. LAZY LELETION: Überprüfen Sie nur, ob abgelaufene Schlüsseln gelöscht werden, wenn Tasten gelesen oder geschrieben werden. Sie können durch LazyFree-Lazy-Eviction, LazyFree-Lazy-Expire, LazyFree-Lazy-User-Del-Parameter eingestellt werden.

C eignet sich besser für Szenarien, in denen eine direkte Kontrolle der Hardware -Ressourcen und hohe Leistungsoptimierung erforderlich ist, während Golang besser für Szenarien geeignet ist, in denen eine schnelle Entwicklung und eine hohe Parallelitätsverarbeitung erforderlich sind. 1.Cs Vorteil liegt in den nahezu Hardware-Eigenschaften und hohen Optimierungsfunktionen, die für leistungsstarke Bedürfnisse wie die Spieleentwicklung geeignet sind. 2. Golangs Vorteil liegt in seiner präzisen Syntax und der natürlichen Unterstützung, die für die Entwicklung einer hohen Parallelitätsdienste geeignet ist.
