实现前准备 🔗
必看 🔗
- raft论文 Log compaction 部分
- Lab2-实验文档-Raft-翻译Lab2-实验文档-Raft-翻译 2D
- 交互图
Lab 2D 实验目标 🔗
问题 🔗
-
一个长期运行的服务,不可能永远能存下庞大的 Log
-
服务重启后,上层应用每次都需要从头到尾执行一次全部 Log,这个恢复速度太慢
解决 🔗
- 服务通过不时保存一下它的快照(SnapShot),然后告诉 Raft 快照点之前的日志可以丢了
- 使:1. Raft 不必维持所有 Log 2. crash 恢复速度也很快速
Lab 2D 实现 🔗
我的理解 🔗
下图是论文中给出的图,它很形象,让我们一下理解快照的含义。但是这个图给了我一个错觉:快照是经过 log 压缩产生的。然而这是错误的,快照是由上层服务产生,传给 raft 层,然后让 Raft 层丢弃这之前的日志。
下面的交互图更加准确,让我们了解到数据流通的方向
Snapshot(index int, snapshot []byte)
:这里由上层服务触发,把保存快照传过来,raft 保存快照,裁剪冗余日志InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply)
: 这是个 RPC handler,通常为 Follower 落后leader 太多时触发,leader 向 Follower 发起请求,让 Follower 上层服务安装 leader 的快照快速跟上节奏。CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool
: 这是属于InstallSnapshot
RPC 的延续,RPC 在上层服务安装快照后,会触发这个接口更新 Raft 的状态。
State 🔗
- 首先我们需要在 Raft 实例中加入跟快照相关的状态
// For 2D 快照
lastIncludedIndex int // 保存到快照中的最后一个 Log 的 index
lastIncludedTerm int // 保存到快照中的最后一个 Log 的 term
snapshot []byte // 保存的最近一个快照
- 既然 logs 不保存所有 log 了,操作相关 logs 的方法也需要修改(把快照中 log 算上)
func (rf *Raft) getLastIndex() int {
return len(rf.logs) + rf.lastIncludedIndex
}
func (rf *Raft) getLog(index int) LogEntry {
if index == 0 {
return LogEntry{Term: -1}
}
if index == rf.lastIncludedIndex {
return LogEntry{Term: rf.lastIncludedTerm}
}
return rf.logs[index-1-rf.lastIncludedIndex]
}
// 包括: start 和 end, 深拷贝,不然极限情况下会有 race 的 bug
func (rf *Raft) subLog(start int, end int) []LogEntry {
if start > len(rf.logs)+rf.lastIncludedIndex {
return []LogEntry{}
}
if start == -1 {
return append([]LogEntry{}, rf.logs[:end-rf.lastIncludedIndex]...)
} else if end == -1 {
return append([]LogEntry{}, rf.logs[start-1-rf.lastIncludedIndex:]...)
} else {
return append([]LogEntry{}, rf.logs[start-1-rf.lastIncludedIndex:end-rf.lastIncludedIndex]...)
}
}
- 这三个 state 也需要持久化,并且快照需要被上层服务读取,persister 额外对快照进行了处理,具体看代码的修改。然后要注意在修改了这个三个状态地方要加上
func (rf *Raft) persist() {
// Your code here (2C).
// Example:
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if e.Encode(rf.currentTerm) != nil ||
e.Encode(rf.voteFor) != nil ||
e.Encode(rf.logs) != nil ||
e.Encode(rf.lastIncludedIndex) != nil ||
e.Encode(rf.lastIncludedTerm) != nil {
log.Fatalf("序列化状态失败")
}
data := w.Bytes()
rf.persister.SaveStateAndSnapshot(data, rf.snapshot)
}
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
// Your code here (2C).
// Example:
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm, voteFor, lastIncludedIndex, lastIncludedTerm int
var logs []LogEntry
if d.Decode(¤tTerm) != nil ||
d.Decode(&voteFor) != nil ||
d.Decode(&logs) != nil ||
d.Decode(&lastIncludedIndex) != nil ||
d.Decode(&lastIncludedTerm) != nil {
log.Fatalf("反序列化状态失败")
} else {
rf.currentTerm = currentTerm
rf.voteFor = voteFor
rf.logs = logs
rf.lastIncludedTerm = lastIncludedTerm
rf.lastIncludedIndex = lastIncludedIndex
log.Printf("%d 从 disk 恢复数据:currentTerm=%d,voteFor=%d,logs=%+v", rf.me, rf.currentTerm, rf.voteFor, rf.logs)
}
rf.snapshot = rf.persister.ReadSnapshot()
}
Snapshot(index int, snapshot []byte)
🔗
每一个服务单独保存自己的快照
- 如果传过来的快照还有没当前已 applied 的日志新,拒绝
- 接受快照
- 裁剪 index 内的日志
- 更新与快照相关的状态:
lastIncludedIndex
,lastIncludedTerm
,snapshot
- 持久化
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
if index < rf.lastIncludedIndex {
return
}
rf.acceptSnapShot(rf.getLog(index).Term, index, snapshot)
}
func (rf *Raft) acceptSnapShot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) {
rf.logs = rf.subLog(lastIncludedIndex+1, -1)
rf.lastIncludedIndex = lastIncludedIndex
rf.lastIncludedTerm = lastIncludedTerm
rf.snapshot = append([]byte{}, snapshot...)
rf.persist()
}
InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply)
🔗
- 首先我们实现这个 RPC handler,主要就是发消息给上层应用安装快照,具体逻辑在论文中有描述,也可以直接看代码
type InstallSnapshotArgs struct {
Term int
LeaderID int
LastIncludedIndex int
LastIncludedTerm int
Snapshot []byte
}
type InstallSnapshotReply struct {
Term int
}
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
return ok
}
// InstallSnapshot handler
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
// rpc handler 通常是并发,加锁
rf.mu.Lock()
defer rf.mu.Unlock()
log.Printf("%d 在 term %d 收到 %d 的 InstallSnapshot request %+v", rf.me, rf.currentTerm, args.LeaderID, args)
// rpc handler 通用的规则: args 中 term 比 当前小,直接返回
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
return
}
// rpc handler 通用规则: args 中的 term 比当前 server 大,当前 server 更新为 term,转成 follower
if args.Term > rf.currentTerm {
rf.updateTerm(args.Term)
rf.toFollower()
}
reply.Term = rf.currentTerm
// 拒绝旧的快照
if args.LastIncludedIndex <= rf.lastIncludedTerm {
return
}
msg := ApplyMsg{
SnapshotValid: true,
Snapshot: args.Snapshot,
SnapshotTerm: args.LastIncludedTerm,
SnapshotIndex: args.LastIncludedIndex,
}
// 通知上层应用安装快照,然后上层应用会调用 Raft 中 CondInstallSnap 接口修改 Raft 中相关的值
rf.applyCh <- msg
}
- 什么时候触发这个请求 –> Follower 中 log 落后 leader 太多 –> leader 发送
AppendEntries
时,发现自己已经没有保存 args 中 prevLog 了
func (rf *Raft) sendAppendEntriesToAll() {
//log.Printf("%d 在 term %d 发起同步", rf.me, rf.currentTerm) // 这里 CurrentTerm 没加锁,可能会有 race
for i := range rf.peers {
if i == rf.me {
continue
}
go func(i int) {
rf.mu.Lock()
// For 2D, 如果需要对比的 prevLog 在 leader 上已经没有了
// 发送 Leader 的最新快照过去
prevLogIndex := rf.nextIndex[i] - 1
if prevLogIndex < rf.lastIncludedIndex {
prevLogIndex = rf.lastIncludedIndex
rf.nextIndex[i] = prevLogIndex + 1
go rf.sendSnapShot(i)
}
// ...
}
}
}
func (rf *Raft) sendSnapShot(i int) {
rf.mu.Lock()
lastIncludedIndex := rf.lastIncludedIndex
args := &InstallSnapshotArgs{
Term: rf.currentTerm,
LeaderID: rf.me,
LastIncludedIndex: lastIncludedIndex,
LastIncludedTerm: rf.lastIncludedTerm,
Snapshot: append([]byte{}, rf.snapshot...),
}
rf.mu.Unlock()
reply := &InstallSnapshotReply{}
if ok := rf.sendInstallSnapshot(i, args, reply); !ok {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Term > rf.currentTerm {
rf.updateTerm(reply.Term)
rf.toFollower()
}
if reply.Term != args.Term {
return
}
rf.nextIndex[i] = lastIncludedIndex + 1
rf.matchIndex[i] = lastIncludedIndex
}
CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool
🔗
上层服务收到 InstallSnapshot
RPC handler 会调用这个接口
- 判断是否接受这个快照(Leader 发过来的)
- 更新
lastApplied
,commitIndex
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
rf.mu.Lock()
defer rf.mu.Unlock()
log.Printf("%d 在 term %d 收到 CondInstallSnapshot lastIncludedTerm %d LastIncludedIndex %d", rf.me, rf.currentTerm, rf.lastIncludedTerm, rf.lastIncludedIndex)
if rf.lastApplied >= lastIncludedIndex {
return false
}
rf.acceptSnapShot(lastIncludedTerm, lastIncludedIndex, snapshot)
rf.lastApplied = lastIncludedIndex
if lastIncludedIndex > rf.commitIndex {
rf.commitIndex = lastIncludedIndex
}
return true
}
总结 🔗
lab 2D 是快照部分的实现,只要我们知道快照从上层服务而来,数据的流动方向,总体逻辑并不是很复杂。但是这部分极大增加的代码的复杂度,要达到 bug free,实现起来还是相对比较难的
至此,我们已经完成了 lab 2 的所有功能了。即使上述的代码通过了所有测试样例,它还是可能存在着 bug。让我们在 lab3 来验证它把。
后续补充 🔗
上述实现会出现低概率偶发的死锁,还在排查中…