Comment implémenter l'analyseur de protocole Redis basé sur Golang
Protocole RESP
RESP est un protocole de communication entre le client et le serveur. Il existe cinq formats :
Réponse normale : une chaîne commençant par "+" et se terminant par "rn"
Réponse d'erreur : commençant par. " -" sous forme de chaîne et se termine par "rn"
Entier : commence par " : " et se termine par "rn" sous forme de chaîne
Chaîne multiligne : commence par "$" suivi des octets réels envoyés Numéro, commençant et se terminant par "rn"
$3rnabcrn
Array : commençant par "*", suivi du nombre de membres
SET valeur clé
*3rn$3rnSETrn$3 rn keyrn$5rnvaluern
Les commandes ou données envoyées par le client et le serveur doivent utiliser rn (CRLF) comme caractère de nouvelle ligne.
Lorsque nous saisissons une série de commandes comme *3rn$3rnSETrn$3rnkeyrn$5rnvaluern, le serveur reçoit la commande suivante :
*3rn
$3rn
SETrn
$3rn
keyrn
$5rn
valuern
interface/resp/conn.go
type Connection interface { Write([]byte) error GetDBIndex() int SelectDB(int) } interface/resp/reply.go type Reply interface { ToBytes() []byte }
Interface de connexion : une connexion pour le client Redis
Write : réponse au client
GetDBInde x : Redis a 16 bases de données
Interface de réponse : Interface de réponse
resp/reply/consts.go
type PongReply struct{} var pongBytes = []byte("+PONG\r\n") func (r *PongReply) ToBytes() []byte { return pongBytes } var thePongReply = new(PongReply) func MakePongReply() *PongReply { return thePongReply } type OkReply struct{} var okBytes = []byte("+OK\r\n") func (r *OkReply) ToBytes() []byte { return okBytes } var theOkReply = new(OkReply) func MakeOkReply() *OkReply { return theOkReply } var nullBulkBytes = []byte("$-1\r\n") type NullBulkReply struct{} func (r *NullBulkReply) ToBytes() []byte { return nullBulkBytes } func MakeNullBulkReply() *NullBulkReply { return &NullBulkReply{} } var emptyMultiBulkBytes = []byte("*0\r\n") type EmptyMultiBulkReply struct{} func (r *EmptyMultiBulkReply) ToBytes() []byte { return emptyMultiBulkBytes } type NoReply struct{} var noBytes = []byte("") func (r *NoReply) ToBytes() []byte { return noBytes }
Définissez cinq réponses : réponse pong, ok, null, tableau vide, vide
resp/reply/ réponse.go
type ErrorReply interface { Error() string ToBytes() []byte }
ErrorReply : Définir l'interface d'erreur
resp/reply/errors.go
type UnknownErrReply struct{} var unknownErrBytes = []byte("-Err unknown\r\n") func (r *UnknownErrReply) ToBytes() []byte { return unknownErrBytes } func (r *UnknownErrReply) Error() string { return "Err unknown" } type ArgNumErrReply struct { Cmd string } func (r *ArgNumErrReply) ToBytes() []byte { return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n") } func (r *ArgNumErrReply) Error() string { return "ERR wrong number of arguments for '" + r.Cmd + "' command" } func MakeArgNumErrReply(cmd string) *ArgNumErrReply { return &ArgNumErrReply{ Cmd: cmd, } } type SyntaxErrReply struct{} var syntaxErrBytes = []byte("-Err syntax error\r\n") var theSyntaxErrReply = &SyntaxErrReply{} func MakeSyntaxErrReply() *SyntaxErrReply { return theSyntaxErrReply } func (r *SyntaxErrReply) ToBytes() []byte { return syntaxErrBytes } func (r *SyntaxErrReply) Error() string { return "Err syntax error" } type WrongTypeErrReply struct{} var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n") func (r *WrongTypeErrReply) ToBytes() []byte { return wrongTypeErrBytes } func (r *WrongTypeErrReply) Error() string { return "WRONGTYPE Operation against a key holding the wrong kind of value" } type ProtocolErrReply struct { Msg string } func (r *ProtocolErrReply) ToBytes() []byte { return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n") } func (r *ProtocolErrReply) Error() string { return "ERR Protocol error: '" + r.Msg }
errors définit 5 types d'erreurs : erreur inconnue UnknownErrReply, erreur de numéro de paramètre ArgNumErrReply, erreur de syntaxe SyntaxErrReply, Erreur de type de données de réponse , Erreur de protocole ProtocolErrReply
resp/reply/reply.go
var ( nullBulkReplyBytes = []byte("$-1") // 协议的结尾 CRLF = "\r\n" ) type BulkReply struct { Arg []byte } func MakeBulkReply(arg []byte) *BulkReply { return &BulkReply{ Arg: arg, } } func (r *BulkReply) ToBytes() []byte { if len(r.Arg) == 0 { return nullBulkReplyBytes } return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF) } type MultiBulkReply struct { Args [][]byte } func (r *MultiBulkReply) ToBytes() []byte { argLen := len(r.Args) var buf bytes.Buffer buf.WriteString("*" + strconv.Itoa(argLen) + CRLF) for _, arg := range r.Args { if arg == nil { buf.WriteString("$-1" + CRLF) } else { buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF) } } return buf.Bytes() } func MakeMultiBulkReply(args [][]byte) *MultiBulkReply { return &MultiBulkReply{ Args: args, } } type StatusReply struct { Status string } func MakeStatusReply(status string) *StatusReply { return &StatusReply{ Status: status, } } func (r *StatusReply) ToBytes() []byte { return []byte("+" + r.Status + CRLF) } type IntReply struct { Code int64 } func MakeIntReply(code int64) *IntReply { return &IntReply{ Code: code, } } func (r *IntReply) ToBytes() []byte { return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF) } type StandardErrReply struct { Status string } func (r *StandardErrReply) ToBytes() []byte { return []byte("-" + r.Status + CRLF) } func (r *StandardErrReply) Error() string { return r.Status } func MakeErrReply(status string) *StandardErrReply { return &StandardErrReply{ Status: status, } } func IsErrorReply(reply resp.Reply) bool { return reply.ToBytes()[0] == '-' }
BulkReply : réponse à une chaîne
MultiBulkReply : réponse à un tableau de chaînes
-
StatusReply : réponse d'état
IntRéponse : Réponse numérique
StandardErrReply : Réponse d'erreur standard
IsErrorReply : Détermine s'il s'agit d'une réponse d'erreur
ToBytes : Convertit la chaîne dans le format spécifié par le protocole RESP
resp/ analyseur /parser.go
type Payload struct { Data resp.Reply Err error } type readState struct { readingMultiLine bool expectedArgsCount int msgType byte args [][]byte bulkLen int64 } func (s *readState) finished() bool { return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount } func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } func parse0(reader io.Reader, ch chan<- *Payload) { ...... }
Structure de charge utile : les données qui nous sont envoyées par le service client
Reply : les données envoyées par le client et le serveur entre eux sont appelées Reply
readState structure :
readingMultiLine : analyse d'une seule ligne ou de données multilignes
Méthode terminée : analyse du jugement Est-elle terminée ? Méthode ParseStream : analyse les données de manière asynchrone et les met dans le pipeline, et renvoie les données du pipeline
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil }
Copier après la connexionreadLine : lit ligne par ligne. Lire les lignes normales, séparées par n. Lors de la lecture d'une ligne contenant le caractère rn dans le texte, state.bulkLen plus le caractère de nouvelle ligne rn (state.bulkLen+2)func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } }
Copier après la connexionparseMultiBulkHeader : analysez l'en-tête du tableau, définissez le nombre de lignes attendu et les paramètres associés.
parseBulkHeader : analyse l'en-tête d'une chaîne multiligne. func parseSingleLineReply(msg []byte) (resp.Reply, error) { str := strings.TrimSuffix(string(msg), "\r\n") var result resp.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) } return result, nil } func readBody(msg []byte, state *readState) error { line := msg[0 : len(msg)-2] var err error if line[0] == '$' { // bulk reply state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen <= 0 { // null bulk in multi bulks state.args = append(state.args, []byte{}) state.bulkLen = 0 } } else { state.args = append(state.args, line) } return nil }
Copier après la connexionparseSingleLineReply : analyser les commandes sur une seule ligne
readBody : lire les commandes multilignes, définissez BulkLen lors de la lecture de la ligne suivante, utilisez ce +2. S'il ne commence pas par $, ajoutez-le. directement à argsfunc parse0(reader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { logger.Error(string(debug.Stack())) } }() bufReader := bufio.NewReader(reader) var state readState var err error var msg []byte for { var ioErr bool msg, ioErr, err = readLine(bufReader, &state) if err != nil { if ioErr { ch <- &Payload{ Err: err, } close(ch) return } ch <- &Payload{ Err: err, } state = readState{} continue } if !state.readingMultiLine { if msg[0] == '*' { // multi bulk reply err = parseMultiBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} continue } if state.expectedArgsCount == 0 { ch <- &Payload{ Data: &reply.EmptyMultiBulkReply{}, } state = readState{} continue } } else if msg[0] == '$' { // bulk reply err = parseBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } if state.bulkLen == -1 { // null bulk reply ch <- &Payload{ Data: &reply.NullBulkReply{}, } state = readState{} // reset state continue } } else { // single line reply result, err := parseSingleLineReply(msg) ch <- &Payload{ Data: result, Err: err, } state = readState{} // reset state continue } } else { // read bulk reply err = readBody(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } // if sending finished if state.finished() { var result resp.Reply if state.msgType == '*' { result = reply.MakeMultiBulkReply(state.args) } else if state.msgType == '$' { result = reply.MakeBulkReply(state.args[0]) } ch <- &Payload{ Data: result, Err: err, } state = readState{} } } } }
Copier après la connexionparse0 : analysez la commande et envoyez-la via le canal une fois l'analyse terminée
type Connection struct { conn net.Conn waitingReply wait.Wait mu sync.Mutex // 避免多个协程往客户端中写 selectedDB int } func NewConn(conn net.Conn) *Connection { return &Connection{ conn: conn, } } func (c *Connection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } func (c *Connection) Close() error { c.waitingReply.WaitWithTimeout(10 * time.Second) _ = c.conn.Close() return nil } func (c *Connection) Write(b []byte) error { if len(b) == 0 { return nil } c.mu.Lock() c.waitingReply.Add(1) defer func() { c.waitingReply.Done() c.mu.Unlock() }() _, err := c.conn.Write(b) return err } func (c *Connection) GetDBIndex() int { return c.selectedDB } func (c *Connection) SelectDB(dbNum int) { c.selectedDB = dbNum }
L'EchoHandler que nous avons écrit auparavant est utilisé pour recevoir l'entrée de l'utilisateur et remettez-le intact. Vous devez maintenant écrire un RespHandler pour remplacer le EchoHandler et laisser l'analyseur effectuer l'analyse. Une structure de connexion qui gère les connexions client doit exister dans RespHandler.
Connexion : connexion client, utilisée dans le gestionnaire de couche de protocole
resp/handler/handler.govar ( unknownErrReplyBytes = []byte("-ERR unknown\r\n") ) type RespHandler struct { activeConn sync.Map db databaseface.Database closing atomic.Boolean } func MakeHandler() *RespHandler { var db databaseface.Database db = database.NewEchoDatabase() return &RespHandler{ db: db, } } func (h *RespHandler) closeClient(client *connection.Connection) { _ = client.Close() h.db.AfterClientClose(client) h.activeConn.Delete(client) } func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // closing handler refuse new connection _ = conn.Close() } client := connection.NewConn(conn) h.activeConn.Store(client, 1) ch := parser.ParseStream(conn) for payload := range ch { if payload.Err != nil { if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") { // connection closed h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } // protocol err errReply := reply.MakeErrReply(payload.Err.Error()) err := client.Write(errReply.ToBytes()) if err != nil { h.closeClient(client) logger.Info("connection closed: " + client.RemoteAddr().String()) return } continue } if payload.Data == nil { logger.Error("empty payload") continue } r, ok := payload.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } result := h.db.Exec(client, r.Args) if result != nil { _ = client.Write(result.ToBytes()) } else { _ = client.Write(unknownErrReplyBytes) } } } func (h *RespHandler) Close() error { logger.Info("handler shutting down...") h.closing.Set(true) // TODO: concurrent wait h.activeConn.Range(func(key interface{}, val interface{}) bool { client := key.(*connection.Connection) _ = client.Close() return true }) h.db.Close() return nil }
RespHandler : similaire à l'écho précédent, avec l'ajout de la couche principale db.exec pour effectuer l'analyse de la commande
interface/database/database.go
type CmdLine = [][]byte type Database interface { Exec(client resp.Connection, args [][]byte) resp.Reply AfterClientClose(c resp.Connection) Close() } type DataEntity struct { Data interface{} }
Exec : exécution de la couche principale AfterClientClose : méthode de suivi après la fermeture
CmdLine : alias de commande pour un tableau d'octets bidimensionnel
DataEntity : représente les données Redis, y compris la chaîne, la liste, set, etc.
database/echo_database.go
type EchoDatabase struct { } func NewEchoDatabase() *EchoDatabase { return &EchoDatabase{} } func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply { return reply.MakeMultiBulkReply(args) } func (e EchoDatabase) AfterClientClose(c resp.Connection) { logger.Info("EchoDatabase AfterClientClose") } func (e EchoDatabase) Close() { logger.Info("EchoDatabase Close") }
echo_database : tester la couche de protocole
Exec : une fois la commande analysée, utilisez MakeMultiBulkReply pour l'envelopper et la renvoyer
main.goerr := tcp.ListenAndServeWithSignal( &tcp.Config{ Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), }, handler.MakeHandler()) if err != nil { logger.Error(err) }
Change principal de ce que vous venez d'écrire : handler.MakeHandler()
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Outils d'IA chauds

Undresser.AI Undress
Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover
Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool
Images de déshabillage gratuites

Clothoff.io
Dissolvant de vêtements AI

AI Hentai Generator
Générez AI Hentai gratuitement.

Article chaud

Outils chauds

Bloc-notes++7.3.1
Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise
Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1
Puissant environnement de développement intégré PHP

Dreamweaver CS6
Outils de développement Web visuel

SublimeText3 version Mac
Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Le mode Redis Cluster déploie les instances Redis sur plusieurs serveurs grâce à la rupture, à l'amélioration de l'évolutivité et de la disponibilité. Les étapes de construction sont les suivantes: Créez des instances de redis étranges avec différents ports; Créer 3 instances Sentinel, Moniteur Redis Instances et basculement; Configurer les fichiers de configuration Sentinel, ajouter des informations d'instance Redis de surveillance et des paramètres de basculement; Configurer les fichiers de configuration d'instance Redis, activer le mode de cluster et spécifier le chemin du fichier d'informations de cluster; Créer un fichier nœuds.conf, contenant des informations de chaque instance redis; Démarrez le cluster, exécutez la commande CREATE pour créer un cluster et spécifiez le nombre de répliques; Connectez-vous au cluster pour exécuter la commande d'informations de cluster pour vérifier l'état du cluster; faire

Comment effacer les données Redis: utilisez la commande flushall pour effacer toutes les valeurs de clé. Utilisez la commande flushdb pour effacer la valeur clé de la base de données actuellement sélectionnée. Utilisez SELECT pour commuter les bases de données, puis utilisez FlushDB pour effacer plusieurs bases de données. Utilisez la commande del pour supprimer une clé spécifique. Utilisez l'outil Redis-CLI pour effacer les données.

L'utilisation de la directive Redis nécessite les étapes suivantes: Ouvrez le client Redis. Entrez la commande (Verbe Key Value). Fournit les paramètres requis (varie de l'instruction à l'instruction). Appuyez sur Entrée pour exécuter la commande. Redis renvoie une réponse indiquant le résultat de l'opération (généralement OK ou -err).

Pour lire une file d'attente à partir de Redis, vous devez obtenir le nom de la file d'attente, lire les éléments à l'aide de la commande LPOP et traiter la file d'attente vide. Les étapes spécifiques sont les suivantes: Obtenez le nom de la file d'attente: Nommez-le avec le préfixe de "Fitre:" tel que "Fitre: My-Quyue". Utilisez la commande LPOP: éjectez l'élément de la tête de la file d'attente et renvoyez sa valeur, telle que la file d'attente LPOP: My-Queue. Traitement des files d'attente vides: si la file d'attente est vide, LPOP renvoie NIL et vous pouvez vérifier si la file d'attente existe avant de lire l'élément.

L'utilisation des opérations Redis pour verrouiller nécessite l'obtention du verrouillage via la commande setnx, puis en utilisant la commande Expire pour définir le temps d'expiration. Les étapes spécifiques sont les suivantes: (1) Utilisez la commande setnx pour essayer de définir une paire de valeurs de clé; (2) Utilisez la commande Expire pour définir le temps d'expiration du verrou; (3) Utilisez la commande del pour supprimer le verrouillage lorsque le verrouillage n'est plus nécessaire.

Utilisez l'outil de ligne de commande redis (Redis-CLI) pour gérer et utiliser Redis via les étapes suivantes: Connectez-vous au serveur, spécifiez l'adresse et le port. Envoyez des commandes au serveur à l'aide du nom et des paramètres de commande. Utilisez la commande d'aide pour afficher les informations d'aide pour une commande spécifique. Utilisez la commande QUIT pour quitter l'outil de ligne de commande.

Il existe deux types de stratégies d'expiration de données redis: la suppression périodique: analyse périodique pour supprimer la clé expirée, qui peut être définie via des paramètres d'expiration-temps-transport et des paramètres d'expiration-temps-transparence. Suppression paresseuse: vérifiez les clés expirées de suppression uniquement lorsque les clés sont lues ou écrites. Ils peuvent être définis à travers des paramètres Lazyfree-Lazy-Deviction, Lazyfree-Lazy-Expire, Lazyfree-Lazy-User-Del.

C est plus adapté aux scénarios où le contrôle direct des ressources matérielles et une optimisation élevée de performances sont nécessaires, tandis que Golang est plus adapté aux scénarios où un développement rapide et un traitement de concurrence élevé sont nécessaires. 1.C's Avantage est dans ses caractéristiques matérielles proches et à des capacités d'optimisation élevées, qui conviennent aux besoins de haute performance tels que le développement de jeux. 2. L'avantage de Golang réside dans sa syntaxe concise et son soutien à la concurrence naturelle, qui convient au développement élevé de services de concurrence.
