func(rf*Raft)AppendEntries(args*AppendEntriesArgs,reply*AppendEntriesReply){rf.mu.Lock()deferrf.mu.Unlock()rf.lastReceive=time.Now()reply.Term=rf.currentTerm// 1. Reply false if term < currentTerm (§5.1)
// Switch to Follower if newer term found
// 2. Reply false if log doesn't contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
// rf.log doesn't contain entry at PrevLogIndex
// reply false and return
// 3. If an existing entry conflicts with a new one (same index
// but different terms), delete the existing entry and all that
// follow it (§5.3)
// term at PrevLogIndex disagree with PrevLogTerm
// reply false and return
// 4. Append any new entries not already in the log
// 5. If leaderCommit > commitIndex, set commitIndex =
// min(leaderCommit, index of last new entry)
// after updating commitIndex, send ApplyMsg if commitIndex > lastApplied
// update lastApplied
reply.Success=true}
上面的 AppendEntries 是用来处理心跳包请求的,发送心跳包的函数需要加上处理逻辑。能够发送心跳包的只有 Leader,因此按照 Figure 2 中 Rules for Server Leader 部分实现。
func(rf*Raft)SendHeartbeat(serverint){rf.mu.Lock()varprevLogLogEntryifrf.nextIndex[server]>1{prevLog=rf.log[(rf.nextIndex[server]-1)-1]}entries:=make([]LogEntry,len(rf.log)-prevLog.Index)copy(entries,rf.log[prevLog.Index:])args:=AppendEntriesArgs{Term:rf.currentTerm,LeaderId:rf.me,PrevLogIndex:prevLog.Index,PrevLogTerm:prevLog.Term,Entries:entries,LeaderCommit:rf.commitIndex,}reply:=AppendEntriesReply{}rf.mu.Unlock()ok:=rf.sendAppendEntries(server,&args,&reply)rf.mu.Lock()deferrf.mu.Unlock()rf.lastReceive=time.Now()// Convert to Follower if reply.Term is newer
// If last log index ≥ nextIndex for a follower: send
// AppendEntries RPC with log entries starting at nextIndex
// • If successful: update nextIndex and matchIndex for follower (§5.3)
// • If AppendEntries fails because of log inconsistency:
// decrement nextIndex and retry (§5.3)
ifreply.Success{// Update nextIndex and matchIndex
// ...
// If there exists an N such that N > commitIndex, a majority
// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
// set commitIndex = N (§5.3, §5.4).
}else{// decrement nextIndex
rf.nextIndex[server]=prevLog.Index}}
func(rf*Raft)sendApplyMsg(){// If commitIndex > lastApplied: increment lastApplied, apply
// log[lastApplied] to state machine (§5.3)
ifrf.commitIndex>rf.lastApplied{entries:=append([]LogEntry{},rf.log[rf.lastApplied:rf.commitIndex]...)gofunc(entries[]LogEntry){for_,entry:=rangeentries{msg:=ApplyMsg{CommandValid:true,Command:entry.Command,CommandIndex:entry.Index,}rf.applyCh<-msgrf.mu.Lock()ifrf.lastApplied<msg.CommandIndex{rf.lastApplied=msg.CommandIndex}rf.mu.Unlock()}}(entries)}}
测试
可以通过 $ go test -run TestBasicAgree2B 来运行其中的一个测试
整个 Part 2B
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
➜ raft git:(lab2) ✗ go test -run 2B
Test (2B): basic agreement ...
... Passed -- 1.0 31642903
Test (2B): RPC byte count ...
... Passed -- 2.6 34811351811
Test (2B): agreement despite follower disconnection ...
... Passed -- 5.5 396252517
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 3.7 5168388123
Test (2B): concurrent Start()s ...
... Passed -- 0.8 31437866
Test (2B): rejoin of partitioned leader ...
... Passed -- 4.6 3140342734
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 38.5 526342335471106
Test (2B): RPC counts aren't too high ...
... Passed -- 2.2 3401121412
PASS