后续实验对此部分代码进行了重构,因此代码不具备参考性,仅适用于本次实验

阅读本文前请先仔细阅读Lab 2Part 2A相关实验要求并熟悉基础代码。
本文只提供相关实现思路,希望可以读者由此获得灵感。

2A实验说明

  • 2A部分需要完成Raft系统中的leader election和heartbeats
  • 保证系统中最多只有一个leader,如有特殊情况可以发生leader的取代
  • 相关RPC的调用可以阅读labrpc文件夹中的代码

Test分析

代码编写完成后通过go test进行测试,可以理解为C艹中的单元测试,这里通过make_config构建起整个系统cfg。

  • cfg中包括有多个raft对象,每个raft对象对应有自己的ClientEnd数组peers,表示自己与其他raft的连接,可以理解为传输数据的通道(通过RPC)
  • 网络中各个raft初始化完成后通过connect进行raft之间的连接(对应RPC通信)
  • 每个raft有三种state对应论文中的leader,candidate,follower
  • 以计算机网络的视角来看,cfg.net对应网络层,ClientEnd对应传输层,raft对应应用层
  • 整体Test系统构建流程
    • make_config构建系统
      • 初始化若干个raft对象
      • 将若干raft对象进行连接
    • 插入检查,宕机,sleep等操作
    • end
  • 第一个测试TestInitialElection2A首先是初始化整个系统,设置了3个raft对象,不进行fail操作,中间sleep一段时间检查你的系统是否稳定(即没有发生宕机的情况下你的leader是否可以稳定的一直发送心跳)
  • 第二个测试TestReElection2A会在中间加入节点的disconnect,后续重新connect,考验设计系统的鲁棒性

设计概述

实验大部分思路来自于https://sworduo.github.io/2019/06/04/MIT6-824-lab2-raft/ and https://zhuanlan.zhihu.com/p/543989771,在此基础上进行修改,十分感谢!

结构说明

  • 每个raft通过Make函数来构建起单个的raft系统
    • 在Make中对raft中的参数进行初始化
    • 实验要求Make立即返回,因此整个系统的运行应该放在一个goroutine中,这里设计了Schedule函数进行调度
  • 利用Schedule函数进行状态的变换( follower<->candidate<->leader )
    • followerTick()
      • follower在2a中考虑三种情况
        • 收到来自leader的心跳,继续follower状态
        • 自己给其他candiadte投票,继续follower状态
        • 超时,既没有收到心跳,也没有给别人投票,自己变为candidate状态
    • candidateTick()
      • candidate需要考虑四种情况
        • 收到来自leader的心跳,切换到follower状态
        • 自己给其他candiadte投票,切换到follower状态
        • 自己赢得选举,切换到leader状态
        • 选举超时,继续下一轮选举,保持candidate状态
    • leaderTick()
      • leader需要考虑四种情况
        • 收到来自leader的心跳,切换到follower状态
        • 自己给其他candiadte投票,切换到follower状态
        • 收到的心跳reply中有大于自己term(任期),切换到follower状态
        • 心跳计时到期,保持leader状态,开始下一轮心跳发送
  • RPC相关(对应操作用颜色标注,红色表示心跳RPC,绿色表示投票RPC)
    • 心跳RPC AppendEntries
      • 根据args更新自身raft对象中的参数并返回reply
    • 投票RPC RequestVote
      • 根据args更新自身raft对象中的参数并返回reply

架构图

structure

具体实现

Definition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const (
Follower = 0
Candidate = 1
Leader = 2
)

// heart_beat
type AppendEntryArgs struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
}
type AppendEntryReply struct {
Term int
Success bool
}

type LogEntry struct {
ItemId int
}

此处的LogEntry在2A中暂时不用,其余定义按照paper编写即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Raft struct {
//...
state int //当前的状态
electionTimeout *time.Timer //选举时间定时器

currentTerm int //当前的任期号
voteFor int //当前获得选票的候选人id
logEntries []LogEntry //日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号

appendCh chan *AppendEntryArgs //leader发送的心跳
voteCh chan *RequestVoteArgs //candidate发送的投票
l2fCh chan bool //leader心跳收到比自己大的term,变为follower

heartBeatTimeOut int //1秒10次心跳
//...
}
  • appendCh表示自己成功接收到来自于leader的心跳,在心跳RPC中处理
  • voteCh表示自己成功给某个candidate投票,在投票RPC中处理
  • l2fCh表示leader收到心跳回复中有比自己大的term
  • 通过electionTimeout来控制选举超时的时间(定义为450-550ms)
  • heartBeatTimeOut设置为100ms

Schedule

1
2
3
4
5
6
7
8
9
10
11
12
func (rf *Raft) Schedule() {
for !rf.killed() {
switch rf.state {
case Leader:
rf.leaderTick()
case Candidate:
rf.candidateTick()
case Follower:
rf.followerTick()
}
}
}
  • 根据rf当前的状态执行对应的Tick即可。

AppendEntries(Heart_beat) RPC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// heart_beat from leader
func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success = false
return
}
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.voteFor = -1
}
reply.Term = rf.currentTerm
reply.Success = true
go func(args *AppendEntryArgs) {
select {
case rf.appendCh <- args:
default:
<-rf.appendCh
rf.appendCh <- args
}
}(args)
}
  • 接收来自于leader的heart_beat args,如果leader的任期比自己小,则赋值reply后直接返回,不向appendCh中发送数据,表示自己没有接收到心跳,自己自动通过timeOut转变为candidate
  • leader任期大于等于自己,则更新自己的相关成员,向appendCh中发送数据表示接受到心跳
  • 接收心跳后根据appendCh进行状态的转换(在follower,candidate,leader中处理)

Request_vote RPC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term < rf.currentTerm {
//...
}
if args.Term > rf.currentTerm {
//...
}
if rf.voteFor != -1 && rf.voteFor != args.CandidateId {
//...
}
//票还没投过,可以投
reply.Term = rf.currentTerm
reply.VoteGranted = true
rf.voteFor = args.CandidateId
go func(args *RequestVoteArgs) {
//same as heartbeat...
}(args)
}
  • 如果请求投票的candidate的任期比自己小,则不给他投票
  • 如果任期比自己大,给他投票
  • 如果之前投过票了就不能再投票
  • 投票之后向voteCh发送数据表示自己已经投过票了,根据该信息转换自己当前的状态(在follower,candidate,leader中处理)

Follower

1
2
3
4
5
6
7
8
9
10
11
12
13
func (rf *Raft) followerTick() {
for {
rf.resetTimeout()
select {
case args := <-rf.appendCh:
//收到了来自于leader的心跳,继续follower
case args := <-rf.voteCh:
//自己投票给了其他candidate,继续follower
case <-rf.electionTimeout.C:
//超时没收到心跳,变为candidate
}
}
}
  • 根据channel中的信息进行自身成员的更新和状态转换即可。

Candidate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (rf *Raft) candidateTick() {
rf.mu.Lock()
rf.voteFor = rf.me
rf.currentTerm++
currentTerm := rf.currentTerm
rf.mu.Unlock()

args := &RequestVoteArgs{
Term: currentTerm,
CandidateId: rf.me,
}

voteCnt := 1
voteNote := false
voteWin := make(chan bool, 1)
voteLock := sync.Mutex{}

//选举超时统计
rf.resetTimeout()

for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(i int) {
if rf.killed() {
return
}
reply := &RequestVoteReply{}
if ok := rf.sendRequestVote(i, args, reply); ok {
if reply.VoteGranted {
voteLock.Lock()
voteCnt++
if !voteNote && voteCnt > len(rf.peers)/2 {
voteNote = true
voteLock.Unlock()
voteWin <- true
} else {
voteLock.Unlock()
}
}
}

}(i)
}

//转换状态,简单的修改state即可
select {
case <-rf.appendCh:
//...
case <-rf.voteCh:
//...
case <-voteWin:
//...
case <-rf.electionTimeout.C:
//...
}
}
  • 进行candidateTick,首先根据论文所述,进入candidate状态先将自身的currentTerm+1,然后向其余的节点发送请求投票的RPC,收集到超过一半的选票后变为leader
  • 如果在此期间收到了心跳或者自己给其他candidate投票,则关闭其给其他节点发送的请求投票的RPC并将状态变为follower
  • 如果选举超时,那么开启新一轮选举
  • 注意需要将rf.currentTerm提前保存下来,保证发送给其他节点的Term是开始时的Term,否则直接调用rf.currentTerm可能导致发送给其他节点的Term并不相等。

Leader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (rf *Raft) leaderTick() {
rf.mu.Lock()
currentTerm := rf.currentTerm
rf.mu.Unlock()
for !rf.killed() {
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
args := &AppendEntryArgs{
Term: currentTerm,
LeaderId: rf.me,
}
go func(i int, currentT int) {
if rf.killed() {
return
}
reply := &AppendEntryReply{}
if ok := rf.sendAppendVote(i, args, reply); ok {
rf.mu.Lock()
if rf.currentTerm != currentT {
rf.mu.Unlock()
return
}
rf.mu.Unlock()
if reply.Success {

} else {
rf.mu.Lock()
if reply.Term > currentT {
rf.currentTerm = reply.Term
rf.mu.Unlock()
select {
case rf.l2fCh <- true:
default:
<-rf.l2fCh
rf.l2fCh <- true
}
} else {
rf.mu.Unlock()
}
}
}
}(i, currentTerm)
}
select {
case <-time.After(time.Duration(rf.heartBeatTimeOut) * time.Millisecond):
//..
case <-rf.appendCh:
//..
case <-rf.voteCh:
//..
case <-rf.l2fCh:
//..
}
}
}
  • 注意leader在执行的过程中用一个for循环包裹,heart_beat每隔一段时间就进行发送,在本文中采用的是time.After,实验手册中建议直接使用time.Sleep
  • leader在每次for循环中向其余的节点发送心跳RPC,并根据reply判断是否放弃leader身份
  • 如果计时器结束,说明一切正常,继续下一轮的心跳发送
  • 如果收到了其他节点的心跳,说明其他节点成为了leader,自己需要放弃leader身份
  • 如果给其他节点投票,说明其他节点的Term比自己新,自己需要放弃leader身份
  • 如果其他节点心跳的reply中Term比自己新,自己需要放弃leader身份

测试

编写脚本测试500次,全部通过。
test

注意

  • 实验过程中可以利用util中的DPrintf打印相关信息,方便debug
  • 重点在于分析清楚各个goroutines的生命周期
  • 不要忘记实现GetState()和Kill(),不在Kill中对节点处理的话可能会导致断连的节点依旧打印信息
  • 使用go test -race检测代码中潜在的race问题
  • 可能造成bug的原因
    • 第一个test报错warning: term changed even though there were no failures原因是系统会通过checkTerms来检查当前的Term(任期号),在两次checkTerms中加入sleep,这期间你的系统是保持稳定的,即选举成功后没有发生任何crash或disconnect,leader一直持续的向其他节点发送心跳,因此两次checkTerms的Term应该不变,报错的原因就是你的两次Term发生了变化,说明系统不稳定,即使没有bug但是leader还是在更换
    • 第二个test中断连一个leader后又把他重新加入网络,可能会导致两个leader
    • 注意goroutines传入参数的设置,避免直接使用外部数据,通过参数传递使用你需要的数据,直接使用外部数据,数据可能会发生变化
    • 在sendRPC返回ok的过程中可能会失败(disconnect),这类的RPC返回的非常慢(经过测试大于700ms,以秒为量级),注意这些goroutines
  • 建议编写自己的test script进行程序的测试,因为只跑少量的testcase无法检测出概率小的bug