Lab2A&2B-Raft-leader选举与日志同步-实现过程

· 2231字 · 11分钟

Raft 实现真的比较难,首先要从整体上完全理解 Raft 的运行机制就比较困难。再具体到实现上,涉及到很多个后台 goroutine(goroutine 中起 goroutine),如何加锁才能安全读取写入共享变量(一不小心还死锁),并发难以 debug,计时器的实现… 这篇文章记录下我的个人经验把,希望对其他人能有所帮助

实验前 🔗

必看 🔗

选看 🔗

建议 🔗

  • lecture 3 讲 GFS,lecture 4 讲一个复制容错的论文。与我们要实现的 raft 的关系不大,最多算一些前置知识,建议不看。

  • 建议 lab2A 与 lab2B 一起做,虽然 lab2 文档把 lab 分开为两个部分。但是我们是去对照论文实现,尤其在 RequestVoteAppendEnties 两个 RPC 中,这两者混杂在一起

  • 多读几遍论文与 lab2 文档,尤其是论文中的图 2, 再开始 lab…

  • 论文暂时可以只阅读与 2a 2b 相关的部分

实验过程 🔗

思路分析 🔗

论文中的这图太重要了,放上来再看一下吧

image-20211229170522923

在 2a,2c 实验中,我们认为:

  • 所有日志都存在于内存中,持久化在 2c 考虑
  • 与上层应用交互的逻辑应该是这个实验最后考虑的地方,我们先考虑实现 leader 选举,发送心跳的逻辑

三种角色 🔗

字段意义 🔗

按照论文描述和我的实际实现,我在 Raft 实例中增加了以下字段。

这些字段大多数都会被并发读写,请在访问前后加锁


// A Go object implementing a single Raft peer.
//
type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()

    // Your data here (2A, 2B, 2C).
    // Look at the paper's Figure 2 for a description of what
    // state a Raft server must maintain.
    state State            // 此实例状态: Follower, Candidate, Leader
    currentTerm int        // 当前任期
    voteFor int            // 当前任期投票
    logs []LogEntry        // 所有日志条目
    commitIndex int        // for 2b,已经提交的最新日志的 index, "已经提交" == 这条日志已经被大多数 server 所接受
    lastApplied int        // for 2b, 已应用到的上层应用日志最新的 index,后台定时根据 commitIndex 的增加去提交
    applyCh chan ApplyMsg    // for 2b, 需要把已经 committed 的 log apply 到 上层应用
    // for leader
    nextIndex []int        // 记录各个 Server 写入新 Log 的位置 index(意味着这个位置以前的日志在 leader 与 follower 完全一致)
    matchIndex []int    // 各个 Server 已有 Log 的与 leader 保持一致最后的位置 = nextIndex - 1

    electionTimeout time.Time     // 应该被不断 reset,始终比 time.Now() 小。但如果到点了,说明过期了,发起选举
}

type State int

const (
    Follower State = 1
    Candidate State = 2
    Leader State = 3
)

type LogEntry struct {
    Command interface{}
    Term int
}

状态流转如下图所示(论文图 4),牢记,按照这个顺序去一一实现代码逻辑

image-20211229172959678

各个角色后台所需要一直运行的任务

Follower & Candidate & leader: 🔗
  • 检测选举超时,如果超时,需要发起选举。leader 不需要检测,但是需要保持这个后台任务,因为 leader 可以转变成其它状态
  • 应用 log 到上层应用,commitIndex > appliedIndex, 则需要把 [appliedIndex + 1, committedIndex] 的 log,应用到上层应用,for 2b
Leader: 🔗
  • 定时发送日志复制请求(心跳是其一个特例),防止其它 server 发起选举(其它 server 收到请求, 重置选举时间)
  • 更新 leader 的 commitIndex,这里需要检测新 log 是否复制到大多数机器上了,for 2b

两种 RPC 🔗

Raft 就两个 RPC,这里简单说明下它们是干什么的

RequestVote 🔗
  • 触发请求的时机:

    • 选举超时 (follower/candidate 在一段时间没收到 leader 的心跳了)
  • 接收到请求,server 端的逻辑

    1. 拒绝 term 比自己小的请求
    2. 收到 term 比自己大的请求,更新term,转成 Follower,再往下处理
    3. 判断该不该给这个请求投票
      1. 自己在这一轮,还没有投过票
      2. 只给对方日志领先(包括一致)自己日志的投票(这样才能选出包括 commited 日志的 Leader)
  • 收到回复后,client 逻辑

    1. 如果 reply 的 term 比自己 term 大,更新term,转成 Follower,直接返回

    2. 如果 reply 投票成功,numVote ++

    3. numVote 大于一半的服务数,当选成 leader

AppendEntries 🔗
  • 触发请求的时机
    • 定时发送(只有 Leader 发送)
  • 接收到请求,server 端的逻辑
    1. 拒绝 term 比自己小的请求
    2. 收到 term 比自己大的请求,更新term,转成 Follower,再往下处理
    3. 重置选举时间
    4. 对比自己的 Log 与 args 发过来 prevLog
      1. 如果自己的 log 数量少于 prevLogIndex,返回 FALSE
      2. 或者在同一位置 Index 有日志,但 term 不一致,返回 FALSE
    5. 截取掉比 prevLog 多的 Log
    6. append 新的 Log
    7. 根据 leader 发过来的 CommitIndex 与 自己的 LastIndex 更新自己的 commitIndex
  • 收到回复,client 逻辑
    1. 如果 reply 的 term 比自己 term 大,更新term,转成 Follower,直接返回
    2. 如果收到说明复制成功,更新 nextIndex 与 matchIndex
    3. 如果收到复制失败,nextIndex -= 1,在下次心跳中重试

一个具体例子 🔗

如图,一个 Raft 到了现在的状态

image-20211228140653187

放出各个 Raft 实例中字段的值,加深对各个字段理解

Leader0 Follower1 Follower2 Follower3 Follower4
currentTerm 3 3(可能掉线) 3 完全掉线 3
voteFor 0 0 0 3 0
log [1,1,1,2,3,3,3,3] [1,1,1,2,3] [1,1,1,2,3,3,3,3] [1,1] [1,1,1,2,3,3,3]
commitIndex(2B) 7 4 7 1 6
lastApplied(2B) 7 4 7 1 6
nextIndex [9,6,9,3,8] - - - -
matchIndex [8,5,8,2,7] - - - -

如果 leader 此时发起心跳, AppendEntries 请求参数:

Follower1 Follower2 Follower3 Follower4
term 3 3 3 3
leaderID 0 0 0 0
prevLogIndex 5 8 2 7
prevLogTerm 3 3 1 3
entries [3,3,3] [] [1,2,3,3,3,3] [3]
leaderCommit 7 7 7 7

如果 此时 leader 1 也完全掉线了,Follower 1, 2, 4 都有可能发起选举(2 也有可能,这里不讨论了),分别看一下可能发起的第一轮选举 RequestVote args

Follower1 Follower2 Follower4
term 4 4 4
candidateID 1 2 4
lastLogIndex 5 8 7
LastLogTerm 3 3 3

计时器如何实现 🔗

  1. 在一个协程里用死循环
  2. sleep 一个短的时间间隔(5ms)去检查超时时间与当前时间,如果 Now() 大于 超时时间,则说明超时了,需要进行逻辑处理
  3. 其它协程可以不断重置超时时间
for rf.killed() == false {
  if time.Now().After(rf.loadTimeout()) {
    // do ....
  }
  time.Sleep(5 * time.Millisecond)
}

func (rf *Raft) resetTimeout() {
    rand.Seed(time.Now().UnixNano())
    rf.electionTimeout = time.Now().Add(time.Millisecond*time.Duration(ElectionTimeoutMinMs + rand.Intn(ElectionTimeOutMaxMs-ElectionTimeoutMinMs+1)))
}

const (
    HeartbeatMs = 100    // leader 100Ms 发起一次心跳
    ElectionTimeoutMinMs = 250 // 最小的选举时间
    ElectionTimeOutMaxMs = 400    // 最大的选举时间
)

个人经验与踩坑 🔗

1. Raft 实现 Log Index 从 1 开始 🔗
  • Index = 0 即代表刚启动时的 Raft,还没有一条 Log,此时发起投票或者心跳,需要处理这个特殊的 log,mock 一个 term为 -1的 log 返回。
func (rf *Raft) getLog(index int) LogEntry {
    if index == 0 {
        return LogEntry{Term: -1}
    }else {
        return rf.logs[index-1]
    }
}
2. 如何加锁防止 race 🔗
  1. Raft 结构中的变量大多数都是有竞争的,任何对 Raft 结构变量写前请加锁
  2. 两个 RPC handler 都是并发的,请直接给方法加锁
  3. 并发开启 goroutine 读写共享变量加锁
3. 如何加锁防止死锁 🔗
  1. mutex 是不可重入锁
rf.mu.Lock()
loadState()    // 死锁
rf.mu.Unlock()

func (rf *Raft) loadState() State  {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    return rf.state
}
  1. 不要把 rpc 请求锁在里面,在前后分开加锁
  2. 不要忘记释放锁(if 中 unlock)
            rf.mu.Lock()
            if rf.state != Leader {
                // rf.mu.Unlock() if 中的 unlock 
                return
            }
            rf.mu.Unlock()
4. 并发难以 debug,请在关键位置打 Log 🔗
  • Handler 收到 RPC 请求
  • 收到 RPC 回复时
  • 角色身份变化时
5. 两个 RPC 处理有一致的代码逻辑 🔗

如 handler 开始的一段处理逻辑一致

接收到请求,server 端的逻辑

  1. 拒绝 term 比自己小的请求
  2. 收到 term 比自己大的请求,更新term,转成 Follower,再往下处理
// RequestVote handler
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // rpc handler 通常是并发,加锁
    rf.mu.Lock()
    defer rf.mu.Unlock()
    log.Printf("%d 在 term %d 收到了 RequestVote request %+v", rf.me, rf.currentTerm, args)
    // rpc handler 通用的规则: args 中 term 比 当前小,直接返回
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        return
    }
    // rpc handler 通用规则: args 中的 term 比当前 server 大,当前 server 更新为 term,转成 follower
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.voteFor = -1
        rf.state = Follower
    }
  // ... 
}

如 client 端

收到回复,client 逻辑

  1. 如果 reply 的 term 比自己 term 大,更新term,转成 Follower,直接返回
if ok := rf.sendRequestVote(i,args, reply); !ok {
  return
}
// 这些变量可能并发修改了,需要注意
rf.mu.Lock()
defer rf.mu.Unlock()
    // 通用规则: reply 中 term 大于当前,转为 Follower
    if reply.Term > rf.currentTerm {
    rf.currentTerm = reply.Term
        rf.voteFor = -1
        rf.toFollower()
        return
}
// ....
6. 尽量使用大锁,不要考虑性能 🔗
7. Logs 复制请用深拷贝,不然很可能有并发 race 问题 🔗
8. RPC 请求前后请 double check: 🔗
  • 当前角色是否还需要进行这个动作,
  • 当前 term 是否还需要处理这个 回复
  • 回复的 term 是否与发起的 term 一致

具体实现 🔗

1. 填写 Raft 和两个 RPC 结构 🔗

根据论文依葫芦画瓢就行,后期根据自己需要再加,raft 结构已放过,这里放一下 RPC 的

type AppendEntriesArgs struct {
    Term int
    LeaderID int
    PrevLogIndex int
    PrevLogTerm int
    Entries []LogEntry
    LeaderCommit int
}

type AppendEntriesReply struct {
    Term int
    Success bool
}

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
    ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
    return ok
}

// AppendEntries handler
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
  // ...
}

type RequestVoteArgs struct {
    Term int
    CandidateID int
    LastLogIndex int
    LastLogTerm int
}

type RequestVoteReply struct {
    Term int
    VoteGranted bool
}

func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
    ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
    return ok
}

// RequestVote handler
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {

}

2. 启动 Raft,初始化 🔗

  1. 一些状态的初始赋值,如刚启动的 raft 角色是 follower
  2. 启动检测选举超时的协程
  3. 对于 2B 还需要在定期应用 commited 的 log 到上层应用
func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me
    rf.applyCh = applyCh
    // Your initialization code here (2A, 2B, 2C).
    rf.toFollower()
    rf.voteFor = -1
    rf.resetTimeout()
    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())
    // start ticker goroutine to start elections
    go rf.ticker()
    go rf.applyMsgToUpper()    // for 2B

    return rf
}

3. leader 选举的逻辑 🔗

上面的初始化中,启动了 go rf.ticker()的协程,现在需要实现 leader 选举这一部分逻辑

  1. 一个死循环,计时器的实现

  2. 如果其角色不是 leader 并且过了选举时间,发起选举

  3. 成为 candiate, 给自己投一票

  4. 并发对其它所有 Raft 实例发起请求

  5. 由于 RPC 消息延迟比较大,因此需要 double check,当前实例的状态是否能投票

  6. 计数得到票数,票数超过一半,当选为 Leader

// The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) ticker() {
    for rf.killed() == false {
        // 状态不是 Leader, 并且已经过了选举时间,需要发起选举
        if rf.loadState() != Leader && time.Now().After(rf.loadTimeout()) {
            // 1. 成为 candidate
            rf.mu.Lock()
            rf.toCandidate()
            // 2. 发起投票
            numVote := 1
            // 所有 requestVote 的请求参数一致
            args := &RequestVoteArgs{
                Term:         rf.currentTerm,
                CandidateID:  rf.me,
                LastLogIndex: rf.getLastIndex(),
                LastLogTerm:  rf.getLog(rf.getLastIndex()).Term,
            }
            rf.mu.Unlock()
            for i, _ := range rf.peers {
                if i == rf.me {
                    continue
                }
                go func(i int) {
                    reply := &RequestVoteReply{}
                    // 不能以不是 Candidate 的身份发送投票
                    if rf.loadState() != Candidate {
                        return
                    }
                    // rpc 请求
                    if ok := rf.sendRequestVote(i,args, reply); !ok {
                        return
                    }
                    // 这些变量可能并发修改了,需要注意
                    rf.mu.Lock()
                    defer rf.mu.Unlock()
                    // 通用规则: reply 中 term 大于当前,转为 Follower
                    if reply.Term > rf.currentTerm {
                        rf.currentTerm = reply.Term
                        rf.voteFor = -1
                        rf.toFollower()
                        return
                    }
                    // double check,因为有可能发起新一轮选举了,才收到这个消息
                    if rf.state != Candidate || reply.Term != rf.currentTerm || reply.Term != args.Term {
                        return
                    }
                    // 投票
                    if reply.VoteGranted {
                        numVote ++
                        log.Printf("%d 在 term %d 收到了 %d 的投票,现在票数 %d", rf.me, rf.currentTerm, i,numVote)
                    }
                    // 成为 Leader
                    if numVote > len(rf.peers)/2 {
                        rf.toLeader()
                    }
                }(i)
            }
        }
        time.Sleep(5 * time.Millisecond)
    }
}

与角色转换的方法:

func (rf *Raft) toFollower()  {
    log.Printf("%d 在term %d 成为 follower", rf.me, rf.currentTerm)
    rf.state = Follower
}

func (rf *Raft) toCandidate() {
    rf.currentTerm ++
    log.Printf("%d 在 term %d 发起了投票", rf.me,rf.currentTerm)
    rf.voteFor = rf.me
    rf.state = Candidate
    rf.resetTimeout()
}

与 log 的有关的工具方法

func (rf *Raft) getLastIndex() int {
    return len(rf.logs)
}

func (rf *Raft) getLog(index int) LogEntry {
    if index == 0 {
        return LogEntry{Term: -1}
    }else {
        return rf.logs[index-1]
    }
}
// 包括: start 和 end, 深拷贝,不然极限情况下会有 race 的 bug
func (rf *Raft) subLog(start int, end int) []LogEntry {
    if start == -1 {
        return append([]LogEntry{}, rf.logs[:end]...)
    }else if end == -1{
        return append([]LogEntry{}, rf.logs[start-1:]...)
    }else {
        return append([]LogEntry{}, rf.logs[start-1: end]...)
    }
}

4. RequestVote server 端的逻辑 🔗

在前面已经描述大致逻辑了,前面一大堆都是共有逻辑,20 行开始才是真正处理逻辑,这里直接看代码

// RequestVote handler
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // rpc handler 通常是并发,加锁
    rf.mu.Lock()
    defer rf.mu.Unlock()
    log.Printf("%d 在 term %d 收到了 RequestVote request %+v", rf.me, rf.currentTerm, args)
    // rpc handler 通用的规则: args 中 term 比 当前小,直接返回
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        return
    }
    // rpc handler 通用规则: args 中的 term 比当前 server 大,当前 server 更新为 term,转成 follower
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.voteFor = -1
        rf.state = Follower
    }
    reply.Term = rf.currentTerm
    lastIndex := rf.getLastIndex()
    // 发起投票的 candidate 要比当前有更新的 log
    isNewer := true
    if rf.getLog(lastIndex).Term > args.LastLogTerm {
        isNewer = false
    }else if rf.getLog(lastIndex).Term == args.LastLogTerm {
        if lastIndex > args.LastLogIndex {
            isNewer = false
        }
    }
    if (rf.voteFor == -1 || rf.voteFor == args.CandidateID) && isNewer {
        log.Printf("%d 在 term %d 给 %d 投票", rf.me,rf.currentTerm,args.CandidateID)
        rf.voteFor = args.CandidateID
        reply.VoteGranted = true
        return
    }
}

5. 当选为 Leader 🔗

  1. 初始化 nextIndex 与 matchIndex 数组
  2. 开启心跳后台协程
  3. 开启后台更新 commitIndex 协程
func (rf *Raft) toLeader() {
   log.Printf("%d 在term %d 成为了 leader", rf.me,rf.currentTerm)
   rf.state = Leader
   // 成为 leader 要初始化 nextIndex matchIndex 数组
   rf.nextIndex = make([]int, len(rf.peers))
   rf.matchIndex = make([]int, len(rf.peers))
   for i := 0; i < len(rf.peers); i ++ {
      rf.nextIndex[i] = rf.getLastIndex() + 1
   }
   go rf.heartbeat()
   go rf.updateCommitIndex()    // for 2b
}

6. leader 发起心跳(日志复制)RPC 🔗

定期发起心跳, 角色不是 leader,退出这个协程

func (rf *Raft) heartbeat() {
    for rf.killed() == false {
        rf.mu.Lock()
        log.Printf("%d 在 term %d 触发心跳, state: %d, nextIndex: %+v", rf.me, rf.currentTerm, rf.state, rf.nextIndex) // 共享变量访问 要加锁
        if rf.state != Leader {
            rf.mu.Unlock()
            return
        }
        rf.mu.Unlock()
        rf.sendAppendLogsToAll()
        time.Sleep(HeartbeatMs * time.Millisecond)
    }

把心跳(复制日志)的 RPC 并发发送到每个 RPC 实例,逻辑在上文中提到,这里直接看代码理解

func (rf *Raft) sendAppendLogsToAll() {
    //log.Printf("%d 在 term %d 发起同步", rf.me, rf.currentTerm) // 这里 CurrentTerm 没加锁,可能会有 race
    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        go func(i int) {
            rf.mu.Lock()
            args := &AppendEntriesArgs{
                Term:         rf.currentTerm,
                LeaderID:     rf.me,
                PrevLogIndex: rf.nextIndex[i] - 1,
                PrevLogTerm:  rf.getLog(rf.nextIndex[i] - 1).Term,
                Entries:      rf.subLog(rf.nextIndex[i], -1),
                LeaderCommit: rf.commitIndex,
            }
            reply := &AppendEntriesReply{}
            // 这个检查是因为当一个协程收到比当前 term 大的 reply,会转变成 follower. 后面协程不应该再以 leader 身份发送同步信息
            if rf.state != Leader {
                rf.mu.Unlock()
                return
            }
            rf.mu.Unlock()
            if ok := rf.sendAppendEntries(i, args, reply); !ok {
                return
            }
            rf.mu.Lock()
            defer rf.mu.Unlock()
            log.Printf("%d 在 term %d 收到 %d 的AppendEntries reply %+v", rf.me, rf.currentTerm, i,reply)
            // 所有 server 的规则
            if reply.Term > rf.currentTerm {
                rf.currentTerm = reply.Term
                rf.voteFor = -1
                rf.toFollower()
                return
            }
            // double check
            if rf.state != Leader || reply.Term != rf.currentTerm || reply.Term != args.Term {
                return
            }
            if reply.Success {
                // rf.nextIndex[i] += len(args.Entries) 走过的一个 bug
                // rf.matchIndex[i] = rf.nextIndex[i] - 1
                rf.nextIndex[i] = args.PrevLogIndex + len(args.Entries) + 1
                rf.matchIndex[i] = args.PrevLogIndex + len(args.Entries)
            }else {
        // 实际上把重试的机制推到了下次心跳 
                rf.nextIndex[i] = args.PrevLogIndex
            }
        }(i)
    }
}

7. AppendEntries server 处理逻辑 🔗

这里处理的准则,各个 server 之间 prevLogIndex 之前的日志保持一致,才能增加新的 log。

  • 少 log 的补上 (重试:下一次 prevLogIndex - 1)

  • 不一致的删掉

// AppendEntries handler
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    // rpc handler 通常是并发,加锁
    rf.mu.Lock()
    defer rf.mu.Unlock()
    log.Printf("%d 在 term %d 收到 %d 的 AppendEntries request %+v", rf.me, rf.currentTerm, args.LeaderID, args)
    // rpc handler 通用的规则: args 中 term 比 当前小,直接返回
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        return
    }
    // rpc handler 通用规则: args 中的 term 比当前 server 大,当前 server 更新为 term,转成 follower
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.voteFor = -1
        rf.toFollower()
    }
    rf.resetTimeout()
    reply.Term = rf.currentTerm
    // 调用者需要减少 PrevLogIndex,重试
    if rf.getLastIndex() < args.PrevLogIndex ||  rf.getLog(args.PrevLogIndex).Term != args.PrevLogTerm {
        return
    }
    reply.Success = true
    // 删除比 leader 多的 log
    rf.logs = rf.subLog(-1, args.PrevLogIndex)
    // 新增的 Entries 放入
    rf.logs = append(rf.logs, args.Entries...)
    // 更新成 Follower 的 commit
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = min(args.LeaderCommit, rf.getLastIndex())
    }
}

到这一步 2a 的逻辑已完成,下面是处理 2b 所需要的。

8. leader 更新 commitIndex 🔗

定期去检查 commiteIndex + 1 的 Log 是否复制到大多数机器上

func (rf *Raft) updateCommitIndex() {
    for rf.killed() == false {
        rf.mu.Lock()

        for n := rf.getLastIndex(); n >= rf.commitIndex + 1; n -- {
            // 论文中提到只能提交当前任期的日志,但我暂时感觉有点无意义,因为这样不还是顺带把任期小的提交了
            if rf.getLog(n).Term != rf.currentTerm {
                break
            }
            count := 0
            for i := range rf.matchIndex {
                if rf.matchIndex[i] >= n {
                    count ++
                }
                if count > len(rf.peers) / 2 {
                    rf.commitIndex = n
                    break
                }
            }
        }
        rf.mu.Unlock()
        time.Sleep(5*time.Microsecond)
    }
}

10. 把消息应用到上层 🔗

也是定期去检查是否存在已 commited 的 log 没有应用到上层应用

func (rf *Raft) applyMsgToUpper() {
    for rf.killed() == false {
        rf.mu.Lock()
        if rf.commitIndex > rf.lastApplied {
            rf.lastApplied ++
            msg := ApplyMsg{
                CommandValid:  true,
                Command:       rf.getLog(rf.lastApplied).Command,
                CommandIndex:  rf.lastApplied,
            }
            rf.applyCh <- msg
            log.Printf("%d 在 term %d 往上层状态机成功发送 Msg %+v", rf.me, rf.currentTerm, msg)
        }
        rf.mu.Unlock()
        time.Sleep(10*time.Millisecond)
    }
}

11. 接受上层应用消息 🔗

func (rf *Raft) Start(command interface{}) (int, int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    term, isLeader := rf.currentTerm, rf.state == Leader
    if !isLeader {
        return -1, term, isLeader
    }
    rf.logs = append(rf.logs, LogEntry{
        Command: command,
        Term: term,
    })
    rf.nextIndex[rf.me] = rf.getLastIndex() + 1
    rf.matchIndex[rf.me] = rf.getLastIndex()
    return rf.getLastIndex(), term, isLeader
}

总结 🔗

上述就是我 Raft 2a 2b 部分的实现逻辑了,上述涉及代码可能不太齐全,不过相信你能够补齐。

在测试中请多跑几次脚本(10 次?),一两次无 bug 出现可能只是运气好

个人测试上述的实现,在 2a 2b 测试样例中能完美运行,如果你发现上述实现存在 bug,请评论告知我。