如果你把 Lab 2B 一步步调过来了,2C 对于你几乎没有难度
实现前准备 🔗
必看 🔗
- raft论文 Safety、持久化、AppendEntries 优化
- Lab2-实验文档-Raft-翻译Lab2-实验文档-Raft-翻译 2C
Lab 2C 目标 🔗
- 持久化 state,使 server 重启时能够恢复
- 优化
AppendEntires
RPC 被拒绝的次数,之前我们使用 prevLogIndex - 1 来慢慢匹配日志的位置,很明显性能不够
LAB 2C 实现 🔗
持久化 state 🔗
需要持久化的 state 在图 2 中进行了说明,分别是:
-
currentTerm
:当前任期, 这个肯定需要持久化 -
voteFor
:这里持久化的目的是,避免一次任期投两次票 -
logs
:日志,重启回复需要重新执行一次命令
因此代码中修改了上述三个变量的地方都要进行一次持久化。
持久化一般到磁盘中,但在这个 lab,使用了 persister
来模拟,方便测试。
- 读取持久化状态
//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
// Your code here (2C).
// Example:
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm, voteFor int
var logs []LogEntry
if d.Decode(¤tTerm) != nil ||
d.Decode(&voteFor) != nil || d.Decode(&logs) != nil {
log.Fatalf("反序列化状态失败")
} else {
rf.currentTerm = currentTerm
rf.voteFor = voteFor
rf.logs = logs
log.Printf("%d 从 disk 恢复数据:currentTerm=%d,voteFor=%d,logs=%+v", rf.me, rf.currentTerm,rf.voteFor, rf.logs)
}
}
- 持久化数据代码
func (rf *Raft) persist() {
// Your code here (2C).
// Example:
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if e.Encode(rf.currentTerm) != nil ||
e.Encode(rf.voteFor) != nil ||
e.Encode(rf.logs) != nil{
log.Fatalf("序列化状态失败")
}
data := w.Bytes()
rf.persister.SaveRaftState(data)
}
-
在修改了相关 state 的地方加入持久化调用,例如:
-
Start(command interface{}) (int, int, bool)
中给 append leader log -
RPC hander 收到 args 的 term 比当前大,需要更新 term 时
-
RPC reply 的 term 比当前 term 大,需要更新 term 时
-
转变成 Follower,重置 voteFor 时
-
AppendEntries
handler 中 append log 时
优化 AppendEntries
RPC 被拒绝的次数 🔗
当一个 Follower 远远落后 Leader 时,需要经过大量被拒绝 AppendEntries
的请求,才能找到匹配开始发送日志中的位置。这里两个可能优化的方向。
- 当 Follower 中 Logs 数量小于 leader 中时,直接使用 Follower 的 LastLogIndex 作为下一次匹配的位置,而不是慢慢的减一。
- 当 Follower 的 LastLogTerm 不等于 leader 的 LastLogTerm,直接使用 Follower 上一个 term 的位置匹配。
实现代码:
- 返回值中增加一个字段,代表下一次匹配的位置
type AppendEntriesReply struct {
Term int
Success bool
ConflictIndex int // 优化
}
- 位于 AppendEntries handler, 修改直接返回 false 部分的代码,返回下一次匹配的位置
//
// ...
// 调用者需要减少 PrevLogIndex,重试
if rf.getLastIndex() < args.PrevLogIndex{
reply.ConflictIndex = rf.getLastIndex() + 1
return
}
prevTerm := rf.getLog(args.PrevLogIndex).Term
if prevTerm != args.PrevLogTerm {
index := args.PrevLogIndex
for rf.getLog(index).Term == prevTerm {
index --
}
reply.ConflictIndex = index + 1
return
}
reply.Success = true
// ...
- 使用下一次匹配的位置进行匹配
// ...
if reply.Success {
// ...
}else {
// 实际上把重试的机制推到了下次心跳
rf.nextIndex[i] = reply.ConflictIndex
}
// ....
总结 🔗
这部分实现代码比较简单:1. 持久化状态 2. 一个匹配位置的优化
但 2C 测试样例比较复杂,在这一部分发现了许多之前没测出来的 bug,也加深了我对 Raft 的理解。