Home > Database > Redis > How to implement Redis protocol parser based on Golang

How to implement Redis protocol parser based on Golang

Release: 2023-05-28 19:13:21
873 people have browsed it

RESP protocol

RESP is a protocol for communication between the client and the server. There are five formats:

Normal reply: starting with " " and ending with "\ r\n" in the form of a string

Error reply: starting with "-" and ending in the form of a string with "\r\n"

Integer: starting with ":", String format ending with "\r\n"

Multi-line string: starting with "$", followed by the actual number of bytes sent, and then starting and ending with "\r\n"


Array: starts with "*", followed by the number of members

SET key value
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\ nvalue\r\n

Commands or data sent by the client and server always use \r\n (CRLF) as the newline character.

When we enter *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n $5\r\nvalue\r\nFor such a series of commands, the server receives the following commands:


type Connection interface {
   Write([]byte) error
   GetDBIndex() int

type Reply interface {
	ToBytes() []byte
Copy after login
  • Connection interface: a connection for the Redis client

  • Write: reply to the client

  • GetDBIndex: Redis has 16 DBs

  • Reply interface: response interface


type PongReply struct{}

var pongBytes = []byte("+PONG\r\n")

func (r *PongReply) ToBytes() []byte {
    return pongBytes

var thePongReply = new(PongReply)

func MakePongReply() *PongReply {
    return thePongReply

type OkReply struct{}

var okBytes = []byte("+OK\r\n")

func (r *OkReply) ToBytes() []byte {
    return okBytes

var theOkReply = new(OkReply)

func MakeOkReply() *OkReply {
    return theOkReply

var nullBulkBytes = []byte("$-1\r\n")

type NullBulkReply struct{}

func (r *NullBulkReply) ToBytes() []byte {
    return nullBulkBytes

func MakeNullBulkReply() *NullBulkReply {
    return &NullBulkReply{}

var emptyMultiBulkBytes = []byte("*0\r\n")

type EmptyMultiBulkReply struct{}

func (r *EmptyMultiBulkReply) ToBytes() []byte {
    return emptyMultiBulkBytes

type NoReply struct{}

var noBytes = []byte("")

func (r *NoReply) ToBytes() []byte {
    return noBytes
Copy after login

Define five kinds of replies: reply pong, ok, null, empty array, empty


type ErrorReply interface {
   Error() string
   ToBytes() []byte
Copy after login

ErrorReply: Define error interface


type UnknownErrReply struct{}

var unknownErrBytes = []byte("-Err unknown\r\n")

func (r *UnknownErrReply) ToBytes() []byte {
   return unknownErrBytes

func (r *UnknownErrReply) Error() string {
   return "Err unknown"

type ArgNumErrReply struct {
   Cmd string

func (r *ArgNumErrReply) ToBytes() []byte {
   return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")

func (r *ArgNumErrReply) Error() string {
   return "ERR wrong number of arguments for '" + r.Cmd + "' command"

func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
   return &ArgNumErrReply{
      Cmd: cmd,

type SyntaxErrReply struct{}

var syntaxErrBytes = []byte("-Err syntax error\r\n")
var theSyntaxErrReply = &SyntaxErrReply{}

func MakeSyntaxErrReply() *SyntaxErrReply {
   return theSyntaxErrReply

func (r *SyntaxErrReply) ToBytes() []byte {
   return syntaxErrBytes

func (r *SyntaxErrReply) Error() string {
   return "Err syntax error"

type WrongTypeErrReply struct{}

var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

func (r *WrongTypeErrReply) ToBytes() []byte {
   return wrongTypeErrBytes

func (r *WrongTypeErrReply) Error() string {
   return "WRONGTYPE Operation against a key holding the wrong kind of value"

type ProtocolErrReply struct {
   Msg string

func (r *ProtocolErrReply) ToBytes() []byte {
   return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")

func (r *ProtocolErrReply) Error() string {
   return "ERR Protocol error: '" + r.Msg
Copy after login

errors defines 5 types of errors: UnknownErrReply Unknown error , ArgNumErrReply parameter number error, SyntaxErrReply syntax error, WrongTypeErrReply data type error, ProtocolErrReply protocol error


var (
   nullBulkReplyBytes = []byte("$-1")
   // 协议的结尾
   CRLF = "\r\n"

type BulkReply struct {
   Arg []byte

func MakeBulkReply(arg []byte) *BulkReply {
   return &BulkReply{
      Arg: arg,

func (r *BulkReply) ToBytes() []byte {
   if len(r.Arg) == 0 {
      return nullBulkReplyBytes
   return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)

type MultiBulkReply struct {
   Args [][]byte

func (r *MultiBulkReply) ToBytes() []byte {
   argLen := len(r.Args)
   var buf bytes.Buffer
   buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
   for _, arg := range r.Args {
      if arg == nil {
         buf.WriteString("$-1" + CRLF)
      } else {
         buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
   return buf.Bytes()

func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
   return &MultiBulkReply{
      Args: args,

type StatusReply struct {
   Status string

func MakeStatusReply(status string) *StatusReply {
   return &StatusReply{
      Status: status,

func (r *StatusReply) ToBytes() []byte {
   return []byte("+" + r.Status + CRLF)

type IntReply struct {
   Code int64

func MakeIntReply(code int64) *IntReply {
   return &IntReply{
      Code: code,

func (r *IntReply) ToBytes() []byte {
   return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)

type StandardErrReply struct {
   Status string

func (r *StandardErrReply) ToBytes() []byte {
   return []byte("-" + r.Status + CRLF)

func (r *StandardErrReply) Error() string {
   return r.Status

func MakeErrReply(status string) *StandardErrReply {
   return &StandardErrReply{
      Status: status,

func IsErrorReply(reply resp.Reply) bool {
   return reply.ToBytes()[0] == '-'
Copy after login
  • BulkReply: Reply to a string

  • MultiBulkReply: Reply to an array of strings

  • StatusReply: Status reply

  • IntReply: Numeric reply

  • StandardErrReply: Standard error reply

  • IsErrorReply: Determine whether it is an error reply

  • ToBytes: Convert the string into the format specified by the RESP protocol


type Payload struct {
   Data resp.Reply
   Err  error

type readState struct {
   readingMultiLine  bool     
   expectedArgsCount int     
   msgType           byte    
   args              [][]byte 
   bulkLen           int64    

func (s *readState) finished() bool {
   return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount

func ParseStream(reader io.Reader) <-chan *Payload {
   ch := make(chan *Payload)
   go parse0(reader, ch)
   return ch

func parse0(reader io.Reader, ch chan<- *Payload) {
Copy after login

Payload Structure: The data sent to us by the customer service side

Reply: The data sent by the client and the server to each other is called Reply

readState structure:

  • readingMultiLine: parsing single or multiple lines of data

  • expectedArgsCount: the number of parameters that should be read

  • msgType: message type

  • args: Message content

  • bulkLen: Data length

finished method: Determine whether the parsing is completed

ParseStream method: Asynchronously parse the data and put it into the pipeline, and return the pipeline data

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
   var msg []byte
   var err error
   if state.bulkLen == 0 {
      msg, err = bufReader.ReadBytes(&#39;\n&#39;)
      if err != nil {
         return nil, true, err
      if len(msg) == 0 || msg[len(msg)-2] != &#39;\r&#39; {
         return nil, false, errors.New("protocol error: " + string(msg))
   } else {
      msg = make([]byte, state.bulkLen+2)
      _, err = io.ReadFull(bufReader, msg)
      if err != nil {
         return nil, true, err
      if len(msg) == 0 || msg[len(msg)-2] != &#39;\r&#39; || msg[len(msg)-1] != &#39;\n&#39; {
         return nil, false, errors.New("protocol error: " + string(msg))
      state.bulkLen = 0
   return msg, false, nil
Copy after login

readLine: Read line by line. Read normal lines, separated by \n. When reading lines containing \r\n characters in the text, state.bulkLen adds the newline character \r\n (state.bulkLen 2)

func parseMultiBulkHeader(msg []byte, state *readState) error {
   var err error
   var expectedLine uint64
   expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
   if err != nil {
      return errors.New("protocol error: " + string(msg))
   if expectedLine == 0 {
      state.expectedArgsCount = 0
      return nil
   } else if expectedLine > 0 {
      state.msgType = msg[0]
      state.readingMultiLine = true
      state.expectedArgsCount = int(expectedLine)
      state.args = make([][]byte, 0, expectedLine)
      return nil
   } else {
      return errors.New("protocol error: " + string(msg))

func parseBulkHeader(msg []byte, state *readState) error {
   var err error
   state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
   if err != nil {
      return errors.New("protocol error: " + string(msg))
   if state.bulkLen == -1 { // null bulk
      return nil
   } else if state.bulkLen > 0 {
      state.msgType = msg[0]
      state.readingMultiLine = true
      state.expectedArgsCount = 1
      state.args = make([][]byte, 0, 1)
      return nil
   } else {
      return errors.New("protocol error: " + string(msg))
Copy after login

parseMultiBulkHeader: Parse the header of the array and set the expected number of lines and related parameters.

parseBulkHeader: Parse the header of a multi-line string.

func parseSingleLineReply(msg []byte) (resp.Reply, error) {
   str := strings.TrimSuffix(string(msg), "\r\n")
   var result resp.Reply
   switch msg[0] {
   case &#39;+&#39;: // status reply
      result = reply.MakeStatusReply(str[1:])
   case &#39;-&#39;: // err reply
      result = reply.MakeErrReply(str[1:])
   case &#39;:&#39;: // int reply
      val, err := strconv.ParseInt(str[1:], 10, 64)
      if err != nil {
         return nil, errors.New("protocol error: " + string(msg))
      result = reply.MakeIntReply(val)
   return result, nil

func readBody(msg []byte, state *readState) error {
   line := msg[0 : len(msg)-2]
   var err error
   if line[0] == &#39;$&#39; {
      // bulk reply
      state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
      if err != nil {
         return errors.New("protocol error: " + string(msg))
      if state.bulkLen <= 0 { // null bulk in multi bulks
         state.args = append(state.args, []byte{})
         state.bulkLen = 0
   } else {
      state.args = append(state.args, line)
   return nil
Copy after login

parseSingleLineReply: Parse a single-line command

readBody: Read a multi-line command. If it starts with $, set bulkLen. When reading the next line, use this 2. If it does not start with $, add it directly. Go to args

func parse0(reader io.Reader, ch chan<- *Payload) {
    defer func() {
       if err := recover(); err != nil {
    bufReader := bufio.NewReader(reader)
    var state readState
    var err error
    var msg []byte
    for {
       var ioErr bool
       msg, ioErr, err = readLine(bufReader, &state)
       if err != nil {
          if ioErr {
             ch <- &Payload{
                Err: err,
          ch <- &Payload{
             Err: err,
          state = readState{}

       if !state.readingMultiLine {
          if msg[0] == &#39;*&#39; {
             // multi bulk reply
             err = parseMultiBulkHeader(msg, &state)
             if err != nil {
                ch <- &Payload{
                   Err: errors.New("protocol error: " + string(msg)),
                state = readState{}
             if state.expectedArgsCount == 0 {
                ch <- &Payload{
                   Data: &reply.EmptyMultiBulkReply{},
                state = readState{}
         } else if msg[0] == &#39;$&#39; { // bulk reply
             err = parseBulkHeader(msg, &state)
             if err != nil {
                ch <- &Payload{
                   Err: errors.New("protocol error: " + string(msg)),
                state = readState{} // reset state
             if state.bulkLen == -1 { // null bulk reply
                ch <- &Payload{
                   Data: &reply.NullBulkReply{},
                state = readState{} // reset state
         } else {
             // single line reply
             result, err := parseSingleLineReply(msg)
             ch <- &Payload{
                Data: result,
                Err:  err,
             state = readState{} // reset state
      } else {
          // read bulk reply
          err = readBody(msg, &state)
          if err != nil {
             ch <- &Payload{
                Err: errors.New("protocol error: " + string(msg)),
             state = readState{} // reset state
          // if sending finished
          if state.finished() {
             var result resp.Reply
             if state.msgType == &#39;*&#39; {
                result = reply.MakeMultiBulkReply(state.args)
            } else if state.msgType == &#39;$&#39; {
                result = reply.MakeBulkReply(state.args[0])
             ch <- &Payload{
                Data: result,
                Err:  err,
             state = readState{}
Copy after login

parse0: Parse the command and send it through the channel after the parsing is completed


type Connection struct {
    conn net.Conn
    waitingReply wait.Wait
    mu sync.Mutex // 避免多个协程往客户端中写
    selectedDB int

func NewConn(conn net.Conn) *Connection {
    return &Connection{
        conn: conn,

func (c *Connection) RemoteAddr() net.Addr {
    return c.conn.RemoteAddr()

func (c *Connection) Close() error {
    c.waitingReply.WaitWithTimeout(10 * time.Second)
    _ = c.conn.Close()
    return nil

func (c *Connection) Write(b []byte) error {
    if len(b) == 0 {
        return nil
    defer func() {

    _, err := c.conn.Write(b)
    return err

func (c *Connection) GetDBIndex() int {
    return c.selectedDB

func (c *Connection) SelectDB(dbNum int) {
    c.selectedDB = dbNum
Copy after login

We wrote before The EchoHandler is used to receive user input and return it unchanged. Now you need to write a RespHandler to replace the EchoHandler and let the parser perform the parsing. A Connection structure that manages client connections needs to exist in RespHandler.

Connection: Client connection, used in the handler of the protocol layer


var (
   unknownErrReplyBytes = []byte("-ERR unknown\r\n")

type RespHandler struct {
   activeConn sync.Map
   db         databaseface.Database
   closing    atomic.Boolean

func MakeHandler() *RespHandler {
   var db databaseface.Database
   db = database.NewEchoDatabase()
   return &RespHandler{
      db: db,

func (h *RespHandler) closeClient(client *connection.Connection) {
   _ = client.Close()

func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
   if h.closing.Get() {
      // closing handler refuse new connection
      _ = conn.Close()

   client := connection.NewConn(conn)
   h.activeConn.Store(client, 1)

   ch := parser.ParseStream(conn)
   for payload := range ch {
      if payload.Err != nil {
         if payload.Err == io.EOF ||
            payload.Err == io.ErrUnexpectedEOF ||
            strings.Contains(payload.Err.Error(), "use of closed network connection") {
            // connection closed
            logger.Info("connection closed: " + client.RemoteAddr().String())
         // protocol err
         errReply := reply.MakeErrReply(payload.Err.Error())
         err := client.Write(errReply.ToBytes())
         if err != nil {
            logger.Info("connection closed: " + client.RemoteAddr().String())
      if payload.Data == nil {
         logger.Error("empty payload")
      r, ok := payload.Data.(*reply.MultiBulkReply)
      if !ok {
         logger.Error("require multi bulk reply")
      result := h.db.Exec(client, r.Args)
      if result != nil {
         _ = client.Write(result.ToBytes())
      } else {
         _ = client.Write(unknownErrReplyBytes)

func (h *RespHandler) Close() error {
   logger.Info("handler shutting down...")
   // TODO: concurrent wait
   h.activeConn.Range(func(key interface{}, val interface{}) bool {
      client := key.(*connection.Connection)
      _ = client.Close()
      return true
   return nil
Copy after login

RespHandler: and The previous echo is similar, with the addition of the core layer's db.exec execution parsing instructions


type CmdLine = [][]byte

type Database interface {
	Exec(client resp.Connection, args [][]byte) resp.Reply
	AfterClientClose(c resp.Connection)

type DataEntity struct {
	Data interface{}
Copy after login

Exec: core layer execution

AfterClientClose: Aftercare method after closing

CmdLine: Command alias for two-dimensional byte array

DataEntity: Represents Redis data, including string, list, set, etc.


type EchoDatabase struct {

func NewEchoDatabase() *EchoDatabase {
   return &EchoDatabase{}

func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
   return reply.MakeMultiBulkReply(args)

func (e EchoDatabase) AfterClientClose(c resp.Connection) {
   logger.Info("EchoDatabase AfterClientClose")

func (e EchoDatabase) Close() {
   logger.Info("EchoDatabase Close")
Copy after login

echo_database: Test protocol layer

Exec: After the instruction is parsed, use MakeMultiBulkReply to wrap it and return it


err := tcp.ListenAndServeWithSignal(
      Address: fmt.Sprintf("%s:%d",
if err != nil {
Copy after login

Change main to what you just wrote: handler.MakeHandler()

The above is the detailed content of How to implement Redis protocol parser based on Golang. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Latest Issues
How to choose golang web mvc framework
From 1970-01-01 08:00:00
Is it necessary to use nginx when using golang?
From 1970-01-01 08:00:00
golang - vim plug-in to write go
From 1970-01-01 08:00:00
Popular Tutorials
Latest Downloads
Web Effects
Website Source Code
Website Materials
Front End Template