Lab2D-Raft-快照-实现过程

· 821字 · 4分钟

实现前准备 🔗

必看 🔗

Lab 2D 实验目标 🔗

问题 🔗

  1. 一个长期运行的服务,不可能永远能存下庞大的 Log

  2. 服务重启后,上层应用每次都需要从头到尾执行一次全部 Log,这个恢复速度太慢

解决 🔗

  • 服务通过不时保存一下它的快照(SnapShot),然后告诉 Raft 快照点之前的日志可以丢了
  • 使:1. Raft 不必维持所有 Log 2. crash 恢复速度也很快速

Lab 2D 实现 🔗

我的理解 🔗

下图是论文中给出的图,它很形象,让我们一下理解快照的含义。但是这个图给了我一个错觉:快照是经过 log 压缩产生的。然而这是错误的,快照是由上层服务产生,传给 raft 层,然后让 Raft 层丢弃这之前的日志。

image-20220103224115636

下面的交互图更加准确,让我们了解到数据流通的方向

交互图

  1. Snapshot(index int, snapshot []byte):这里由上层服务触发,把保存快照传过来,raft 保存快照,裁剪冗余日志
  2. InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) : 这是个 RPC handler,通常为 Follower 落后leader 太多时触发,leader 向 Follower 发起请求,让 Follower 上层服务安装 leader 的快照快速跟上节奏。
  3. CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool: 这是属于 InstallSnapshot RPC 的延续,RPC 在上层服务安装快照后,会触发这个接口更新 Raft 的状态。

State 🔗

  1. 首先我们需要在 Raft 实例中加入跟快照相关的状态
    // For 2D 快照
    lastIncludedIndex int    // 保存到快照中的最后一个 Log 的 index
    lastIncludedTerm  int    // 保存到快照中的最后一个 Log 的 term
    snapshot          []byte // 保存的最近一个快照
  1. 既然 logs 不保存所有 log 了,操作相关 logs 的方法也需要修改(把快照中 log 算上)
func (rf *Raft) getLastIndex() int {
    return len(rf.logs) + rf.lastIncludedIndex
}

func (rf *Raft) getLog(index int) LogEntry {
    if index == 0 {
        return LogEntry{Term: -1}
    }
    if index == rf.lastIncludedIndex {
        return LogEntry{Term: rf.lastIncludedTerm}
    }
    return rf.logs[index-1-rf.lastIncludedIndex]
}

// 包括: start 和 end, 深拷贝,不然极限情况下会有 race 的 bug
func (rf *Raft) subLog(start int, end int) []LogEntry {
    if start > len(rf.logs)+rf.lastIncludedIndex {
        return []LogEntry{}
    }
    if start == -1 {
        return append([]LogEntry{}, rf.logs[:end-rf.lastIncludedIndex]...)
    } else if end == -1 {
        return append([]LogEntry{}, rf.logs[start-1-rf.lastIncludedIndex:]...)
    } else {
        return append([]LogEntry{}, rf.logs[start-1-rf.lastIncludedIndex:end-rf.lastIncludedIndex]...)
    }
}
  1. 这三个 state 也需要持久化,并且快照需要被上层服务读取,persister 额外对快照进行了处理,具体看代码的修改。然后要注意在修改了这个三个状态地方要加上
func (rf *Raft) persist() {
    // Your code here (2C).
    // Example:
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    if e.Encode(rf.currentTerm) != nil ||
        e.Encode(rf.voteFor) != nil ||
        e.Encode(rf.logs) != nil ||
        e.Encode(rf.lastIncludedIndex) != nil ||
        e.Encode(rf.lastIncludedTerm) != nil {
        log.Fatalf("序列化状态失败")
    }
    data := w.Bytes()
    rf.persister.SaveStateAndSnapshot(data, rf.snapshot)
}

func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }
    // Your code here (2C).
    // Example:
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm, voteFor, lastIncludedIndex, lastIncludedTerm int
    var logs []LogEntry
    if d.Decode(&currentTerm) != nil ||
        d.Decode(&voteFor) != nil ||
        d.Decode(&logs) != nil ||
        d.Decode(&lastIncludedIndex) != nil ||
        d.Decode(&lastIncludedTerm) != nil {
        log.Fatalf("反序列化状态失败")
    } else {
        rf.currentTerm = currentTerm
        rf.voteFor = voteFor
        rf.logs = logs
        rf.lastIncludedTerm = lastIncludedTerm
        rf.lastIncludedIndex = lastIncludedIndex
        log.Printf("%d 从 disk 恢复数据:currentTerm=%d,voteFor=%d,logs=%+v", rf.me, rf.currentTerm, rf.voteFor, rf.logs)
    }
    rf.snapshot = rf.persister.ReadSnapshot()
}

Snapshot(index int, snapshot []byte) 🔗

每一个服务单独保存自己的快照

  1. 如果传过来的快照还有没当前已 applied 的日志新,拒绝
  2. 接受快照
    1. 裁剪 index 内的日志
    2. 更新与快照相关的状态:lastIncludedIndex,lastIncludedTerm,snapshot
    3. 持久化
func (rf *Raft) Snapshot(index int, snapshot []byte) {
    // Your code here (2D).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if index < rf.lastIncludedIndex {
        return
    }
    rf.acceptSnapShot(rf.getLog(index).Term, index, snapshot)
}

func (rf *Raft) acceptSnapShot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) {
    rf.logs = rf.subLog(lastIncludedIndex+1, -1)
    rf.lastIncludedIndex = lastIncludedIndex
    rf.lastIncludedTerm = lastIncludedTerm
    rf.snapshot = append([]byte{}, snapshot...)
    rf.persist()
}

InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) 🔗

  1. 首先我们实现这个 RPC handler,主要就是发消息给上层应用安装快照,具体逻辑在论文中有描述,也可以直接看代码
type InstallSnapshotArgs struct {
    Term              int
    LeaderID          int
    LastIncludedIndex int
    LastIncludedTerm  int
    Snapshot          []byte
}

type InstallSnapshotReply struct {
    Term int
}

func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
    ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
    return ok
}

// InstallSnapshot handler
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    // rpc handler 通常是并发,加锁
    rf.mu.Lock()
    defer rf.mu.Unlock()
    log.Printf("%d 在 term %d 收到 %d 的 InstallSnapshot request %+v", rf.me, rf.currentTerm, args.LeaderID, args)
    // rpc handler 通用的规则: args 中 term 比 当前小,直接返回
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        return
    }
    // rpc handler 通用规则: args 中的 term 比当前 server 大,当前 server 更新为 term,转成 follower
    if args.Term > rf.currentTerm {
        rf.updateTerm(args.Term)
        rf.toFollower()
    }
    reply.Term = rf.currentTerm
    // 拒绝旧的快照
    if args.LastIncludedIndex <= rf.lastIncludedTerm {
        return
    }
    msg := ApplyMsg{
        SnapshotValid: true,
        Snapshot:      args.Snapshot,
        SnapshotTerm:  args.LastIncludedTerm,
        SnapshotIndex: args.LastIncludedIndex,
    }
    // 通知上层应用安装快照,然后上层应用会调用 Raft 中 CondInstallSnap 接口修改 Raft 中相关的值
    rf.applyCh <- msg
}
  1. 什么时候触发这个请求 –> Follower 中 log 落后 leader 太多 –> leader 发送AppendEntries 时,发现自己已经没有保存 args 中 prevLog 了
func (rf *Raft) sendAppendEntriesToAll() {
    //log.Printf("%d 在 term %d 发起同步", rf.me, rf.currentTerm) // 这里 CurrentTerm 没加锁,可能会有 race
    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        go func(i int) {
            rf.mu.Lock()
            // For 2D, 如果需要对比的 prevLog 在 leader 上已经没有了
            // 发送 Leader 的最新快照过去
            prevLogIndex := rf.nextIndex[i] - 1
            if prevLogIndex < rf.lastIncludedIndex {
                prevLogIndex = rf.lastIncludedIndex
                rf.nextIndex[i] = prevLogIndex + 1
                go rf.sendSnapShot(i)
            }
      // ...
    }
  }
}

func (rf *Raft) sendSnapShot(i int) {
    rf.mu.Lock()
    lastIncludedIndex := rf.lastIncludedIndex
    args := &InstallSnapshotArgs{
        Term:              rf.currentTerm,
        LeaderID:          rf.me,
        LastIncludedIndex: lastIncludedIndex,
        LastIncludedTerm:  rf.lastIncludedTerm,
        Snapshot:          append([]byte{}, rf.snapshot...),
    }
    rf.mu.Unlock()
    reply := &InstallSnapshotReply{}
    if ok := rf.sendInstallSnapshot(i, args, reply); !ok {
        return
    }
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if reply.Term > rf.currentTerm {
        rf.updateTerm(reply.Term)
        rf.toFollower()
    }
    if reply.Term != args.Term {
        return
    }
    rf.nextIndex[i] = lastIncludedIndex + 1
    rf.matchIndex[i] = lastIncludedIndex
}

CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool 🔗

上层服务收到 InstallSnapshot RPC handler 会调用这个接口

  1. 判断是否接受这个快照(Leader 发过来的)
  2. 更新 lastApplied, commitIndex
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
   rf.mu.Lock()
   defer rf.mu.Unlock()
   log.Printf("%d 在 term %d 收到 CondInstallSnapshot lastIncludedTerm %d LastIncludedIndex %d", rf.me, rf.currentTerm, rf.lastIncludedTerm, rf.lastIncludedIndex)
   if rf.lastApplied >= lastIncludedIndex {
      return false
   }
   rf.acceptSnapShot(lastIncludedTerm, lastIncludedIndex, snapshot)
   rf.lastApplied = lastIncludedIndex
   if lastIncludedIndex > rf.commitIndex {
      rf.commitIndex = lastIncludedIndex
   }
   return true
}

总结 🔗

lab 2D 是快照部分的实现,只要我们知道快照从上层服务而来,数据的流动方向,总体逻辑并不是很复杂。但是这部分极大增加的代码的复杂度,要达到 bug free,实现起来还是相对比较难的

至此,我们已经完成了 lab 2 的所有功能了。即使上述的代码通过了所有测试样例,它还是可能存在着 bug。让我们在 lab3 来验证它把。

后续补充 🔗

上述实现会出现低概率偶发的死锁,还在排查中…