Lab2C-Raft-持久化状态与日志同步优化-实现过程

· 343字 · 2分钟

如果你把 Lab 2B 一步步调过来了,2C 对于你几乎没有难度

实现前准备 🔗

必看 🔗

Lab 2C 目标 🔗

  1. 持久化 state,使 server 重启时能够恢复
  2. 优化 AppendEntires RPC 被拒绝的次数,之前我们使用 prevLogIndex - 1 来慢慢匹配日志的位置,很明显性能不够

LAB 2C 实现 🔗

持久化 state 🔗

需要持久化的 state 在图 2 中进行了说明,分别是:

  • currentTerm:当前任期, 这个肯定需要持久化

  • voteFor:这里持久化的目的是,避免一次任期投两次票

  • logs:日志,重启回复需要重新执行一次命令

因此代码中修改了上述三个变量的地方都要进行一次持久化。

持久化一般到磁盘中,但在这个 lab,使用了 persister 来模拟,方便测试。

  1. 读取持久化状态
//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }
    // Your code here (2C).
    // Example:
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm, voteFor int
    var logs []LogEntry
    if d.Decode(&currentTerm) != nil ||
       d.Decode(&voteFor) != nil || d.Decode(&logs) != nil {
      log.Fatalf("反序列化状态失败")
    } else {
      rf.currentTerm = currentTerm
      rf.voteFor = voteFor
      rf.logs = logs
      log.Printf("%d 从 disk 恢复数据:currentTerm=%d,voteFor=%d,logs=%+v", rf.me, rf.currentTerm,rf.voteFor, rf.logs)
    }
}
  1. 持久化数据代码
func (rf *Raft) persist() {
    // Your code here (2C).
    // Example:
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    if e.Encode(rf.currentTerm) != nil ||
        e.Encode(rf.voteFor) != nil ||
        e.Encode(rf.logs) != nil{
        log.Fatalf("序列化状态失败")
    }
    data := w.Bytes()
    rf.persister.SaveRaftState(data)
}
  1. 在修改了相关 state 的地方加入持久化调用,例如:

  2. Start(command interface{}) (int, int, bool) 中给 append leader log

  3. RPC hander 收到 args 的 term 比当前大,需要更新 term 时

  4. RPC reply 的 term 比当前 term 大,需要更新 term 时

  5. 转变成 Follower,重置 voteFor 时

  6. AppendEntries handler 中 append log 时

优化 AppendEntries RPC 被拒绝的次数 🔗

当一个 Follower 远远落后 Leader 时,需要经过大量被拒绝 AppendEntries 的请求,才能找到匹配开始发送日志中的位置。这里两个可能优化的方向。

  1. 当 Follower 中 Logs 数量小于 leader 中时,直接使用 Follower 的 LastLogIndex 作为下一次匹配的位置,而不是慢慢的减一。
  2. 当 Follower 的 LastLogTerm 不等于 leader 的 LastLogTerm,直接使用 Follower 上一个 term 的位置匹配。

实现代码:

  1. 返回值中增加一个字段,代表下一次匹配的位置
type AppendEntriesReply struct {
    Term int
    Success bool
    ConflictIndex int    // 优化
}
  1. 位于 AppendEntries handler, 修改直接返回 false 部分的代码,返回下一次匹配的位置
// 
// ...
// 调用者需要减少 PrevLogIndex,重试
    if rf.getLastIndex() < args.PrevLogIndex{
        reply.ConflictIndex = rf.getLastIndex() + 1
        return
    }
    prevTerm := rf.getLog(args.PrevLogIndex).Term
    if prevTerm != args.PrevLogTerm {
        index := args.PrevLogIndex
        for rf.getLog(index).Term == prevTerm {
            index --
        }
        reply.ConflictIndex = index + 1
        return
    }
    reply.Success = true
    // ...
  1. 使用下一次匹配的位置进行匹配
// ...            
if reply.Success {
  // ...
    }else {
    // 实际上把重试的机制推到了下次心跳
        rf.nextIndex[i] = reply.ConflictIndex
}
// ....

总结 🔗

这部分实现代码比较简单:1. 持久化状态 2. 一个匹配位置的优化

但 2C 测试样例比较复杂,在这一部分发现了许多之前没测出来的 bug,也加深了我对 Raft 的理解。