GO怎么实现Redis的AOF持久化

其他教程   发布日期:2024年04月21日   浏览次数:338

这篇文章主要介绍“GO怎么实现Redis的AOF持久化”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“GO怎么实现Redis的AOF持久化”文章能帮助大家解决问题。

GO实现Redis的AOF持久化

将用户发来的指令以RESP协议的形式存储在本地的AOF文件,重启Redis后执行此文件恢复数据

本文涉及以下文件:redis.conf:配置文件

aof:实现aof

redis.conf

appendonly yes
appendfilename appendonly.aof

aof/aof.go

  1. type CmdLine = [][]byte
  2. const (
  3. aofQueueSize = 1 << 16
  4. )
  5. type payload struct {
  6. cmdLine CmdLine
  7. dbIndex int
  8. }
  9. type AofHandler struct {
  10. db databaseface.Database
  11. aofChan chan *payload
  12. aofFile *os.File
  13. aofFilename string
  14. currentDB int
  15. }
  16. func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {
  17. handler := &AofHandler{}
  18. handler.aofFilename = config.Properties.AppendFilename
  19. handler.db = db
  20. handler.LoadAof()
  21. aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
  22. if err != nil {
  23. return nil, err
  24. }
  25. handler.aofFile = aofFile
  26. handler.aofChan = make(chan *payload, aofQueueSize)
  27. go func() {
  28. handler.handleAof()
  29. }()
  30. return handler, nil
  31. }
  32. func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
  33. if config.Properties.AppendOnly && handler.aofChan != nil {
  34. handler.aofChan <- &payload{
  35. cmdLine: cmdLine,
  36. dbIndex: dbIndex,
  37. }
  38. }
  39. }
  40. func (handler *AofHandler) handleAof() {
  41. handler.currentDB = 0
  42. for p := range handler.aofChan {
  43. if p.dbIndex != handler.currentDB {
  44. // select db
  45. data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
  46. _, err := handler.aofFile.Write(data)
  47. if err != nil {
  48. logger.Warn(err)
  49. continue
  50. }
  51. handler.currentDB = p.dbIndex
  52. }
  53. data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
  54. _, err := handler.aofFile.Write(data)
  55. if err != nil {
  56. logger.Warn(err)
  57. }
  58. }
  59. }
  60. func (handler *AofHandler) LoadAof() {
  61. file, err := os.Open(handler.aofFilename)
  62. if err != nil {
  63. logger.Warn(err)
  64. return
  65. }
  66. defer file.Close()
  67. ch := parser.ParseStream(file)
  68. fakeConn := &connection.Connection{}
  69. for p := range ch {
  70. if p.Err != nil {
  71. if p.Err == io.EOF {
  72. break
  73. }
  74. logger.Error("parse error: " + p.Err.Error())
  75. continue
  76. }
  77. if p.Data == nil {
  78. logger.Error("empty payload")
  79. continue
  80. }
  81. r, ok := p.Data.(*reply.MultiBulkReply)
  82. if !ok {
  83. logger.Error("require multi bulk reply")
  84. continue
  85. }
  86. ret := handler.db.Exec(fakeConn, r.Args)
  87. if reply.IsErrorReply(ret) {
  88. logger.Error("exec err", err)
  89. }
  90. }
  91. }
  • AofHandler:1.从管道中接收数据 2.写入AOF文件

  • AddAof:用户的指令包装成payload放入管道

  • handleAof:将管道中的payload写入磁盘

  • LoadAof:重启Redis后加载aof文件

database/database.go

  1. type Database struct {
  2. dbSet []*DB
  3. aofHandler *aof.AofHandler
  4. }
  5. func NewDatabase() *Database {
  6. mdb := &Database{}
  7. if config.Properties.Databases == 0 {
  8. config.Properties.Databases = 16
  9. }
  10. mdb.dbSet = make([]*DB, config.Properties.Databases)
  11. for i := range mdb.dbSet {
  12. singleDB := makeDB()
  13. singleDB.index = i
  14. mdb.dbSet[i] = singleDB
  15. }
  16. if config.Properties.AppendOnly {
  17. aofHandler, err := aof.NewAOFHandler(mdb)
  18. if err != nil {
  19. panic(err)
  20. }
  21. mdb.aofHandler = aofHandler
  22. for _, db := range mdb.dbSet {
  23. singleDB := db
  24. singleDB.addAof = func(line CmdLine) {
  25. mdb.aofHandler.AddAof(singleDB.index, line)
  26. }
  27. }
  28. }
  29. return mdb
  30. }

将AOF加入到database里

使用singleDB的原因:因为在循环中获取返回变量的地址都完全相同,因此当我们想要访问数组中元素所在的地址时,不应该直接获取 range 返回的变量地址 db,而应该使用 singleDB := db

database/db.go

  1. type DB struct {
  2. index int
  3. data dict.Dict
  4. addAof func(CmdLine)
  5. }
  6. func makeDB() *DB {
  7. db := &DB{
  8. data: dict.MakeSyncDict(),
  9. addAof: func(line CmdLine) {},
  10. }
  11. return db
  12. }

由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof

database/keys.go

  1. func execDel(db *DB, args [][]byte) resp.Reply {
  2. ......
  3. if deleted > 0 {
  4. db.addAof(utils.ToCmdLine2("del", args...))
  5. }
  6. return reply.MakeIntReply(int64(deleted))
  7. }
  8. func execFlushDB(db *DB, args [][]byte) resp.Reply {
  9. db.Flush()
  10. db.addAof(utils.ToCmdLine2("flushdb", args...))
  11. return &reply.OkReply{}
  12. }
  13. func execRename(db *DB, args [][]byte) resp.Reply {
  14. ......
  15. db.addAof(utils.ToCmdLine2("rename", args...))
  16. return &reply.OkReply{}
  17. }
  18. func execRenameNx(db *DB, args [][]byte) resp.Reply {
  19. ......
  20. db.addAof(utils.ToCmdLine2("renamenx", args...))
  21. return reply.MakeIntReply(1)
  22. }

database/string.go

  1. func execSet(db *DB, args [][]byte) resp.Reply {
  2. ......
  3. db.addAof(utils.ToCmdLine2("set", args...))
  4. return &reply.OkReply{}
  5. }
  6. func execSetNX(db *DB, args [][]byte) resp.Reply {
  7. ......
  8. db.addAof(utils.ToCmdLine2("setnx", args...))
  9. return reply.MakeIntReply(int64(result))
  10. }
  11. func execGetSet(db *DB, args [][]byte) resp.Reply {
  12. key := string(args[0])
  13. value := args[1]
  14. entity, exists := db.GetEntity(key)
  15. db.PutEntity(key, &database.DataEntity{Data: value})
  16. db.addAof(utils.ToCmdLine2("getset", args...))
  17. ......
  18. }

添加addAof方法

测试命令

*3 $3 SET $3 key $5 value
*2 $3 GET $3 key
*2 $6 SELECT $1 1

以上就是GO怎么实现Redis的AOF持久化的详细内容,更多关于GO怎么实现Redis的AOF持久化的资料请关注九品源码其它相关文章!