Raft 实验 Part B - MIT 6.824 Lab 2

MIT 6.824 分布式系统课程中,第二个实验是实现分布式共识算法 Raft,分成了 A/B/C 三个部分。我之前完成了 Part 2A,即 Raft 中的领导选举(Leader election)。实验 Part 2B 的主要实现日志复制(Log replication)的机制。

在熟悉和完成了 Part 2A 部分之后,Part 2B 的部分只需要在 2A 的基础上完善相应的逻辑,更容易直接上手,不过调试还是一个非常头疼的事情,因此在实现每个部分的时候,写好相应的注释以及输出相关的调试信息是非常重要的。

实验部分

在 Raft 的论文中,Figure2 很好的描述了整个 Raft 算法的细节。所有的实现都需要按照这个图上的指示来,不能偷懒。

Leader Election

先来回顾一下选举机制的实现:

  • 初始化后用 Goroutine 执行 LeaderElection(),它是一个无限循环,通过 time.Sleep 实现选举超时检测,如果超时则开始新一轮选举 AttemptElection()
  • AttemptElection 中,将自己设为候选者,并向其它服务器发送 RequestVote RPC,若票数超过一半则转为 Leader,开始 LeaderOperation
  • LeaderOperation 中,定时向其它服务器发送心跳包,心跳包通过 AppendEntries RPC 处理

Log Replication

  • 当 Leader 的 Start(command) 调用之后,立即向它自己的 log 末尾添加一个新 entry,更新各种 index
  • Leader 需要不断向其它服务器发送心跳包,直接在心跳包处理逻辑 AppendEntries 里实现日志复制的逻辑
  • 当 Follower 在 AppendEntries 接受到参数之后,需要将它自己和 Leader 的 log 保持完全一致。一种情况是该 Follower 已经有了之前完整 log,只需添加新的 entry;另一种情况则是该 Follower 的 log 不完整或者有错误,则直接返回失败,Leader 检测到失败之后会将 nextIndex[index] 往前移动一位,如此反复直到 Follower 将所有不一致的 entry 删除并将缺失的 log 添加到末尾
  • 当 Leader 得知 Follower 同步成功之后,和选举一样,如果超过半数的 matchIndex 都大于当前的 commitIndex,表明有新的 entry commit 成功了,需要更新 commitIndex
  • commitIndex 更新后,还需要通知 applyCh,将 [lastApplied, commitIndex] 区间里的所有 entry 都以 ApplyMsg 的结构体传送到 applyCh 中,最后再更新 lastApplied

Pseudocode

日志结构体可以直接按照图中的描述,这里我按照文章 5.3 小节加上了 Index 项保存在日志中的序号。注意日志的序号是从 1 开始,而存储的数组的下标是从 0 开始。

1
2
3
4
5
type LogEntry struct {
	Command interface{} // command for state machine
	Term    int         // when entry is received by leader
	Index   int         // index in the LogEntry
}

首先,在 Make() 函数中,需要初始化 nextIndexmatchIndex 数组。在 Go 中,初始化一个给定长度的数组并赋值为 0 用 make([]int, len)。此外,还需要在 Raft 结构中将 applyCh 保存下来,这样每当 commit 一个新的 log,需要向 applyCh 发送一个 ApplyMsg 消息,否则是无法通过测试的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	// ...
	rf.applyCh = applyCh
	rf.nextIndex = make([]int, len(peers))
	rf.matchIndex = make([]int, len(peers))

	go rf.LeaderElection()
	//...
	return rf
}

接下来处理 Start() 函数,该函数尝试将传入 command 写入 Raft 的日志中。注释中提示了该函数应当立即返回,因此这里并不需要非常复杂的逻辑,先更新 termisLeader,如果不是 Leader 就直接返回(此时 index 为 -1),如果当前实例是 Leader,那么直接 append 一个新的 LogEntry,将其序号赋值给 index 变量,并更新 nextIndexmatchIndex,然后直接返回。

虽然这个函数添加了一个新的 LogEntry 到它的日志末尾,但其它的 Raft 实例并没有达成共识,我们之后需要在 AppendEntries RPC 中实现日志复制的完整逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	// ... Remember to Lock()
	term = rf.currentTerm
	isLeader = rf.state == Leader

	if isLeader {
		rf.log = append(rf.log, LogEntry{Command: command, Term: term, Index: len(rf.log) + 1})
		index = len(rf.log)
		rf.nextIndex[rf.me] = index + 1
		rf.matchIndex[rf.me] = index
	}
	return index, term, isLeader
}

接着来看 AppendEntries RPC,在 Part 2A 中这个函数被用来处理心跳包,我们只需要在心跳包之后加上关于日志复制的逻辑,基本和 Figure 2 的 Receiver Implementation 的 5 点是一模一样的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	rf.lastReceive = time.Now()
	reply.Term = rf.currentTerm

	// 1. Reply false if term < currentTerm (§5.1)
	// Switch to Follower if newer term found

	// 2. Reply false if log doesn't contain an entry at prevLogIndex
	// whose term matches prevLogTerm (§5.3)

	// rf.log doesn't contain entry at PrevLogIndex
	// reply false and return

	// 3. If an existing entry conflicts with a new one (same index
	// but different terms), delete the existing entry and all that
	// follow it (§5.3)
	
	// term at PrevLogIndex disagree with PrevLogTerm
	// reply false and return

	// 4. Append any new entries not already in the log

	// 5. If leaderCommit > commitIndex, set commitIndex =
	// min(leaderCommit, index of last new entry)
	
	// after updating commitIndex, send ApplyMsg if commitIndex > lastApplied
	// update lastApplied
	
	reply.Success = true
}

上面的 AppendEntries 是用来处理心跳包请求的,发送心跳包的函数需要加上处理逻辑。能够发送心跳包的只有 Leader,因此按照 Figure 2 中 Rules for Server Leader 部分实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (rf *Raft) SendHeartbeat(server int) {
	rf.mu.Lock()

	var prevLog LogEntry
	if rf.nextIndex[server] > 1 {
		prevLog = rf.log[(rf.nextIndex[server]-1)-1]
	}
	entries := make([]LogEntry, len(rf.log)-prevLog.Index)
	copy(entries, rf.log[prevLog.Index:])

	args := AppendEntriesArgs{
		Term:         rf.currentTerm,
		LeaderId:     rf.me,
		PrevLogIndex: prevLog.Index,
		PrevLogTerm:  prevLog.Term,
		Entries:      entries,
		LeaderCommit: rf.commitIndex,
	}
	reply := AppendEntriesReply{}

	rf.mu.Unlock()

	ok := rf.sendAppendEntries(server, &args, &reply)

	rf.mu.Lock()
	defer rf.mu.Unlock()

	rf.lastReceive = time.Now()

	// Convert to Follower if reply.Term is newer

	// If last log index ≥ nextIndex for a follower: send
	// AppendEntries RPC with log entries starting at nextIndex
	// • If successful: update nextIndex and matchIndex for follower (§5.3)
	// • If AppendEntries fails because of log inconsistency:
	// decrement nextIndex and retry (§5.3)
	if reply.Success {
		// Update nextIndex and matchIndex
		// ...

		// If there exists an N such that N > commitIndex, a majority
		// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
		// set commitIndex = N (§5.3, §5.4).
	} else {
		// decrement nextIndex
		rf.nextIndex[server] = prevLog.Index
	}
}

AttemptElection 函数中,需要修改 RequestVoteArgs,将 LastLogIndexLastLogTerm 的值放进去,在 Candidate 得到足够的选票后,重新初始化 nextIndexmatchIndex

RequestVote 处理函数中,需要加入判断 Candidate 的 log 是否比接收者的更新,确保 Leader 拥有最新的日志,否则设置 reply.voteGranted=false

每一次更新完 commitIndex 后,如果 commitIndexlastApplied 更新,说明要向 applyCh 发送最新被 commit 的 entry 信息。在 AppendEntriesSendHeartbeat 都有可能更新 commitIndex,因此都需要调用该函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (rf *Raft) sendApplyMsg() {
	// If commitIndex > lastApplied: increment lastApplied, apply
	// log[lastApplied] to state machine (§5.3)
	if rf.commitIndex > rf.lastApplied {
		entries := append([]LogEntry{}, rf.log[rf.lastApplied:rf.commitIndex]...)

		go func(entries []LogEntry) {
			for _, entry := range entries {
				msg := ApplyMsg{
					CommandValid: true,
					Command:      entry.Command,
					CommandIndex: entry.Index,
				}
				rf.applyCh <- msg

				rf.mu.Lock()
				if rf.lastApplied < msg.CommandIndex {
					rf.lastApplied = msg.CommandIndex
				}
				rf.mu.Unlock()
			}
		}(entries)
	}
}

测试

可以通过 $ go test -run TestBasicAgree2B 来运行其中的一个测试

整个 Part 2B

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
➜  raft git:(lab2) ✗ go test -run 2B
Test (2B): basic agreement ...
  ... Passed --   1.0  3   16    4290    3
Test (2B): RPC byte count ...
  ... Passed --   2.6  3   48  113518   11
Test (2B): agreement despite follower disconnection ...
  ... Passed --   5.5  3   96   25251    7
Test (2B): no agreement if too many followers disconnect ...
  ... Passed --   3.7  5  168   38812    3
Test (2B): concurrent Start()s ...
  ... Passed --   0.8  3   14    3786    6
Test (2B): rejoin of partitioned leader ...
  ... Passed --   4.6  3  140   34273    4
Test (2B): leader backs up quickly over incorrect follower logs ...
  ... Passed --  38.5  5 2634 2335471  106
Test (2B): RPC counts aren't too high ...
  ... Passed --   2.2  3   40   11214   12
PASS

写在最后

  • 这部分实验调试占了绝大部分时间,用工程里面提供的 DPrintf 会非常方便

  • 代码需要严格按照 Figure 2 的逻辑实现,具体细节可以阅读 5.3 节帮助理解

  • Go 的 for range 要注意是值还是 index

    最好显式的写成 for _, value := range Collection ...

  • JetBrains 的 GoLand 非常好用,学生可以免费获得授权

参考:

加载评论