Maison > base de données > Redis > Comment implémenter l'analyseur de protocole Redis basé sur Golang

Comment implémenter l'analyseur de protocole Redis basé sur Golang

WBOY
Libérer: 2023-05-28 19:13:21
avant
776 Les gens l'ont consulté

Protocole RESP

RESP est un protocole de communication entre le client et le serveur. Il existe cinq formats :

Réponse normale : une chaîne commençant par "+" et se terminant par "rn"

Réponse d'erreur : commençant par. " -" sous forme de chaîne et se termine par "rn"

Entier : commence par " : " et se termine par "rn" sous forme de chaîne

Chaîne multiligne : commence par "$" suivi des octets réels envoyés Numéro, commençant et se terminant par "rn"

$3rnabcrn

Array : commençant par "*", suivi du nombre de membres

SET valeur clé
*3rn$3rnSETrn$3 rn keyrn$5rnvaluern

Les commandes ou données envoyées par le client et le serveur doivent utiliser rn (CRLF) comme caractère de nouvelle ligne.

Lorsque nous saisissons une série de commandes comme *3rn$3rnSETrn$3rnkeyrn$5rnvaluern, le serveur reçoit la commande suivante :
*3rn
$3rn
SETrn
$3rn
keyrn
$5rn
valuern

interface/resp/conn.go

type Connection interface {
   Write([]byte) error
   GetDBIndex() int
   SelectDB(int)
}

interface/resp/reply.go
type Reply interface {
	ToBytes() []byte
}
Copier après la connexion
  • Interface de connexion : une connexion pour le client Redis

  • Write : réponse au client

  • GetDBInde x : Redis a 16 bases de données

  • Interface de réponse : Interface de réponse

resp/reply/consts.go

type PongReply struct{}

var pongBytes = []byte("+PONG\r\n")

func (r *PongReply) ToBytes() []byte {
    return pongBytes
}

var thePongReply = new(PongReply)

func MakePongReply() *PongReply {
    return thePongReply
}

type OkReply struct{}

var okBytes = []byte("+OK\r\n")

func (r *OkReply) ToBytes() []byte {
    return okBytes
}

var theOkReply = new(OkReply)

func MakeOkReply() *OkReply {
    return theOkReply
}

var nullBulkBytes = []byte("$-1\r\n")

type NullBulkReply struct{}

func (r *NullBulkReply) ToBytes() []byte {
    return nullBulkBytes
}

func MakeNullBulkReply() *NullBulkReply {
    return &NullBulkReply{}
}

var emptyMultiBulkBytes = []byte("*0\r\n")

type EmptyMultiBulkReply struct{}

func (r *EmptyMultiBulkReply) ToBytes() []byte {
    return emptyMultiBulkBytes
}

type NoReply struct{}

var noBytes = []byte("")

func (r *NoReply) ToBytes() []byte {
    return noBytes
}
Copier après la connexion

Définissez cinq réponses : réponse pong, ok, null, tableau vide, vide

resp/reply/ réponse.go

type ErrorReply interface {
   Error() string
   ToBytes() []byte
}
Copier après la connexion

ErrorReply : Définir l'interface d'erreur

resp/reply/errors.go

type UnknownErrReply struct{}

var unknownErrBytes = []byte("-Err unknown\r\n")

func (r *UnknownErrReply) ToBytes() []byte {
   return unknownErrBytes
}

func (r *UnknownErrReply) Error() string {
   return "Err unknown"
}

type ArgNumErrReply struct {
   Cmd string
}

func (r *ArgNumErrReply) ToBytes() []byte {
   return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}

func (r *ArgNumErrReply) Error() string {
   return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}

func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
   return &ArgNumErrReply{
      Cmd: cmd,
   }
}

type SyntaxErrReply struct{}

var syntaxErrBytes = []byte("-Err syntax error\r\n")
var theSyntaxErrReply = &SyntaxErrReply{}

func MakeSyntaxErrReply() *SyntaxErrReply {
   return theSyntaxErrReply
}

func (r *SyntaxErrReply) ToBytes() []byte {
   return syntaxErrBytes
}

func (r *SyntaxErrReply) Error() string {
   return "Err syntax error"
}

type WrongTypeErrReply struct{}

var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

func (r *WrongTypeErrReply) ToBytes() []byte {
   return wrongTypeErrBytes
}

func (r *WrongTypeErrReply) Error() string {
   return "WRONGTYPE Operation against a key holding the wrong kind of value"
}

type ProtocolErrReply struct {
   Msg string
}

func (r *ProtocolErrReply) ToBytes() []byte {
   return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")
}

func (r *ProtocolErrReply) Error() string {
   return "ERR Protocol error: '" + r.Msg
}
Copier après la connexion

errors définit 5 types d'erreurs : erreur inconnue UnknownErrReply, erreur de numéro de paramètre ArgNumErrReply, erreur de syntaxe SyntaxErrReply, Erreur de type de données de réponse , Erreur de protocole ProtocolErrReply

resp/reply/reply.go

var (
   nullBulkReplyBytes = []byte("$-1")
   // 协议的结尾
   CRLF = "\r\n"
)

type BulkReply struct {
   Arg []byte
}

func MakeBulkReply(arg []byte) *BulkReply {
   return &BulkReply{
      Arg: arg,
   }
}

func (r *BulkReply) ToBytes() []byte {
   if len(r.Arg) == 0 {
      return nullBulkReplyBytes
   }
   return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}

type MultiBulkReply struct {
   Args [][]byte
}

func (r *MultiBulkReply) ToBytes() []byte {
   argLen := len(r.Args)
   var buf bytes.Buffer
   buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
   for _, arg := range r.Args {
      if arg == nil {
         buf.WriteString("$-1" + CRLF)
      } else {
         buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
      }
   }
   return buf.Bytes()
}

func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
   return &MultiBulkReply{
      Args: args,
   }
}

type StatusReply struct {
   Status string
}

func MakeStatusReply(status string) *StatusReply {
   return &StatusReply{
      Status: status,
   }
}

func (r *StatusReply) ToBytes() []byte {
   return []byte("+" + r.Status + CRLF)
}

type IntReply struct {
   Code int64
}

func MakeIntReply(code int64) *IntReply {
   return &IntReply{
      Code: code,
   }
}

func (r *IntReply) ToBytes() []byte {
   return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}

type StandardErrReply struct {
   Status string
}

func (r *StandardErrReply) ToBytes() []byte {
   return []byte("-" + r.Status + CRLF)
}

func (r *StandardErrReply) Error() string {
   return r.Status
}

func MakeErrReply(status string) *StandardErrReply {
   return &StandardErrReply{
      Status: status,
   }
}

func IsErrorReply(reply resp.Reply) bool {
   return reply.ToBytes()[0] == '-'
}
Copier après la connexion
  • BulkReply : réponse à une chaîne

  • MultiBulkReply : réponse à un tableau de chaînes

  • StatusReply : réponse d'état

  • IntRéponse : Réponse numérique

  • StandardErrReply : Réponse d'erreur standard

  • IsErrorReply : Détermine s'il s'agit d'une réponse d'erreur

  • ToBytes : Convertit la chaîne dans le format spécifié par le protocole RESP

resp/ analyseur /parser.go

type Payload struct {
   Data resp.Reply
   Err  error
}

type readState struct {
   readingMultiLine  bool     
   expectedArgsCount int     
   msgType           byte    
   args              [][]byte 
   bulkLen           int64    
}

func (s *readState) finished() bool {
   return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}

func ParseStream(reader io.Reader) <-chan *Payload {
   ch := make(chan *Payload)
   go parse0(reader, ch)
   return ch
}

func parse0(reader io.Reader, ch chan<- *Payload) {
	 ......
}
Copier après la connexion

Structure de charge utile : les données qui nous sont envoyées par le service client

Reply : les données envoyées par le client et le serveur entre eux sont appelées Reply

readState structure :

  • readingMultiLine : analyse d'une seule ligne ou de données multilignes

  • Méthode terminée : analyse du jugement Est-elle terminée ?
  • Méthode ParseStream : analyse les données de manière asynchrone et les met dans le pipeline, et renvoie les données du pipeline

    func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
       var msg []byte
       var err error
       if state.bulkLen == 0 {
          msg, err = bufReader.ReadBytes(&#39;\n&#39;)
          if err != nil {
             return nil, true, err
          }
          if len(msg) == 0 || msg[len(msg)-2] != &#39;\r&#39; {
             return nil, false, errors.New("protocol error: " + string(msg))
          }
       } else {
          msg = make([]byte, state.bulkLen+2)
          _, err = io.ReadFull(bufReader, msg)
          if err != nil {
             return nil, true, err
          }
          if len(msg) == 0 || msg[len(msg)-2] != &#39;\r&#39; || msg[len(msg)-1] != &#39;\n&#39; {
             return nil, false, errors.New("protocol error: " + string(msg))
          }
          state.bulkLen = 0
       }
       return msg, false, nil
    }
    Copier après la connexion
  • readLine : lit ligne par ligne. Lire les lignes normales, séparées par n. Lors de la lecture d'une ligne contenant le caractère rn dans le texte, state.bulkLen plus le caractère de nouvelle ligne rn (state.bulkLen+2)
  • func parseMultiBulkHeader(msg []byte, state *readState) error {
       var err error
       var expectedLine uint64
       expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
       if err != nil {
          return errors.New("protocol error: " + string(msg))
       }
       if expectedLine == 0 {
          state.expectedArgsCount = 0
          return nil
       } else if expectedLine > 0 {
          state.msgType = msg[0]
          state.readingMultiLine = true
          state.expectedArgsCount = int(expectedLine)
          state.args = make([][]byte, 0, expectedLine)
          return nil
       } else {
          return errors.New("protocol error: " + string(msg))
       }
    }
    
    func parseBulkHeader(msg []byte, state *readState) error {
       var err error
       state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
       if err != nil {
          return errors.New("protocol error: " + string(msg))
       }
       if state.bulkLen == -1 { // null bulk
          return nil
       } else if state.bulkLen > 0 {
          state.msgType = msg[0]
          state.readingMultiLine = true
          state.expectedArgsCount = 1
          state.args = make([][]byte, 0, 1)
          return nil
       } else {
          return errors.New("protocol error: " + string(msg))
       }
    }
    Copier après la connexion

    parseMultiBulkHeader : analysez l'en-tête du tableau, définissez le nombre de lignes attendu et les paramètres associés.

  • parseBulkHeader : analyse l'en-tête d'une chaîne multiligne.
  • func parseSingleLineReply(msg []byte) (resp.Reply, error) {
       str := strings.TrimSuffix(string(msg), "\r\n")
       var result resp.Reply
       switch msg[0] {
       case &#39;+&#39;: // status reply
          result = reply.MakeStatusReply(str[1:])
       case &#39;-&#39;: // err reply
          result = reply.MakeErrReply(str[1:])
       case &#39;:&#39;: // int reply
          val, err := strconv.ParseInt(str[1:], 10, 64)
          if err != nil {
             return nil, errors.New("protocol error: " + string(msg))
          }
          result = reply.MakeIntReply(val)
       }
       return result, nil
    }
    
    func readBody(msg []byte, state *readState) error {
       line := msg[0 : len(msg)-2]
       var err error
       if line[0] == &#39;$&#39; {
          // bulk reply
          state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
          if err != nil {
             return errors.New("protocol error: " + string(msg))
          }
          if state.bulkLen <= 0 { // null bulk in multi bulks
             state.args = append(state.args, []byte{})
             state.bulkLen = 0
          }
       } else {
          state.args = append(state.args, line)
       }
       return nil
    }
    Copier après la connexion

    parseSingleLineReply : analyser les commandes sur une seule ligne

  • readBody : lire les commandes multilignes, définissez BulkLen lors de la lecture de la ligne suivante, utilisez ce +2. S'il ne commence pas par $, ajoutez-le. directement à args
  • func parse0(reader io.Reader, ch chan<- *Payload) {
        defer func() {
           if err := recover(); err != nil {
              logger.Error(string(debug.Stack()))
          }
       }()
        bufReader := bufio.NewReader(reader)
        var state readState
        var err error
        var msg []byte
        for {
           var ioErr bool
           msg, ioErr, err = readLine(bufReader, &state)
           if err != nil {
              if ioErr {
                 ch <- &Payload{
                    Err: err,
                }
                 close(ch)
                 return
             }
              ch <- &Payload{
                 Err: err,
             }
              state = readState{}
              continue
          }
    
           if !state.readingMultiLine {
              if msg[0] == &#39;*&#39; {
                 // multi bulk reply
                 err = parseMultiBulkHeader(msg, &state)
                 if err != nil {
                    ch <- &Payload{
                       Err: errors.New("protocol error: " + string(msg)),
                   }
                    state = readState{}
                    continue
                }
                 if state.expectedArgsCount == 0 {
                    ch <- &Payload{
                       Data: &reply.EmptyMultiBulkReply{},
                   }
                    state = readState{}
                    continue
                }
             } else if msg[0] == &#39;$&#39; { // bulk reply
                 err = parseBulkHeader(msg, &state)
                 if err != nil {
                    ch <- &Payload{
                       Err: errors.New("protocol error: " + string(msg)),
                   }
                    state = readState{} // reset state
                    continue
                }
                 if state.bulkLen == -1 { // null bulk reply
                    ch <- &Payload{
                       Data: &reply.NullBulkReply{},
                   }
                    state = readState{} // reset state
                    continue
                }
             } else {
                 // single line reply
                 result, err := parseSingleLineReply(msg)
                 ch <- &Payload{
                    Data: result,
                    Err:  err,
                }
                 state = readState{} // reset state
                 continue
             }
          } else {
              // read bulk reply
              err = readBody(msg, &state)
              if err != nil {
                 ch <- &Payload{
                    Err: errors.New("protocol error: " + string(msg)),
                }
                 state = readState{} // reset state
                 continue
             }
              // if sending finished
              if state.finished() {
                 var result resp.Reply
                 if state.msgType == &#39;*&#39; {
                    result = reply.MakeMultiBulkReply(state.args)
                } else if state.msgType == &#39;$&#39; {
                    result = reply.MakeBulkReply(state.args[0])
                }
                 ch <- &Payload{
                    Data: result,
                    Err:  err,
                }
                 state = readState{}
             }
          }
       }
    }
    Copier après la connexion

    parse0 : analysez la commande et envoyez-la via le canal une fois l'analyse terminée

resp/connection/conn.go

type Connection struct {
    conn net.Conn
    waitingReply wait.Wait
    mu sync.Mutex // 避免多个协程往客户端中写
    selectedDB int
}

func NewConn(conn net.Conn) *Connection {
    return &Connection{
        conn: conn,
    }
}

func (c *Connection) RemoteAddr() net.Addr {
    return c.conn.RemoteAddr()
}

func (c *Connection) Close() error {
    c.waitingReply.WaitWithTimeout(10 * time.Second)
    _ = c.conn.Close()
    return nil
}

func (c *Connection) Write(b []byte) error {
    if len(b) == 0 {
        return nil
    }
    c.mu.Lock()
    c.waitingReply.Add(1)
    defer func() {
        c.waitingReply.Done()
        c.mu.Unlock()
    }()

    _, err := c.conn.Write(b)
    return err
}

func (c *Connection) GetDBIndex() int {
    return c.selectedDB
}

func (c *Connection) SelectDB(dbNum int) {
    c.selectedDB = dbNum
}
Copier après la connexion

L'EchoHandler que nous avons écrit auparavant est utilisé pour recevoir l'entrée de l'utilisateur et remettez-le intact. Vous devez maintenant écrire un RespHandler pour remplacer le EchoHandler et laisser l'analyseur effectuer l'analyse. Une structure de connexion qui gère les connexions client doit exister dans RespHandler.

Connexion : connexion client, utilisée dans le gestionnaire de couche de protocole

resp/handler/handler.go

var (
   unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)

type RespHandler struct {
   activeConn sync.Map
   db         databaseface.Database
   closing    atomic.Boolean
}

func MakeHandler() *RespHandler {
   var db databaseface.Database
   db = database.NewEchoDatabase()
   return &RespHandler{
      db: db,
   }
}

func (h *RespHandler) closeClient(client *connection.Connection) {
   _ = client.Close()
   h.db.AfterClientClose(client)
   h.activeConn.Delete(client)
}

func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
   if h.closing.Get() {
      // closing handler refuse new connection
      _ = conn.Close()
   }

   client := connection.NewConn(conn)
   h.activeConn.Store(client, 1)

   ch := parser.ParseStream(conn)
   for payload := range ch {
      if payload.Err != nil {
         if payload.Err == io.EOF ||
            payload.Err == io.ErrUnexpectedEOF ||
            strings.Contains(payload.Err.Error(), "use of closed network connection") {
            // connection closed
            h.closeClient(client)
            logger.Info("connection closed: " + client.RemoteAddr().String())
            return
         }
         // protocol err
         errReply := reply.MakeErrReply(payload.Err.Error())
         err := client.Write(errReply.ToBytes())
         if err != nil {
            h.closeClient(client)
            logger.Info("connection closed: " + client.RemoteAddr().String())
            return
         }
         continue
      }
      if payload.Data == nil {
         logger.Error("empty payload")
         continue
      }
      r, ok := payload.Data.(*reply.MultiBulkReply)
      if !ok {
         logger.Error("require multi bulk reply")
         continue
      }
      result := h.db.Exec(client, r.Args)
      if result != nil {
         _ = client.Write(result.ToBytes())
      } else {
         _ = client.Write(unknownErrReplyBytes)
      }
   }
}

func (h *RespHandler) Close() error {
   logger.Info("handler shutting down...")
   h.closing.Set(true)
   // TODO: concurrent wait
   h.activeConn.Range(func(key interface{}, val interface{}) bool {
      client := key.(*connection.Connection)
      _ = client.Close()
      return true
   })
   h.db.Close()
   return nil
}
Copier après la connexion

RespHandler : similaire à l'écho précédent, avec l'ajout de la couche principale db.exec pour effectuer l'analyse de la commande

interface/database/database.go

type CmdLine = [][]byte

type Database interface {
	Exec(client resp.Connection, args [][]byte) resp.Reply
	AfterClientClose(c resp.Connection)
	Close()
}

type DataEntity struct {
	Data interface{}
}
Copier après la connexion

Exec : exécution de la couche principale AfterClientClose : méthode de suivi après la fermeture

CmdLine : alias de commande pour un tableau d'octets bidimensionnel

DataEntity : représente les données Redis, y compris la chaîne, la liste, set, etc.

database/echo_database.go

type EchoDatabase struct {
}

func NewEchoDatabase() *EchoDatabase {
   return &EchoDatabase{}
}

func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
   return reply.MakeMultiBulkReply(args)
}

func (e EchoDatabase) AfterClientClose(c resp.Connection) {
   logger.Info("EchoDatabase AfterClientClose")
}

func (e EchoDatabase) Close() {
   logger.Info("EchoDatabase Close")
}
Copier après la connexion

echo_database : tester la couche de protocole

Exec : une fois la commande analysée, utilisez MakeMultiBulkReply pour l'envelopper et la renvoyer

main.go

err := tcp.ListenAndServeWithSignal(
   &tcp.Config{
      Address: fmt.Sprintf("%s:%d",
         config.Properties.Bind,
         config.Properties.Port),
   },
   handler.MakeHandler())
if err != nil {
   logger.Error(err)
}
Copier après la connexion

Change principal de ce que vous venez d'écrire : handler.MakeHandler()

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal