后续实验对此部分代码进行了重构,因此代码不具备参考性,仅适用于本次实验
阅读本文前请先仔细阅读Lab 2 Part 2A相关实验要求并熟悉基础代码。
本文只提供相关实现思路,希望可以读者由此获得灵感。
2A实验说明
2A部分需要完成Raft系统中的leader election和heartbeats
保证系统中最多只有一个leader,如有特殊情况可以发生leader的取代
相关RPC的调用可以阅读labrpc文件夹中的代码
Test分析
代码编写完成后通过go test进行测试,可以理解为C艹中的单元测试,这里通过make_config构建起整个系统cfg。
cfg中包括有多个raft对象,每个raft对象对应有自己的ClientEnd数组peers,表示自己与其他raft的连接,可以理解为传输数据的通道(通过RPC)
网络中各个raft初始化完成后通过connect进行raft之间的连接(对应RPC通信)
每个raft有三种state对应论文中的leader,candidate,follower
以计算机网络的视角来看,cfg.net对应网络层,ClientEnd对应传输层,raft对应应用层
整体Test系统构建流程
make_config构建系统
初始化若干个raft对象
将若干raft对象进行连接
插入检查,宕机,sleep等操作
end
第一个测试TestInitialElection2A首先是初始化整个系统,设置了3个raft对象,不进行fail操作,中间sleep一段时间检查你的系统是否稳定(即没有发生宕机的情况下你的leader是否可以稳定的一直发送心跳)
第二个测试TestReElection2A会在中间加入节点的disconnect,后续重新connect,考验设计系统的鲁棒性
设计概述
实验大部分思路来自于https://sworduo.github.io/2019/06/04/MIT6-824-lab2-raft/ and https://zhuanlan.zhihu.com/p/543989771 ,在此基础上进行修改,十分感谢!
结构说明
每个raft通过Make函数来构建起单个的raft系统
在Make中对raft中的参数进行初始化
实验要求Make立即返回,因此整个系统的运行应该放在一个goroutine中,这里设计了Schedule函数进行调度
利用Schedule函数进行状态的变换( follower<->candidate<->leader )
followerTick()
follower在2a中考虑三种情况
收到来自leader的心跳,继续follower状态
自己给其他candiadte投票,继续follower状态
超时,既没有收到心跳,也没有给别人投票,自己变为candidate状态
candidateTick()
candidate需要考虑四种情况
收到来自leader的心跳,切换到follower状态
自己给其他candiadte投票,切换到follower状态
自己赢得选举,切换到leader状态
选举超时,继续下一轮选举,保持candidate状态
leaderTick()
leader需要考虑四种情况
收到来自leader的心跳,切换到follower状态
自己给其他candiadte投票,切换到follower状态
收到的心跳reply中有大于自己term(任期),切换到follower状态
心跳计时到期,保持leader状态,开始下一轮心跳发送
RPC相关(对应操作用颜色标注,红色表示心跳RPC,绿色表示投票RPC)
心跳RPC AppendEntries
根据args更新自身raft对象中的参数并返回reply
投票RPC RequestVote
根据args更新自身raft对象中的参数并返回reply
架构图
具体实现
Definition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 const ( Follower = 0 Candidate = 1 Leader = 2 ) type AppendEntryArgs struct { Term int LeaderId int PrevLogIndex int PrevLogTerm int Entries []LogEntry LeaderCommit int } type AppendEntryReply struct { Term int Success bool } type LogEntry struct { ItemId int }
此处的LogEntry在2A中暂时不用,其余定义按照paper编写即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 type Raft struct { state int electionTimeout *time.Timer currentTerm int voteFor int logEntries []LogEntry appendCh chan *AppendEntryArgs voteCh chan *RequestVoteArgs l2fCh chan bool heartBeatTimeOut int }
appendCh表示自己成功接收到来自于leader的心跳,在心跳RPC中处理
voteCh表示自己成功给某个candidate投票,在投票RPC中处理
l2fCh表示leader收到心跳回复中有比自己大的term
通过electionTimeout来控制选举超时的时间(定义为450-550ms)
heartBeatTimeOut设置为100ms
Schedule
1 2 3 4 5 6 7 8 9 10 11 12 func (rf *Raft) Schedule() { for !rf.killed() { switch rf.state { case Leader: rf.leaderTick() case Candidate: rf.candidateTick() case Follower: rf.followerTick() } } }
AppendEntries(Heart_beat) RPC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) { rf.mu.Lock() defer rf.mu.Unlock() if args.Term < rf.currentTerm { reply.Term = rf.currentTerm reply.Success = false return } if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.voteFor = -1 } reply.Term = rf.currentTerm reply.Success = true go func (args *AppendEntryArgs) { select { case rf.appendCh <- args: default : <-rf.appendCh rf.appendCh <- args } }(args) }
接收来自于leader的heart_beat args,如果leader的任期比自己小,则赋值reply后直接返回,不向appendCh中发送数据,表示自己没有接收到心跳,自己自动通过timeOut转变为candidate
leader任期大于等于自己,则更新自己的相关成员,向appendCh中发送数据表示接受到心跳
接收心跳后根据appendCh进行状态的转换(在follower,candidate,leader中处理)
Request_vote RPC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() if args.Term < rf.currentTerm { } if args.Term > rf.currentTerm { } if rf.voteFor != -1 && rf.voteFor != args.CandidateId { } reply.Term = rf.currentTerm reply.VoteGranted = true rf.voteFor = args.CandidateId go func (args *RequestVoteArgs) { }(args) }
如果请求投票的candidate的任期比自己小,则不给他投票
如果任期比自己大,给他投票
如果之前投过票了就不能再投票
投票之后向voteCh发送数据表示自己已经投过票了,根据该信息转换自己当前的状态(在follower,candidate,leader中处理)
Follower
1 2 3 4 5 6 7 8 9 10 11 12 13 func (rf *Raft) followerTick() { for { rf.resetTimeout() select { case args := <-rf.appendCh: case args := <-rf.voteCh: case <-rf.electionTimeout.C: } } }
根据channel中的信息进行自身成员的更新和状态转换即可。
Candidate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 func (rf *Raft) candidateTick() { rf.mu.Lock() rf.voteFor = rf.me rf.currentTerm++ currentTerm := rf.currentTerm rf.mu.Unlock() args := &RequestVoteArgs{ Term: currentTerm, CandidateId: rf.me, } voteCnt := 1 voteNote := false voteWin := make (chan bool , 1 ) voteLock := sync.Mutex{} rf.resetTimeout() for i := 0 ; i < len (rf.peers); i++ { if i == rf.me { continue } go func (i int ) { if rf.killed() { return } reply := &RequestVoteReply{} if ok := rf.sendRequestVote(i, args, reply); ok { if reply.VoteGranted { voteLock.Lock() voteCnt++ if !voteNote && voteCnt > len (rf.peers)/2 { voteNote = true voteLock.Unlock() voteWin <- true } else { voteLock.Unlock() } } } }(i) } select { case <-rf.appendCh: case <-rf.voteCh: case <-voteWin: case <-rf.electionTimeout.C: } }
进行candidateTick,首先根据论文所述,进入candidate状态先将自身的currentTerm+1,然后向其余的节点发送请求投票的RPC,收集到超过一半的选票后变为leader
如果在此期间收到了心跳或者自己给其他candidate投票,则关闭其给其他节点发送的请求投票的RPC并将状态变为follower
如果选举超时,那么开启新一轮选举
注意需要将rf.currentTerm提前保存下来,保证发送给其他节点的Term是开始时的Term,否则直接调用rf.currentTerm可能导致发送给其他节点的Term并不相等。
Leader
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 func (rf *Raft) leaderTick() { rf.mu.Lock() currentTerm := rf.currentTerm rf.mu.Unlock() for !rf.killed() { for i := 0 ; i < len (rf.peers); i++ { if i == rf.me { continue } args := &AppendEntryArgs{ Term: currentTerm, LeaderId: rf.me, } go func (i int , currentT int ) { if rf.killed() { return } reply := &AppendEntryReply{} if ok := rf.sendAppendVote(i, args, reply); ok { rf.mu.Lock() if rf.currentTerm != currentT { rf.mu.Unlock() return } rf.mu.Unlock() if reply.Success { } else { rf.mu.Lock() if reply.Term > currentT { rf.currentTerm = reply.Term rf.mu.Unlock() select { case rf.l2fCh <- true : default : <-rf.l2fCh rf.l2fCh <- true } } else { rf.mu.Unlock() } } } }(i, currentTerm) } select { case <-time.After(time.Duration(rf.heartBeatTimeOut) * time.Millisecond): case <-rf.appendCh: case <-rf.voteCh: case <-rf.l2fCh: } } }
注意leader在执行的过程中用一个for循环包裹,heart_beat每隔一段时间就进行发送,在本文中采用的是time.After,实验手册中建议直接使用time.Sleep
leader在每次for循环中向其余的节点发送心跳RPC,并根据reply判断是否放弃leader身份
如果计时器结束,说明一切正常,继续下一轮的心跳发送
如果收到了其他节点的心跳,说明其他节点成为了leader,自己需要放弃leader身份
如果给其他节点投票,说明其他节点的Term比自己新,自己需要放弃leader身份
如果其他节点心跳的reply中Term比自己新,自己需要放弃leader身份
测试
编写脚本测试500次,全部通过。
注意
实验过程中可以利用util中的DPrintf打印相关信息,方便debug
重点在于分析清楚各个goroutines的生命周期
不要忘记实现GetState()和Kill(),不在Kill中对节点处理的话可能会导致断连的节点依旧打印信息
使用go test -race检测代码中潜在的race问题
可能造成bug的原因
第一个test报错warning: term changed even though there were no failures原因是系统会通过checkTerms来检查当前的Term(任期号),在两次checkTerms中加入sleep,这期间你的系统是保持稳定的,即选举成功后没有发生任何crash或disconnect,leader一直持续的向其他节点发送心跳,因此两次checkTerms的Term应该不变,报错的原因就是你的两次Term发生了变化,说明系统不稳定,即使没有bug但是leader还是在更换
第二个test中断连一个leader后又把他重新加入网络,可能会导致两个leader
注意goroutines传入参数的设置,避免直接使用外部数据,通过参数传递使用你需要的数据,直接使用外部数据,数据可能会发生变化
在sendRPC返回ok的过程中可能会失败(disconnect),这类的RPC返回的非常慢(经过测试大于700ms,以秒为量级),注意这些goroutines
建议编写自己的test script进行程序的测试,因为只跑少量的testcase无法检测出概率小的bug