Raft 实验 Part A - MIT 6.824 Lab 2

MIT 6.824 分布式系统课程中,第二个实验是实现分布式共识算法 Raft。这个实验分成了 A/B/C 三个部分,这篇将主要围绕 Part 2A 部分的实验介绍一下 Raft 的基本概念和实现 Raft 中的领导选举(Leader election)。

Raft

在 Raft 之前,比较著名的共识算法是 Paxos,据说非常难懂。提出 Raft 目的之一就是使其易于理解。

共识(Concensus)是容错分布式系统中的一个基本问题。共识涉及多个服务器就操作达成一致。一旦他们对某个操作做出决定,该决定就是最终决定。当大多数服务器可用时,典型的共识算法会取得进展。例如,即使 2 台服务器出现故障,包含 5 台服务器的群集也可以继续运行。如果更多服务器发生故障,它们将停止前进(但绝不会返回错误的结果)。

在分布式系统中,复制服务(replicated service)通常将其状态(即数据)的完整副本存储在多个副本服务器(replica server)上来实现容错功能。即使服务的某些服务器出现故障(崩溃,网络故障或不稳定),剩下的副本服务器也可以使服务继续运行。挑战在于,故障可能导致副本服务器持有不同的数据副本。

Raft 将客户请求组织成一个序列,称为日志,并确保所有副本服务器看到相同的日志。每个副本均按日志顺序执行客户端请求,并将其应用于服务状态的本地副本。由于所有活动副本都具有相同的日志内容,因此它们都以相同的顺序执行相同的请求,因此继续具有相同的服务状态。如果服务器出现故障但后来又恢复了,Raft 会确保其日志为最新状态。只要至少大多数服务器处于活动状态并且可以相互通信,Raft 就会继续运行。如果没有这样的多数,Raft 将不会取得进展,但是一旦多数能够再次交流,它将从中断的地方继续前进。

更多的细节可以前往 Raft Paper

实验部分

这里将讨论 6.824 Lab 2: Raft 的 Part 2A,这一部分是实现 Raft 中的领导选举机制。因为这是第一部分,所以实现起来会有一些无从下手,而且代码本身是 concurrent 的,所以调试也会比较困难。主要的代码都在 raft/raft.go 中。

在 Raft 的论文中,Figure 2 很好的描述了整个 Raft 算法的细节。接下来先梳理一下 Leader Election 的逻辑。

4-Figure2-1

首先,一个 server 会有三种状态:Follower、Candidate 和 Leader。最开始的时候,所有的 server 的状态都为 Follower。对于每个 server,会初始化一个随机的选举超时时间(election timeout),那么当一个 server 在这段时间里面没有收到来自 Candidate 或 Leader 的消息时,便将自己转变为 Candidate 并开始新一时期(Term)选举。Candidate 通过向其它 server 索取选票(Vote),得到半数以上便转变为 Leader。之后便定时向其它服务器发送心跳包(Heartbeat)来防止新的选举。

image

代码主要思路:

  • 所有状态都通过 Raft 结构体管理
  • Make() 函数初始化所有状态变量,并通过 Goroutine 启动 LeaderElection(),监测选举超时
  • 如果在规定时间内没有没有接收到其它服务器的消息请求,尝试选举 AttemptElection()
  • AttemptElection() 中,将自己设为候选者,并向其它服务器发送 RequestVote RPC
  • 如果未得票超过半数,则当前 Term 不会有 Leader,于是选举超时被触发,重新选举
  • 如果得票超过半数,将自己设置为 Leader,并向其它服务器发送心跳包 Heartbeat
  • 发送心跳包即定期发送空的 AppendEntries PRC

首先按照 Figure 2,我们将基本的 State 填入 Raft 结构体中。在实验 Part A 中,主要用到 currentTermvotedFor。除此以外,一个 Raft server 还需要管理它当前状态 statelastReceive,表示最后一次收到其它服务器消息的时间,以便于判断超时。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	state       RaftState // Raft server state
	lastReceive time.Time // last time receive message

	currentTerm int        // latest term server has seen
	votedFor    int        // candidateId that received vote in current term
	log         []LogEntry // log entries

	// ...other volatile state
}

Raft 状态:Follower、Candidate 和 Leader:

1
2
3
4
5
6
7
type RaftState int

const (
	Follower = iota
	Candidate
	Leader
)

在 Raft 初始化函数 Make() 中,state 先初始化为 FollowercurrentTerm0votedFor-1。之后需要用 Goroutine 启动一个选举函数 go rf.LeaderElection(),超时的话便触发选举 AttemptElection()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (rf *Raft) LeaderElection() {
	for {
		electionTimeout := rand.Intn(200) + 200
		startTime := time.Now()
		time.Sleep(time.Duration(electionTimeout) * time.Millisecond)

		rf.mu.Lock()
		if rf.lastReceive.Before(startTime) {
			if rf.state != Leader {
				// Attempt an election
                go rf.AttemptElection()
			}
		}
		rf.mu.Unlock()
	}
}

AttemptElection() 中,主要逻辑为:

  1. 将状态 state 转换为 Candidate,将 currentTerm 增加 1votedFor 设置为 me
  2. peers 除自己以外的服务器发送 RequestVote RPC,并统计票数,这一部需要用 Goroutine 异步并行执行

AttemptElection() 伪代码(注意除 RPC Call 之外访问资源的地方都需要 Mutex)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// AttemptElection pseudocode
func (rf *Raft) AttemptElection() {
    // Convert to Candidate
    // Init args and reply
    numVote := 1
    
    for i := range(rf.peers) {
        // Send RequestVote RPC and tally votes to all peers
        go func(server int) {
            rf.sendRequestVote(server, &args, &reply)

            // If Term is newer, conver to Follower
            // If me is not Candidate, return

            if reply.VoteGranted {
                numVote++
                if numVote > len(rf.peers)/2 && rf.state == Candidate {
                    // Convert to Leader
                    // Send heartbeat to peers
                    for peer := range rf.peers {
                        go LeaderOperation(peer)
                    }
                }
            }
        }(i)
    }
}

RequestVote() 函数用来处理 RequestVote RPC 请求,照 Figure 2 实现即可。

LeaderOperation() 就是类似上面的 LeaderElection(),一个无限循环,定时发送 Heartbeat:

1
2
3
4
5
6
7
func (rf *Raft) LeaderOperation(server int) {
    for {
		// If state is not Leader, return
        go rf.SendHeartbeat(server)
        time.Sleep(HeartbeatInterval)
    }
}

发送 Heartbeat 又需要调用 sendAppendEntries()AppendEntries() 用来处理 RPC 请求,具体逻辑和 Figure 2 中一样。

调试与测试

  1. 注意访问资源的地方都需要互斥锁保护:rf.mu.Lock()rf.mu.Unlock()
  2. RPC Call 之前需要确保 rf.mu.Unlock(),以防止 RPC Handler 里出现 Deadlock
  3. 在函数中如果有无限循环 for {},慎用 defer,因为 deferfor {} 外面的函数末尾执行,而不是在 for {} 里面
  4. 多用 log.Printf()

测试 Part 2A:

1
2
3
4
5
6
7
$ go test -run 2A
Test (2A): initial election ...
  ... Passed --   4.0  3   32    9170    0
Test (2A): election after network failure ...
  ... Passed --   6.1  3   70   13895    0
PASS
ok      raft    10.187s

参考:

加载评论