基于Golang如何实现Redis协议解析器

其他教程   发布日期:2023年11月04日   浏览次数:436

这篇文章主要介绍了基于Golang如何实现Redis协议解析器的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇基于Golang如何实现Redis协议解析器文章都会有所收获,下面我们一起来看看吧。

RESP协议

RESP是客户端与服务端通信的协议,格式有五种:

正常回复:以“+”开头,以“ ”结尾的字符串形式

错误回复:以“-”开头,以“ ”结尾的字符串形式

整数:以“:”开头,以“ ”结尾的字符串形式

多行字符串:以“$”开头,后跟实际发送字节数,再以“ ”开头和结尾

$3 abc

数组:以“*”开头,后跟成员个数

SET key value
*3 $3 SET $3 key $5 value

客户端和服务器发送的命令或数据一律以 (CRLF)作为换行符。

当我们输入*3 $3 SET $3 key $5 value 这样一串命令,服务端接收到的是如下的命令:
*3
$3
SET
$3
key
$5
value

interface/resp/conn.go

  1. type Connection interface {
  2. Write([]byte) error
  3. GetDBIndex() int
  4. SelectDB(int)
  5. }
  6. interface/resp/reply.go
  7. type Reply interface {
  8. ToBytes() []byte
  9. }
  • Connection接口:Redis客户端的一个连接

  • Write:给客户端回复消息

  • GetDBIndex:Redis有16个DB

  • Reply接口:响应接口

resp/reply/consts.go

  1. type PongReply struct{}
  2. var pongBytes = []byte("+PONG
  3. ")
  4. func (r *PongReply) ToBytes() []byte {
  5. return pongBytes
  6. }
  7. var thePongReply = new(PongReply)
  8. func MakePongReply() *PongReply {
  9. return thePongReply
  10. }
  11. type OkReply struct{}
  12. var okBytes = []byte("+OK
  13. ")
  14. func (r *OkReply) ToBytes() []byte {
  15. return okBytes
  16. }
  17. var theOkReply = new(OkReply)
  18. func MakeOkReply() *OkReply {
  19. return theOkReply
  20. }
  21. var nullBulkBytes = []byte("$-1
  22. ")
  23. type NullBulkReply struct{}
  24. func (r *NullBulkReply) ToBytes() []byte {
  25. return nullBulkBytes
  26. }
  27. func MakeNullBulkReply() *NullBulkReply {
  28. return &NullBulkReply{}
  29. }
  30. var emptyMultiBulkBytes = []byte("*0
  31. ")
  32. type EmptyMultiBulkReply struct{}
  33. func (r *EmptyMultiBulkReply) ToBytes() []byte {
  34. return emptyMultiBulkBytes
  35. }
  36. type NoReply struct{}
  37. var noBytes = []byte("")
  38. func (r *NoReply) ToBytes() []byte {
  39. return noBytes
  40. }

定义五种回复:回复pong,ok,null,空数组,空

resp/reply/reply.go

  1. type ErrorReply interface {
  2. Error() string
  3. ToBytes() []byte
  4. }

ErrorReply:定义错误接口

resp/reply/errors.go

  1. type UnknownErrReply struct{}
  2. var unknownErrBytes = []byte("-Err unknown
  3. ")
  4. func (r *UnknownErrReply) ToBytes() []byte {
  5. return unknownErrBytes
  6. }
  7. func (r *UnknownErrReply) Error() string {
  8. return "Err unknown"
  9. }
  10. type ArgNumErrReply struct {
  11. Cmd string
  12. }
  13. func (r *ArgNumErrReply) ToBytes() []byte {
  14. return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command
  15. ")
  16. }
  17. func (r *ArgNumErrReply) Error() string {
  18. return "ERR wrong number of arguments for '" + r.Cmd + "' command"
  19. }
  20. func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
  21. return &ArgNumErrReply{
  22. Cmd: cmd,
  23. }
  24. }
  25. type SyntaxErrReply struct{}
  26. var syntaxErrBytes = []byte("-Err syntax error
  27. ")
  28. var theSyntaxErrReply = &SyntaxErrReply{}
  29. func MakeSyntaxErrReply() *SyntaxErrReply {
  30. return theSyntaxErrReply
  31. }
  32. func (r *SyntaxErrReply) ToBytes() []byte {
  33. return syntaxErrBytes
  34. }
  35. func (r *SyntaxErrReply) Error() string {
  36. return "Err syntax error"
  37. }
  38. type WrongTypeErrReply struct{}
  39. var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value
  40. ")
  41. func (r *WrongTypeErrReply) ToBytes() []byte {
  42. return wrongTypeErrBytes
  43. }
  44. func (r *WrongTypeErrReply) Error() string {
  45. return "WRONGTYPE Operation against a key holding the wrong kind of value"
  46. }
  47. type ProtocolErrReply struct {
  48. Msg string
  49. }
  50. func (r *ProtocolErrReply) ToBytes() []byte {
  51. return []byte("-ERR Protocol error: '" + r.Msg + "'
  52. ")
  53. }
  54. func (r *ProtocolErrReply) Error() string {
  55. return "ERR Protocol error: '" + r.Msg
  56. }

errors定义5种错误:UnknownErrReply 未知错误,ArgNumErrReply 参数个数错误,SyntaxErrReply 语法错误,WrongTypeErrReply 数据类型错误,ProtocolErrReply 协议错误

resp/reply/reply.go

  1. var (
  2. nullBulkReplyBytes = []byte("$-1")
  3. // 协议的结尾
  4. CRLF = "
  5. "
  6. )
  7. type BulkReply struct {
  8. Arg []byte
  9. }
  10. func MakeBulkReply(arg []byte) *BulkReply {
  11. return &BulkReply{
  12. Arg: arg,
  13. }
  14. }
  15. func (r *BulkReply) ToBytes() []byte {
  16. if len(r.Arg) == 0 {
  17. return nullBulkReplyBytes
  18. }
  19. return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
  20. }
  21. type MultiBulkReply struct {
  22. Args [][]byte
  23. }
  24. func (r *MultiBulkReply) ToBytes() []byte {
  25. argLen := len(r.Args)
  26. var buf bytes.Buffer
  27. buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
  28. for _, arg := range r.Args {
  29. if arg == nil {
  30. buf.WriteString("$-1" + CRLF)
  31. } else {
  32. buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
  33. }
  34. }
  35. return buf.Bytes()
  36. }
  37. func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
  38. return &MultiBulkReply{
  39. Args: args,
  40. }
  41. }
  42. type StatusReply struct {
  43. Status string
  44. }
  45. func MakeStatusReply(status string) *StatusReply {
  46. return &StatusReply{
  47. Status: status,
  48. }
  49. }
  50. func (r *StatusReply) ToBytes() []byte {
  51. return []byte("+" + r.Status + CRLF)
  52. }
  53. type IntReply struct {
  54. Code int64
  55. }
  56. func MakeIntReply(code int64) *IntReply {
  57. return &IntReply{
  58. Code: code,
  59. }
  60. }
  61. func (r *IntReply) ToBytes() []byte {
  62. return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
  63. }
  64. type StandardErrReply struct {
  65. Status string
  66. }
  67. func (r *StandardErrReply) ToBytes() []byte {
  68. return []byte("-" + r.Status + CRLF)
  69. }
  70. func (r *StandardErrReply) Error() string {
  71. return r.Status
  72. }
  73. func MakeErrReply(status string) *StandardErrReply {
  74. return &StandardErrReply{
  75. Status: status,
  76. }
  77. }
  78. func IsErrorReply(reply resp.Reply) bool {
  79. return reply.ToBytes()[0] == '-'
  80. }
  • BulkReply:回复一个字符串

  • MultiBulkReply:回复字符串数组

  • StatusReply:状态回复

  • IntReply:数字回复

  • StandardErrReply:标准错误回复

  • IsErrorReply:判断是否为错误回复

  • ToBytes:将字符串转成RESP协议规定的格式

resp/parser/parser.go

  1. type Payload struct {
  2. Data resp.Reply
  3. Err error
  4. }
  5. type readState struct {
  6. readingMultiLine bool
  7. expectedArgsCount int
  8. msgType byte
  9. args [][]byte
  10. bulkLen int64
  11. }
  12. func (s *readState) finished() bool {
  13. return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
  14. }
  15. func ParseStream(reader io.Reader) <-chan *Payload {
  16. ch := make(chan *Payload)
  17. go parse0(reader, ch)
  18. return ch
  19. }
  20. func parse0(reader io.Reader, ch chan<- *Payload) {
  21. ......
  22. }

Payload结构体:客服端给我们发的数据

Reply:客户端与服务端互相发的数据都称为Reply

readState结构体:

  • readingMultiLine:解析单行还是多行数据

  • expectedArgsCount:应该读取的参数个数

  • msgType:消息类型

  • args:消息内容

  • bulkLen:数据长度

finished方法:判断解析是否完成

ParseStream方法:异步解析数据后放入管道,返回管道数据

  1. func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
  2. var msg []byte
  3. var err error
  4. if state.bulkLen == 0 {
  5. msg, err = bufReader.ReadBytes('
  6. ')
  7. if err != nil {
  8. return nil, true, err
  9. }
  10. if len(msg) == 0 || msg[len(msg)-2] != '
  11. ' {
  12. return nil, false, errors.New("protocol error: " + string(msg))
  13. }
  14. } else {
  15. msg = make([]byte, state.bulkLen+2)
  16. _, err = io.ReadFull(bufReader, msg)
  17. if err != nil {
  18. return nil, true, err
  19. }
  20. if len(msg) == 0 || msg[len(msg)-2] != '
  21. ' || msg[len(msg)-1] != '
  22. ' {
  23. return nil, false, errors.New("protocol error: " + string(msg))
  24. }
  25. state.bulkLen = 0
  26. }
  27. return msg, false, nil
  28. }

readLine:一行一行的读取。读正常的行,以 分隔。读正文中包含 字符的行时,state.bulkLen加上换行符 (state.bulkLen+2)

  1. func parseMultiBulkHeader(msg []byte, state *readState) error {
  2. var err error
  3. var expectedLine uint64
  4. expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
  5. if err != nil {
  6. return errors.New("protocol error: " + string(msg))
  7. }
  8. if expectedLine == 0 {
  9. state.expectedArgsCount = 0
  10. return nil
  11. } else if expectedLine > 0 {
  12. state.msgType = msg[0]
  13. state.readingMultiLine = true
  14. state.expectedArgsCount = int(expectedLine)
  15. state.args = make([][]byte, 0, expectedLine)
  16. return nil
  17. } else {
  18. return errors.New("protocol error: " + string(msg))
  19. }
  20. }
  21. func parseBulkHeader(msg []byte, state *readState) error {
  22. var err error
  23. state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
  24. if err != nil {
  25. return errors.New("protocol error: " + string(msg))
  26. }
  27. if state.bulkLen == -1 { // null bulk
  28. return nil
  29. } else if state.bulkLen > 0 {
  30. state.msgType = msg[0]
  31. state.readingMultiLine = true
  32. state.expectedArgsCount = 1
  33. state.args = make([][]byte, 0, 1)
  34. return nil
  35. } else {
  36. return errors.New("protocol error: " + string(msg))
  37. }
  38. }

parseMultiBulkHeader:解析数组的头部,设置期望的行数和相关参数。

parseBulkHeader:解析多行字符串的头部。

  1. func parseSingleLineReply(msg []byte) (resp.Reply, error) {
  2. str := strings.TrimSuffix(string(msg), "
  3. ")
  4. var result resp.Reply
  5. switch msg[0] {
  6. case '+': // status reply
  7. result = reply.MakeStatusReply(str[1:])
  8. case '-': // err reply
  9. result = reply.MakeErrReply(str[1:])
  10. case ':': // int reply
  11. val, err := strconv.ParseInt(str[1:], 10, 64)
  12. if err != nil {
  13. return nil, errors.New("protocol error: " + string(msg))
  14. }
  15. result = reply.MakeIntReply(val)
  16. }
  17. return result, nil
  18. }
  19. func readBody(msg []byte, state *readState) error {
  20. line := msg[0 : len(msg)-2]
  21. var err error
  22. if line[0] == '$' {
  23. // bulk reply
  24. state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
  25. if err != nil {
  26. return errors.New("protocol error: " + string(msg))
  27. }
  28. if state.bulkLen <= 0 { // null bulk in multi bulks
  29. state.args = append(state.args, []byte{})
  30. state.bulkLen = 0
  31. }
  32. } else {
  33. state.args = append(state.args, line)
  34. }
  35. return nil
  36. }

parseSingleLineReply:解析单行命令

readBody:读取多行的命令,如果是$开头,设置bulkLen,读取下一行时根据这个+2,不是$开头则直接添加到args

  1. func parse0(reader io.Reader, ch chan<- *Payload) {
  2. defer func() {
  3. if err := recover(); err != nil {
  4. logger.Error(string(debug.Stack()))
  5. }
  6. }()
  7. bufReader := bufio.NewReader(reader)
  8. var state readState
  9. var err error
  10. var msg []byte
  11. for {
  12. var ioErr bool
  13. msg, ioErr, err = readLine(bufReader, &state)
  14. if err != nil {
  15. if ioErr {
  16. ch <- &Payload{
  17. Err: err,
  18. }
  19. close(ch)
  20. return
  21. }
  22. ch <- &Payload{
  23. Err: err,
  24. }
  25. state = readState{}
  26. continue
  27. }
  28. if !state.readingMultiLine {
  29. if msg[0] == '*' {
  30. // multi bulk reply
  31. err = parseMultiBulkHeader(msg, &state)
  32. if err != nil {
  33. ch <- &Payload{
  34. Err: errors.New("protocol error: " + string(msg)),
  35. }
  36. state = readState{}
  37. continue
  38. }
  39. if state.expectedArgsCount == 0 {
  40. ch <- &Payload{
  41. Data: &reply.EmptyMultiBulkReply{},
  42. }
  43. state = readState{}
  44. continue
  45. }
  46. } else if msg[0] == '$' { // bulk reply
  47. err = parseBulkHeader(msg, &state)
  48. if err != nil {
  49. ch <- &Payload{
  50. Err: errors.New("protocol error: " + string(msg)),
  51. }
  52. state = readState{} // reset state
  53. continue
  54. }
  55. if state.bulkLen == -1 { // null bulk reply
  56. ch <- &Payload{
  57. Data: &reply.NullBulkReply{},
  58. }
  59. state = readState{} // reset state
  60. continue
  61. }
  62. } else {
  63. // single line reply
  64. result, err := parseSingleLineReply(msg)
  65. ch <- &Payload{
  66. Data: result,
  67. Err: err,
  68. }
  69. state = readState{} // reset state
  70. continue
  71. }
  72. } else {
  73. // read bulk reply
  74. err = readBody(msg, &state)
  75. if err != nil {
  76. ch <- &Payload{
  77. Err: errors.New("protocol error: " + string(msg)),
  78. }
  79. state = readState{} // reset state
  80. continue
  81. }
  82. // if sending finished
  83. if state.finished() {
  84. var result resp.Reply
  85. if state.msgType == '*' {
  86. result = reply.MakeMultiBulkReply(state.args)
  87. } else if state.msgType == '$' {
  88. result = reply.MakeBulkReply(state.args[0])
  89. }
  90. ch <- &Payload{
  91. Data: result,
  92. Err: err,
  93. }
  94. state = readState{}
  95. }
  96. }
  97. }
  98. }

parse0:解析指令,解析完成后通过channel发出去

resp/connection/conn.go

  1. type Connection struct {
  2. conn net.Conn
  3. waitingReply wait.Wait
  4. mu sync.Mutex // 避免多个协程往客户端中写
  5. selectedDB int
  6. }
  7. func NewConn(conn net.Conn) *Connection {
  8. return &Connection{
  9. conn: conn,
  10. }
  11. }
  12. func (c *Connection) RemoteAddr() net.Addr {
  13. return c.conn.RemoteAddr()
  14. }
  15. func (c *Connection) Close() error {
  16. c.waitingReply.WaitWithTimeout(10 * time.Second)
  17. _ = c.conn.Close()
  18. return nil
  19. }
  20. func (c *Connection) Write(b []byte) error {
  21. if len(b) == 0 {
  22. return nil
  23. }
  24. c.mu.Lock()
  25. c.waitingReply.Add(1)
  26. defer func() {
  27. c.waitingReply.Done()
  28. c.mu.Unlock()
  29. }()
  30. _, err := c.conn.Write(b)
  31. return err
  32. }
  33. func (c *Connection) GetDBIndex() int {
  34. return c.selectedDB
  35. }
  36. func (c *Connection) SelectDB(dbNum int) {
  37. c.selectedDB = dbNum
  38. }

之前写的EchoHandler是用户传过来什么,我们传回去什么。现在要写一个RespHandler来代替EchoHandler,让解析器来解析。RespHandler中要有一个管理客户端连接的结构体Connection。

Connection:客户端连接,在协议层的handler中会用到

resp/handler/handler.go

  1. var (
  2. unknownErrReplyBytes = []byte("-ERR unknown
  3. ")
  4. )
  5. type RespHandler struct {
  6. activeConn sync.Map
  7. db databaseface.Database
  8. closing atomic.Boolean
  9. }
  10. func MakeHandler() *RespHandler {
  11. var db databaseface.Database
  12. db = database.NewEchoDatabase()
  13. return &RespHandler{
  14. db: db,
  15. }
  16. }
  17. func (h *RespHandler) closeClient(client *connection.Connection) {
  18. _ = client.Close()
  19. h.db.AfterClientClose(client)
  20. h.activeConn.Delete(client)
  21. }
  22. func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
  23. if h.closing.Get() {
  24. // closing handler refuse new connection
  25. _ = conn.Close()
  26. }
  27. client := connection.NewConn(conn)
  28. h.activeConn.Store(client, 1)
  29. ch := parser.ParseStream(conn)
  30. for payload := range ch {
  31. if payload.Err != nil {
  32. if payload.Err == io.EOF ||
  33. payload.Err == io.ErrUnexpectedEOF ||
  34. strings.Contains(payload.Err.Error(), "use of closed network connection") {
  35. // connection closed
  36. h.closeClient(client)
  37. logger.Info("connection closed: " + client.RemoteAddr().String())
  38. return
  39. }
  40. // protocol err
  41. errReply := reply.MakeErrReply(payload.Err.Error())
  42. err := client.Write(errReply.ToBytes())
  43. if err != nil {
  44. h.closeClient(client)
  45. logger.Info("connection closed: " + client.RemoteAddr().String())
  46. return
  47. }
  48. continue
  49. }
  50. if payload.Data == nil {
  51. logger.Error("empty payload")
  52. continue
  53. }
  54. r, ok := payload.Data.(*reply.MultiBulkReply)
  55. if !ok {
  56. logger.Error("require multi bulk reply")
  57. continue
  58. }
  59. result := h.db.Exec(client, r.Args)
  60. if result != nil {
  61. _ = client.Write(result.ToBytes())
  62. } else {
  63. _ = client.Write(unknownErrReplyBytes)
  64. }
  65. }
  66. }
  67. func (h *RespHandler) Close() error {
  68. logger.Info("handler shutting down...")
  69. h.closing.Set(true)
  70. // TODO: concurrent wait
  71. h.activeConn.Range(func(key interface{}, val interface{}) bool {
  72. client := key.(*connection.Connection)
  73. _ = client.Close()
  74. return true
  75. })
  76. h.db.Close()
  77. return nil
  78. }

RespHandler:和之前的echo类似,加了核心层的db.exec执行解析的指令

interface/database/database.go

  1. type CmdLine = [][]byte
  2. type Database interface {
  3. Exec(client resp.Connection, args [][]byte) resp.Reply
  4. AfterClientClose(c resp.Connection)
  5. Close()
  6. }
  7. type DataEntity struct {
  8. Data interface{}
  9. }

Exec:核心层的执行

AfterClientClose:关闭之后的善后方法

CmdLine:二维字节数组的指令别名

DataEntity:表示Redis的数据,包括string, list, set等等

database/echo_database.go

  1. type EchoDatabase struct {
  2. }
  3. func NewEchoDatabase() *EchoDatabase {
  4. return &EchoDatabase{}
  5. }
  6. func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
  7. return reply.MakeMultiBulkReply(args)
  8. }
  9. func (e EchoDatabase) AfterClientClose(c resp.Connection) {
  10. logger.Info("EchoDatabase AfterClientClose")
  11. }
  12. func (e EchoDatabase) Close() {
  13. logger.Info("EchoDatabase Close")
  14. }

echo_database:测试协议层

Exec:指令解析后,再使用MakeMultiBulkReply包装一下返回去

main.go

  1. err := tcp.ListenAndServeWithSignal(
  2. &tcp.Config{
  3. Address: fmt.Sprintf("%s:%d",
  4. config.Properties.Bind,
  5. config.Properties.Port),
  6. },
  7. handler.MakeHandler())
  8. if err != nil {
  9. logger.Error(err)
  10. }

main改成刚才写的:handler.MakeHandler()

以上就是基于Golang如何实现Redis协议解析器的详细内容,更多关于基于Golang如何实现Redis协议解析器的资料请关注九品源码其它相关文章!