Lab 2 - Raft

介绍

这是一系列实验室中的第一个,您将在其中建立一个容错键/值存储系统。在本实验中,你将实现 Raft(一种复制状态机协议)。在下一个实验室中,你将在 Raft 的基础上构建一个键/值服务。然后,您将在多个复制状态机上 “分片 “您的服务,以获得更高的性能。

复制服务通过在多个复制服务器上存储其状态(即数据)的完整副本来实现容错。即使部分服务器出现故障(崩溃或网络中断或不稳定),复制也能使服务继续运行。所面临的挑战是,故障可能会导致副本持有不同的数据副本。

Raft 将客户端请求整理成一个序列(称为日志),并确保所有副本服务器都能看到相同的日志。每个副本按日志顺序执行客户端请求,并将这些请求应用到服务状态的本地副本中。由于所有实时副本都能看到相同的日志内容,因此它们都会以相同的顺序执行相同的请求,从而继续保持相同的服务状态。如果服务器出现故障,但随后又恢复了,Raft 会负责更新其日志。只要至少有大多数服务器还活着并能相互通信,Raft 就会继续运行。如果没有过半数的服务器存活,Raft 将不会取得任何进展,但只要有过半数的服务器能再次通信,Raft 就会继续运行。

在本实验室中,您将把 Raft 作为 Go 对象类型来实现,并提供相关方法,以便在更大的服务中作为一个模块使用。一组 Raft 实例通过 RPC 相互通信,以维护复制的日志。您的 Raft 接口将支持一连串编号不确定的命令,也称为日志条目。条目用索引号编号。具有给定索引的日志条目最终将被提交。此时,您的 Raft 应将日志条目发送给更大的服务,以便其执行。

您应该遵循拓展Raft论文中的设计,尤其要注意图 2。您将实现论文中的大部分内容,包括保存持久状态,以及在节点失效后重新启动时读取持久状态。您将不会实现集群成员变更(第 6 节)。您将在稍后的实验中实现日志压缩/快照(第 7 节)。

您可能会发现本指南以及有关和并发结构的建议非常有用。如需更广阔的视角,请参阅 Paxos、Chubby、Paxos Made Live、Spanner、Zookeeper、Harp、Viewstamped Replication 和 Bolosky et al.

本实验应分三部分完成。您必须在相应的到期日提交每一部分。

开始

我们将为您提供骨架代码 src/raft/raft.go。我们还提供了一组测试,您应使用这些测试来推动您的实施工作,我们将使用这些测试对您提交的实验室进行评分。这些测试位于 src/raft/test_test.go。

要启动并运行,请执行以下命令。别忘了使用 git pull 获取最新软件。

1
2
3
4
5
6
7
8
9
10
$ cd src/raft
$ go test
Test (2A): initial election ...
--- FAIL: TestInitialElection2A (5.04s)
config.go:326: expected one leader, got none
Test (2A): election after network failure ...
--- FAIL: TestReElection2A (5.03s)
config.go:326: expected one leader, got none
...
$

任务

通过向 raft/raft.go 添加代码来实现 Raft。在该文件中,您将找到骨架代码,以及如何发送和接收 RPC 的示例。
您的实现必须支持以下接口,测试者和(最终)您的键/值服务器都将使用该接口。您可以在 raft.go的注释中找到更多细节。

1
2
3
4
5
6
7
8
9
10
11
12
// create a new Raft server instance:
rf := Make(peers, me, persister, applyCh)

// start agreement on a new log entry:
rf.Start(command interface{}) (index, term, isleader)

// ask a Raft for its current term, and whether it thinks it is leader
rf.GetState() (term, isLeader)

// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester).
type ApplyMsg

服务调用Make(peers,me,...) 创建一个 Raft 对等节点。peers 参数是一个 Raft 对等节点(包括此节点)的网络标识符数组,用于 RPC。me 参数是此对等节点在对等节点数组中的索引。Start(command)要求 Raft 开始处理,将命令附加到复制的日志中。Start() 应立即返回,无需等待日志追加完成。服务希望您的实现能为每个新提交的日志条目向 Make()applyCh 通道参数发送 ApplyMsg

raft.go 包含发送 RPC(sendRequestVote())和处理传入 RPC(RequestVote())的示例代码。你的 Raft 对等体应该使用 labrpc Go 软件包(源码在 src/labrpc)交换 RPC。测试人员可以告诉 labrpc 延迟 RPC、重新排序 RPC 和丢弃 RPC,以模拟各种网络故障。虽然你可以临时修改 labrpc,但请确保你的 Raft 能在原始 labrpc 上运行,因为我们将用它来对你的实验室进行测试和评分。您的 Raft 实例必须只能与 RPC 进行交互;例如,它们不能使用共享的 Go 变量或文件进行通信。

后续的实验将建立在本实验的基础上,因此给自己足够的时间编写可靠的代码非常重要。

Part 2A

任务

实现 Raft 领导者选举和心跳(无日志条目的 AppendEntries RPC)。第 2A 部分的目标是选出一个领导者,如果没有失败,领导者将继续担任领导者,如果旧的领导者失败,或者与旧的领导者之间的数据包丢失,新的领导者将接替领导者。运行 go test -run 2A 测试你的 2A 代码。

提示

  • 要直接运行 Raft 实现并不容易,而应通过测试器运行,即 go test -run 2A
  • 请按照本论文的图 2 进行操作。此时你关心的是发送和接收 RequestVote RPC、与选举有关的服务器规则以及与领导人选举有关的状态
  • raft.go 中的 Raft 结构中添加图 2 中的领导者选举状态。您还需要定义一个结构来保存每个日志条目的信息。
  • 填写 RequestVoteArgsRequestVoteReply 结构。修改 Make(),创建一个后台程序,该程序会在一段时间内没有收到其他同行的请求时,通过发送 RequestVote RPCs 来定期启动领导者选举。这样,如果已经有了领导者,一个同行就会知道谁是领导者,或者自己成为领导者。实现 RequestVote() RPC 处理程序,使服务器可以互相投票。
  • 要实现心跳,请定义一个 AppendEntries RPC 结构(虽然可能还不需要所有参数),并让领导者定期发送心跳。编写一个 AppendEntries RPC 处理器方法,重置选举超时,这样当一个服务器已经当选领导者时,其他服务器就不会站出来担任领导者。
  • 确保不同对等节点的选举超时不会总是在同一时间触发,否则所有对等节点都只会为自己投票,没有人会成为领导者。
  • 测试要求领导者每秒发送心跳 RPC 的次数不超过十次。
  • 测试要求您的 Raft 在旧的领导者失效后五秒内选出新的领导者(如果大多数对等节点仍能通信)。但请记住,如果出现投票结果不一致的情况(如数据包丢失或候选人不走运地选择了相同的随机后退时间),则可能需要多轮领导者选举。你必须选择足够短的选举超时(以及心跳间隔),这样即使需要多轮选举,也很有可能在五秒内完成。
  • 论文的第 5.2 节提到选举超时的范围是 150 到 300 毫秒。只有当领导者发送心跳的频率大大超过每 150 毫秒一次时,这样的范围才有意义。由于测试仪限制每秒只能发送 10 次心跳,因此你必须使用比论文中规定的 150 至 300 毫秒更大的选举超时,但也不能太大,因为这样你可能无法在五秒内选出领导者。
  • 您可能会发现Go的rand package - math/rand - Go Packages很有用。
  • 您需要编写代码,定期或延迟时间后执行操作。最简单的方法是创建一个带循环的 goroutine,调用 time.Sleep()。不要使用 Go 的 time.Timertime.Ticker,它们很难正确使用。
  • 阅读有关和并发结构的建议非常有用
  • 如果你的代码无法通过测试,请再读一遍论文中的图 2;领导者选举的全部逻辑分布在图的多个部分。
  • 不要忘记实现 GetState()
  • 测试器在永久关闭实例时会调用 Raft 的 rf.Kill()。您可以使用 rf.killed() 检查 Kill() 是否已被调用。您可能希望在所有循环中都这样做,以避免死亡的 Raft 实例打印出混乱的信息。
  • 调试代码的一个好方法是在对等程序发送或接收消息时插入打印语句,然后用 go test -run 2A > out 将输出收集到一个文件中。然后,通过研究 out 文件中的消息轨迹,你可以找出你的实现与所需协议的偏差。你可能会发现 util.go 中的 DPrintf 对调试不同问题时打开或关闭打印很有用。
  • Go RPC 只发送名称以大写字母开头的结构体字段。子结构的字段名也必须大写(例如数组中日志记录的字段)。labgob 软件包会对此发出警告,请不要忽视这些警告。
  • 使用 go test -race 检查代码,并修复它报告的任何竞争问题。

在提交第 2A 部分之前,请确保您通过了 2A 测试,这样您就会看到如下内容:

1
2
3
4
5
6
7
8
$ go test -run 2A
Test (2A): initial election ...
... Passed -- 4.0 3 32 9170 0
Test (2A): election after network failure ...
... Passed -- 6.1 3 70 13895 0
PASS
ok raft 10.187s
$

每行 “Passed”(通过)包含五个数字;它们分别是以秒为单位的测试时间、Raft 对等体的数量(通常为 3 或 5)、测试期间发送的 RPC 数量、RPC 消息的总字节数,以及 Raft 报告已提交的日志条目数量。您的数据将与此处显示的数据不同。您可以忽略这些数字,但它们可以帮助您理智地检查您的实现所发送的 RPC 数量。对于所有实验 2、3 和 4,如果所有测试花费的时间超过 600 秒(go test),或者任何单个测试花费的时间超过 120 秒,评分脚本将使您的解决方案失败。

思路

2A整体任务只有两个,领导选举和发送心跳
其中每一个都可以细分为四个步骤,主要是需要明白每个步骤、不同角色、任期关系需要做什么。

经过调试,心跳间隔和选举超时设置为如下时间,比较合理。

1
2
3
4
5
// 心跳间隔
const heartBeat = 50
// 选举超时 = 基本150ms + 随机0-200ms
const basicTimeout = 150
const randomTimeout = 200
领导选举

Raft需要启动一个后台ticker线程用来实时监测超时(Follower)或触发心跳广播(Leader)。
因此选举的流程如下:

  1. ticker线程发现当前rolefollower,并且已经发生选举超时。
  2. 发起领导选举:任期加一,给自己投票,向其他节点发送投票请求并处理(另起线程)
  3. 处理投票结果:如果收到半数以上的选票,则选举成功;
  4. 选举成功:本实验中,只需要发送一次心跳即可。
  • startElection():触发领导选举,当Follower或Candidate选举超时时由ticker()触发触发
  • SendRequestVote():发送请求选票RPC,不需要我们实现
  • RequestVote():处理请求选票RPC的handler,也就是节点收到来自其他Candidate的选票请求时,如何处理
  • handleVoteResult():Candidate收到选票请求回复时,如何处理
  • becomeLeader():选举成功,成为Leader后的操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42

    // start a new election
    // 开启一轮选举
    func (rf *Raft) startElection() {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    DPrintf("[Election]:Peer[%d] start election for term[%d] | %s\n",rf.me,rf.currentTerm+1,time.Now().Format("15:04:05.000"))
    //Add term,增加任期号
    rf.currentTerm++
    //Vote for self,先给自己投票
    rf.votedFor = rf.me
    rf.voteCount = 1
    //Become a candidate,修改成为Candidate
    rf.role = CANDIDATE
    //Refresh election timeout,刷新选举超时时间
    rf.refreshTimeout()
    //Send RequestVote for all other peers,并行发送请求选票RPC
    // a flag for becomeLeader
    var becomeLeaderOnce sync.Once

    var args = RequestVoteArgs{
    Term: rf.currentTerm,
    CandidateId: rf.me,
    LastLogTerm: rf.log[len(rf.log)-1].Term,
    LastLogIndex: len(rf.log)-1,
    }
    for i:=0;i<len(rf.peers);i++{
    if i==rf.me{
    continue
    }
    //Open a separate goroutines to handle,开启一个单独的线程,发送RPC并等待回复回调
    go func(server int,args *RequestVoteArgs) {
    var reply = RequestVoteReply{}
    DPrintf("[Send RequestVote]:Peer[%d] send RequestVote to Peer[%d] | %s\n",rf.me,server,time.Now().Format("15:04:05.000"))
    ok:= rf.sendRequestVote(server,args,&reply)
    if ok{
    rf.handleVoteResult(&reply)
    }
    }(i,&args)
    }

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//RequestVote RPC hanler
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
term := args.Term
lastLogIndex := args.LastLogIndex
lastLogTerm := args.LastLogTerm
candidateId := args.CandidateId

DPrintf("[Receive RequestVote]:Peer[%d] receive RequestVote RPC from Peer[%d] | %s\n",rf.me,candidateId,time.Now().Format("15:04:05.000"))
// Receive an outdated request from old candidate,ignore
// 接收到了来自往届Candidate的选票请求,忽略并返回新的任期号,以便其放弃选举
if term < rf.currentTerm{
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
}
// Whether have already voted in the current term,在当前任期是否已经给其他候选人投过票
var hasVoted = (term==rf.currentTerm&& rf.votedFor!=-1)
// Whether candidate's log is updated.检查当前候选人的日志否是比自己的新
var checkCandidateLog = true

if lastLogTerm < rf.log[len(rf.log)-1].Term{
checkCandidateLog = false
}
if lastLogTerm == rf.log[len(rf.log)-1].Term && lastLogIndex < len(rf.log)-1{
checkCandidateLog = false
}

// Received a same term candidate's request,if hasn't voted, then vote for
// 收到了同任期选举人的投票请求,如果没投票则投票给Candidate(但我认为这种情况不会发生)
// 除非A在Term=2时,发送了超时,开始选举后,增加了自己的Term=3后马上就挂了,这时它还没来得及把票投给自己
// 此时另一个Term=2节点也刚好超时了, 然后发送RequestVote请求给A,A此时可能会投票给他
if term == rf.currentTerm{
if !hasVoted && checkCandidateLog{
rf.votedFor = candidateId
reply.VoteGranted = true
DPrintf("[Vote]:Peer[%d] give vote to Peer[%d] for term[%d] | %s\n",rf.me,candidateId,term,time.Now().Format("15:04:05.000"))
}else{
reply.VoteGranted = false
}
reply.Term=rf.currentTerm
return
}

// Received a updated term candidate's request,follow him and vote for him if log check is ok
// 收到了更高Term的候选人的投票请求,如果其日志检查没问题,则投票给他,并修改自己的Term,不管自己是什么角色,都变为Follower,因为此时集群中出现了更高Term的人。
if term > rf.currentTerm{
rf.currentTerm = term
rf.role = FOLLOWER
rf.votedFor = -1
if checkCandidateLog{
rf.votedFor = candidateId
}
rf.refreshTimeout()
reply.Term = rf.currentTerm
reply.VoteGranted = (rf.votedFor==candidateId)
if (reply.VoteGranted){
DPrintf("[Vote]:Peer[%d] give vote to Peer[%d] for term[%d] | %s\n",rf.me,candidateId,term,time.Now().Format("15:04:05.000"))
}
return
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// handle vote result
// Candidate处理投票回复
func (rf *Raft) handleVoteResult(reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
term := reply.Term
voteGranted := reply.VoteGranted
// Give up candidate,and back to follower
// 如果收到了更高Term的回复,说明已经存在更高Term的Leader,则放弃选举,变为Follower
if term>rf.currentTerm{
rf.role = FOLLOWER
rf.currentTerm = term
rf.votedFor = -1
rf.voteCount = 0
rf.refreshTimeout()
return
}

// Received an old term reply,ignore
// 收到了过期Term节点的回复,忽略。这个其实也不太可能发生,因为在RequestVote中,总会将自己的Term更新到与Candidate一致,那么在回复的时候,回复者的Term>=rf.currentTerm
if term < rf.currentTerm{
return
}

// Received a same term reply
// 收到了同一Term的回复,处理选票结果
// 当第一次达到半数后,就可以宣布成为Leader,然后广播一次心跳信息。
if term == rf.currentTerm{
if voteGranted{
rf.voteCount++
if rf.voteCount>rf.n/2 && rf.role==CANDIDATE{
// only execute once
becomeLeaderOnce.Do(rf.becomeLeader)
}
}
}
return
}
1
2
3
4
5
6
7
8
9
10
11
12
// One peer electron success and become leader
func (rf *Raft) becomeLeader(){
DPrintf("[Leader][becomeLeader()]:Peer[%d] become leader for term[%d] | %s\n", rf.me, rf.currentTerm, time.Now().Format("15:04:05.000"))
rf.role = LEADER
rf.refreshTimeout()
//2B
for i := 0; i < rf.n; i++ {
rf.nextIndex[i] = rf.getLogLength()
rf.matchIndex[i] = 0
}
go rf.broadcastAppendEntries()
}
广播心跳

这里的心跳RPC复用了AppendEntries,将AppendEntriesArgs中的Entries赋nil,则代表这是一条心跳RPC,而不是附加日志的RPC(在后续实验会实现)

  • broadcastAppendEntries():向其他节点广播RPC(心跳/附加日志)
  • SendAppendEntries():发送附加日志RPC,不需要我们实现
  • AppendEntries():处理RPC的handler,也就是节点收到来自Leader的附加日志时,如何处理
  • handleAppendEntries:Leader收到回复时,如何处理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // Broadcast AppendEntries RPC to other peers
    // 广播心跳实验2A,后续实验会进行拓展
    func (rf *Raft) broadcastAppendEntries() {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    for i:=0;i<len(rf.peers);i++{
    if i==rf.me{
    continue
    }
    var args = AppendEntriesArgs{
    Term: rf.currentTerm,
    LeaderId: rf.me,
    Entries: nil,
    }
    var server = i
    go func(server int, args *AppendEntriesArgs) {
    var reply = AppendEntriesReply{}
    ok:=rf.sendAppendEntries(server,args,&reply)
    if ok{
    rf.handleAppendEntries(&reply)
    }
    }(server,&args)
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// AppendEntries RPC hander
// AppendEntries RPC处理
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
term := args.Term
entries := args.Entries

// Represent heartbeat RPC
if entries==nil{
// 1.收到了来自过期Leader的心跳信息,回复它更新的Term以便更新
// 2.收到了相同任期的Leader,变为Follower(如果当前为Candidate话,因为当前任期已经有Leader了,不可能选举成功了),刷新超时时间
// 3.收到了更高任期的Leader,变为Follower,更新任期,更新超时时间
if term < rf.currentTerm{
reply.Term=rf.currentTerm
reply.Success = false
DPrintf("[HeartBeat]:Peer[%d] in term[%d] receive pre HeartBeat from Leader[%d] | %s\n",rf.me,rf.currentTerm,args.LeaderId,time.Now().Format("15:04:05.000"))
}else if term == rf.currentTerm{
rf.role=FOLLOWER
rf.refreshTimeout()
reply.Term = rf.currentTerm
reply.Success = true
DPrintf("[HeartBeat]:Peer[%d] in term[%d] receive HeartBeat from Leader[%d] | %s | nextTimeout[%s]\n",rf.me,rf.currentTerm,args.LeaderId,time.Now().Format("15:04:05.000"),rf.nextTimeout.Format("15:04:05.000"))
}else{
rf.role=FOLLOWER
rf.refreshTimeout()
rf.currentTerm=term
rf.votedFor=-1
reply.Term = rf.currentTerm
reply.Success = true
DPrintf("[HeartBeat]:Peer[%d] in term[%d] receive HeartBeat from Leader[%d] | %s | nextTimeout[%s]\n",rf.me,rf.currentTerm,args.LeaderId,time.Now().Format("15:04:05.000"),rf.nextTimeout.Format("15:04:05.000"))
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Handle other peers's AppendEntries RPC reply
// Leader收到回复后的处理
func (rf *Raft) handleAppendEntries(reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
term := reply.Term

// Received a larger term peers, means that it has expired and there is a latest leader exists in the cluster.
// 收到了更高Term的回复,说明自己的领导位置已经过期了,变成Follower,修改Term
if term > rf.currentTerm {
rf.role = FOLLOWER
rf.currentTerm = term
rf.votedFor = -1
rf.voteCount = 0
rf.refreshTimeout()
return
}

// todo
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) ticker() {
var count = 0
for rf.killed() == false {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
// 模拟定时,每10ms循环一次,检查超时或是否发送心跳
time.Sleep(onceDuration*time.Millisecond)
count++
rf.mu.Lock()
// 模拟定时,当每执行5次,也就是50ms时,Leader发送一次心跳信息
if count%(heartBeat/onceDuration)==0 && rf.role == LEADER{
count=0
//Leader send heartbeat
DPrintf("[Send HeartBeat]:Peer(Leader)[%d] send HeartBeat | %s\n",rf.me,time.Now().Format("15:04:05.000"))
rf.mu.Unlock()
rf.broadcastAppendEntries()
continue

}
// 检查是否超时,由于每10ms检查一次,因此肯定会有10ms以内的误差
if rf.role != LEADER && time.Now().After(rf.nextTimeout){
DPrintf("[Timeout]:Peer[%d] timeout:%v | %s\n",rf.me,rf.nextTimeout,time.Now().Format("15:04:05.000"))
rf.mu.Unlock()
rf.startElection()
continue
}
rf.mu.Unlock()
}
}

其他部分的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
const onceDuration = 10
const heartBeat = 50
const basicTimeout = 150
const randomTimeout = 200
//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int

// For 2D:
SnapshotValid bool
Snapshot []byte
SnapshotTerm int
SnapshotIndex int
}
type Role int
const (
FOLLOWER Role = iota
LEADER
CANDIDATE
)

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int
role Role
votedFor int
log []LogEntry
nextTimeout time.Time

// vote info n is the number of peers, voteCount is the number of vote received
n int
voteCount int
}

//refresh or init the electron timeout
func (rf *Raft) refreshTimeout(){
randomMillis := basicTimeout + rand.Intn(201)
randomDuration := time.Duration(randomMillis) * time.Millisecond
rf.nextTimeout = time.Now().Add(randomDuration)
}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {

var term int
var isleader bool
// Your code here (2A).
rf.mu.Lock()
defer rf.mu.Unlock()
term = rf.currentTerm
isleader = rf.role==LEADER
return term, isleader
}

细节

Follower什么时候才会投票

  • term < currentTerm拒绝投票
  • votedFor不为空,说明已经投过票了,拒绝
  • 验证选举限制,简而言之就是只给日志信息更新更全的人投票
    • lastLogTerm<当前最后一条日志的term,拒绝
    • lastLogTerm=当前最后一条日志的term,且lastLogIndex<当前日志列表长度-1,拒绝
  • 同意投票

来自leader的心跳信息需不需要添加到log

不需要

结果

本实验未使用chan,time.Timer等实现,仅简单实用锁、time.Sleep进行实现
通过并行次数4,总次数20次的测试


Part 2B

任务

实现Leader和Follower的日志追加功能,通过2B测试

提示

  • 您的首要目标应该是通过 TestBasicAgree2B()。首先要实现 Start(),然后按照图 2 编写代码,通过 AppendEntries RPC 发送和接收新的日志条目。
  • 你需要实现选举限制(论文章节5.4.1)
  • 在早期 Lab 2B 测试中,一种无法达成一致的方法是反复举行选举,即使领导者还活着。查找选举计时器管理中的错误,或在赢得选举后不立即发送心跳的错误。
  • 您的代码中可能有重复检查某些事件的循环。不要让这些循环不间断地执行而不暂停,因为这样会减慢实现速度,导致测试失败。请使用 Go 的条件变量,或在每个循环迭代中插入 time.Sleep(10 * time.Millisecond)。
  • 帮自己一个忙,在今后的实验中编写(或重新编写)简洁明了的代码。您可以重新访问参考我们的structurelocking, 和 guide 页面指导。

如果您的代码运行太慢,即将进行的实验室测试可能会失败。您可以使用 time 命令检查解决方案占用了多少实时时间和 CPU 时间。以下是典型的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ time go test -run 2B
Test (2B): basic agreement ...
... Passed -- 1.6 3 18 5158 3
Test (2B): RPC byte count ...
... Passed -- 3.3 3 50 115122 11
Test (2B): agreement despite follower disconnection ...
... Passed -- 6.3 3 64 17489 7
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 4.9 5 116 27838 3
Test (2B): concurrent Start()s ...
... Passed -- 2.1 3 16 4648 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 8.1 3 111 26996 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 28.6 5 1342 953354 102
Test (2B): RPC counts aren't too high ...
... Passed -- 3.4 3 30 9050 12
PASS
ok raft 58.142s

real 0m58.475s
user 0m2.477s
sys 0m1.406s
$

ok raft 58.142s 表示 Go 测得的 2B 测试时间为 58.142 秒的实际(挂钟)时间。user 0m2.477s 表示代码消耗了 2.477 秒的 CPU 时间,即实际执行指令(而不是等待或休眠)的时间。如果您的解决方案在 2B 测试中使用的实际时间远超过一分钟,或 CPU 时间远超过 5 秒,那么您以后可能会遇到麻烦。请注意睡眠或等待 RPC 超时所花费的时间、在不睡眠或等待条件或通道消息的情况下运行的循环,或发送的大量 RPC。

思路

按照论文中的实现,理解nextIndexmatchIndex的意思

BUG排查

  1. Leader发送AppendEntries RPC后导致Follower超时。 - 已解决,broadcastAppendEntries死锁

    1
    2
    2023/11/07 23:18:27 [Send AppendEntries(Start)]:Peer(Leader)[0] send ppendEntries | 23:18:27.962
    Leader发送AppendEntries RPC后导致Follower超时。
  2. 没有最新日志的节点超时后,请求选举,导致选举不上又不断超时发生选举

    (2B): agreement after follower reconnects ...
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    2023/11/10 14:20:24 [Send AppendEntries(Start)]:Peer(Leader)[2] send AppendEntries | 14:20:24.097
    2023/11/10 14:20:24 [Start Return (Start)]:Peer(Leader)[2] start return | 14:20:24.098
    2023/11/10 14:20:24 [AppendEntries Send] Leader[2] send an AppendEntries to Peer[1] | 14:20:24.098
    2023/11/10 14:20:24 [AppendEntries Send] Leader[2] send an AppendEntries to Peer[0] | 14:20:24.098
    2023/11/10 14:20:24 [AppendEntries Received]:Peer[1] in term[1] receive AppendEntries from Leader[2] | 14:20:24.101
    2023/11/10 14:20:24 [Add Entries]:Peer[1] in term[1] update AppendEntries from Leader[2] | 14:20:24.101
    2023/11/10 14:20:24 [Send HeartBeat]:Peer(Leader)[2] send HeartBeat | 14:20:24.111
    2023/11/10 14:20:24 [HeartBeat]:Peer[1] in term[1] receive HeartBeat from Leader[2] | 14:20:24.113 | nextTimeout[14:20:24.485]
    2023/11/10 14:20:24 [Send HeartBeat]:Peer(Leader)[2] send HeartBeat | 14:20:24.189
    2023/11/10 14:20:24 [HeartBeat]:Peer[1] in term[1] receive HeartBeat from Leader[2] | 14:20:24.192 | nextTimeout[14:20:24.515]
    2023/11/10 14:20:24 [Send HeartBeat]:Peer(Leader)[2] send HeartBeat | 14:20:24.267
    2023/11/10 14:20:24 [HeartBeat]:Peer[1] in term[1] receive HeartBeat from Leader[2] | 14:20:24.270 | nextTimeout[14:20:24.668]
  3. TestFailNoAgree2B偶尔失败

  4. TestRejoin2B偶尔失败

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Test (2B): basic agreement ...
... Passed -- 0.3 3 16 4362 3
Test (2B): RPC byte count ...
... Passed -- 0.9 3 50 114502 11
Test (2B): agreement after follower reconnects ...
... Passed -- 5.1 3 223 61055 8
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 3.3 5 372 77214 4
Test (2B): concurrent Start()s ...
... Passed -- 0.6 3 34 9899 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 4.0 3 286 64853 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 9.9 5 3141 3316487 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.0 3 96 29954 12
PASS
ok 6.824/raft 26.180s

Part 2C

如果基于 Raft 的服务器重启,它应该从原来的位置恢复服务。这就要求 Raft 保持持久的状态,以便在重启后继续运行。本文的图 2 提到了哪些状态应该是持久的。
真正的实现会在 Raft 的持久化状态发生变化时将其写入磁盘,并在重启后从磁盘读取状态。您的实现不会使用磁盘;相反,它将从 Persister 对象(参见 persister.go)中保存和恢复持久化状态。无论谁调用 Raft.Make(),都要提供一个 Persister,它最初保存着 Raft 最近的持久化状态(如果有的话)。Raft 应从该 Persister 初始化其状态,并在每次状态改变时使用该 Persister 保存其持久化状态。请使用 Persister 的 ReadRaftState() 和 SaveRaftState() 方法。

任务

通过添加保存和恢复持久化状态的代码,完成 raft.go 中的函数 persist() 和 readPersist()。您需要将状态编码(或 “序列化”)为字节数组,以便将其传递给 Persister。使用 labgob 编码器;请参阅 persistence() 和 readPersist() 中的注释。labgob 类似 Go 的 gob 编码器,但如果尝试对字段名为小写的结构进行编码,则会打印错误信息。
在实现更改持久化状态的位置插入对 persist() 的调用。完成这些工作后,其余的测试就可以通过了。

提示

  • 许多 2C 测试都涉及服务器故障和网络丢失 RPC 请求或回复。
  • 你可能需要一次备份多个条目的 nextIndex 的优化。请阅读从第 7 页底部和第 8 页顶部开始的扩展 Raft 论文(用灰色线标出)。该论文对细节的描述比较含糊;您需要填补空白,或许可以借助 6.824 Raft 讲座。
  • 实验 2 全套测试(2A+2B+2C)的合理耗时为 4 分钟实时时间和 1 分钟 CPU 时间。

您的代码应通过所有 2C 测试(如下所示)以及 2A 和 2B 测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ go test -run 2C
Test (2C): basic persistence ...
... Passed -- 7.2 3 206 42208 6
Test (2C): more persistence ...
... Passed -- 23.2 5 1194 198270 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 3.2 3 46 10638 4
Test (2C): Figure 8 ...
... Passed -- 35.1 5 9395 1939183 25
Test (2C): unreliable agreement ...
... Passed -- 4.2 5 244 85259 246
Test (2C): Figure 8 (unreliable) ...
... Passed -- 36.3 5 1948 4175577 216
Test (2C): churn ...
... Passed -- 16.6 5 4402 2220926 1766
Test (2C): unreliable churn ...
... Passed -- 16.5 5 781 539084 221
PASS
ok raft 142.357s
$

思路

这一部分实际不难,只要按照persist 和 readPersist中的注释实现即可。并在所有需要持久化的数据部分调用persist持久化到磁盘即可。readPersist不需要我们调用,会在ticker中调。但Lab2C的测试比较严格,大概率会发现前面Lab2B中的很多问题,如需要加速日志对比过程,否则Figure 8 (unreliable)大概率过不了,还有超时时间和心跳频率是否合理。具体见lab2-one

Part 2D

就目前情况而言,重启的服务器会重播完整的 Raft 日志以恢复其状态。然而,对于一个长期运行的服务来说,永远记住完整的 Raft 日志是不现实的。取而代之的是,你需要修改 Raft,以便与那些不时持续存储其状态 “快照 “的服务合作,此时 Raft 会丢弃快照之前的日志条目。这样做的结果是持久化数据量更小,重启速度更快。然而,现在追随者有可能落后太多,以至于领导者丢弃了追赶所需的日志条目;这时领导者必须发送快照和从快照时间开始的日志。Raft 扩展论文的第 7 节概述了这一方案,具体细节需要您自行设计。

请参考 Raft 交互示意图,以了解复制服务和 Raft 的通信方式。

您的 Raft 必须提供以下函数,以便服务可以调用其状态的序列化快照:

1
Snapshot(index int, snapshot []byte)

在Lab 2D中,测试工具定期调用Snapshot(),在实验3中你会实现一个key/value服务来调用Snapshot(),这个快照会包含整个key/value对的表。服务层会在每个对等体(而不仅仅是领导者)上调用Snapshot()

index参数表示快照中反映的最高日志条目。Raft 应丢弃该点之前的日志条目。您需要修改 Raft 代码,以便在只存储日志尾部的情况下运行。

您需要实现论文中讨论的InstallSnapshot RPC,它允许 Raft 领导者告诉落后的 Raft 对等节点用快照替换其状态。您可能需要考虑 InstallSnapshot 应如何与图 2 中的状态和规则交互。

当追随者的 Raft 代码接收到 InstallSnapshot RPC 时,它可以使用 applyCh 将快照发送到 ApplyMsg 服务中。ApplyMsg 结构定义已经包含了您需要的字段(也是测试人员所期望的)。请注意,这些快照只会推进服务的状态,而不会导致服务后退。

如果服务器崩溃,它必须从持久化数据重新启动。您的 Raft 应同时持久化 Raft 状态和相应的快照。请使用 persister.SaveStateAndSnapshot(),它为 Raft 状态和相应的快照分别接收不同的参数。如果没有快照,则传递 nil 作为snapshot参数。

当服务器重新启动时,应用层会读取持久化快照并恢复其保存的状态。

以前,本实验室曾建议您实现一个名为 CondInstallSnapshot 的函数,以避免快照和应用重启时发送的日志条目相互协调的要求。这个残留的 API 接口仍然存在,但我们不建议您实现它:相反,我们建议您只需让它返回 true 即可。

任务

实现 Snapshot() 和 InstallSnapshot RPC,并对 Raft 进行修改以支持这些操作(例如,使用修剪过的日志进行操作)。当您的解决方案通过 2D 测试(以及之前的所有 Lab 2 测试)时,它就完成了。

提示

  • 一个好的开始是你可以修改你的代码以便于能够只存储从某个索引X开始的日志部分。初始你可以设置X为0然后运行2B/2C测试,然后使得Snapshot(index)丢弃在index之前的日志,并设置X为index,如果这些一切顺利,那么就可以通过第一个2D测试。
  • 您不能将日志存储在 Go slice 中,也不能将 Go slice 索引与 Raft 日志索引互换使用;您需要对 slice 进行索引,以便考虑到日志中被丢弃的部分。
  • 下一步:如果Leader没有更新追随者所需的日志条目,就让领导者发送 InstallSnapshot RPC。
  • 在单个 InstallSnapshot RPC 中发送整个快照。不要执行图 13 中用于分割快照的偏移机制。
  • Raft 必须以允许 Go 垃圾回收器释放并重新使用内存的方式丢弃旧日志条目;这就要求丢弃的日志条目不存在可触及的引用(指针)。
  • 即使日志被修剪,您的实现仍需要在 AppendEntries RPC 中的新条目之前正确发送条目的Term和Index;这可能需要保存和引用最新快照的 lastIncludedTerm/lastIncludedIndex (考虑是否应持久化)。
  • Lab 2 全套测试(2A+2B+2C+2D)在不使用 -race 时的合理耗时为 6 分钟实时时间和 1 分钟 CPU 时间。使用 -race 时,实际耗时约为 10 分钟,CPU 耗时约为 2 分钟。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ go test -run 2D
Test (2D): snapshots basic ...
... Passed -- 11.6 3 176 61716 192
Test (2D): install snapshots (disconnect) ...
... Passed -- 64.2 3 878 320610 336
Test (2D): install snapshots (disconnect+unreliable) ...
... Passed -- 81.1 3 1059 375850 341
Test (2D): install snapshots (crash) ...
... Passed -- 53.5 3 601 256638 339
Test (2D): install snapshots (unreliable+crash) ...
... Passed -- 63.5 3 687 288294 336
Test (2D): crash and restart all servers ...
... Passed -- 19.5 3 268 81352 58
PASS
ok 6.824/raft 293.456s

思路

Follower多久进行一次快照

  • 论文中提到当日志达到固定大小时,进行快照

Snapshot由谁触发

  • 由上层应用进行触发,当上层应用认为可以将已经提交的日志进行压缩,那么就会调用节点的Snapshot()的函数。
  • 快照中包含了上层应用的最新状态,意味着raft中的日志已经成功地应用到了上层应用中,这些日志已经不再需要存储了。

需要做一个index的转换

  • storeIndex:是日志存储在log中的实际索引,storeIndex=logIndex-lastIncludedIndex
  • logIndex:是日志的逻辑索引,logIndex=storeIndex+lastIncludedIndex
    对log切片操作使用storeIndex,其他情况使用logIndex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// get log entry by logical index
func (rf *Raft) getEntry(logIndex int) LogEntry {
return rf.log[rf.getStoreIndex(logIndex)]
}
// get physical index by logical index
func (rf *Raft) getStoreIndex(logIndex int) int {
return logIndex - rf.lastIncludedIndex
}
// get logical index by physical index
func (rf *Raft) getLogIndex(storeIndex int) int {
return storeIndex + rf.lastIncludedIndex
}
// return the logical index of the last log entry
func (rf *Raft) lastEntryLogIndex() int {
return rf.getLogIndex(len(rf.log) - 1)
}

// return the term of the last log entry
func (rf *Raft) lastEntryTerm() int {
return rf.log[len(rf.log) - 1].Term
}

// return the logical length of log
func (rf *Raft) getLogLength() int {
return len(rf.log)+rf.lastIncludedIndex
}

Snapshot:接收上层应用发来的快照,index表示上层应用已经应用的条目indexsnapshot表示在应用了该index为止的条目下上层应用的最新数据。

  • 判断index是否合法。
  • 更新lastIncludedTermlastIncludedIndexsnapshot
  • 压缩日志,更新commitIndexlastApplied。因为既然上层应用已经应用了所有index之前的日志。
  • 持久化。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    // the service says it has created a snapshot that has
    // all info up to and including index. this means the
    // service no longer needs the log through (and including)
    // that index. Raft should now trim its log as much as possible.
    func (rf *Raft) Snapshot(index int, snapshot []byte) {
    // Your code here (2D).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // replace log with snapshot
    DPrintf("[Snapshot start][Snapshot()]:Peer[%d] start save Snapshot with index:[%d] and lastIndex:[%d] log:[%d]| %s\n",rf.me,index,rf.lastIncludedIndex,len(rf.log),time.Now().Format("15:04:05.000"))
    var logIndex = rf.getStoreIndex(index)
    if logIndex<=0 || logIndex >=len(rf.log){
    return
    }
    DPrintf("[Snapshot Doing][Snapshot()]:Peer[%d] is saving Snapshot with index:[%d]| %s\n",rf.me,index,time.Now().Format("15:04:05.000"))
    rf.lastIncludedTerm = rf.getEntry(index).Term
    rf.lastIncludedIndex = index
    rf.snapshot = snapshot
    rf.log = append([]LogEntry{{Term: rf.lastIncludedTerm}}, rf.log[logIndex+1:]...)
    // todo: check the following code if necessary
    if rf.commitIndex < rf.lastIncludedIndex{
    rf.commitIndex = rf.lastIncludedIndex
    }
    if rf.lastApplied < rf.lastIncludedIndex{
    rf.lastApplied = rf.lastIncludedIndex
    }
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    e.Encode(rf.lastIncludedIndex)
    e.Encode(rf.lastIncludedTerm)
    data := w.Bytes()
    rf.persister.SaveStateAndSnapshot(data,rf.snapshot)
    DPrintf("[Snapshot success][Snapshot()]:Peer[%d] SaveStateAndSnapshot [index:%d | term:%d] log:%v| %s\n",rf.me,rf.lastIncludedIndex,rf.lastIncludedTerm,rf.log,time.Now().Format("15:04:05.000"))
    }

InstallSnapshot:安装快照,落后太多的节点需要通过安装快照来快速跟进。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
type InstallSnapshotArgs struct {
Term int
LeaderId int
LastIncludedIndex int
LastIncludedTerm int
Data []byte
}

type InstallSnapshotReply struct {
Term int
}
// InstallSnapshot RPC hander
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
// from old Term
if args.Term < rf.currentTerm{
DPrintf("[InstallSnapshot][InstallSnapshot()]:Peer[%d] in term[%d] receive pre Snapshot from Leader[%d] in term[%d]| %s\n", rf.me, rf.currentTerm, args.LeaderId, args.Term, time.Now().Format("15:04:05.000"))
return
}
DPrintf("[InstallSnapshot start][InstallSnapshot()]:Peer[%d] in term[%d] start install a Snapshot from Leader[%d] in term[%d] rf.lastIndex:[%d] args.lastIndex:[%d]| %s\n", rf.me, rf.currentTerm, args.LeaderId, args.Term,rf.lastIncludedIndex,args.LastIncludedIndex, time.Now().Format("15:04:05.000"))

rf.role = FOLLOWER
rf.refreshTimeout()

if args.LastIncludedIndex <= rf.commitIndex{
DPrintf("[InstallSnapshot]:Peer[%d] in term[%d] receive old Snapshot with lastIndex[%d] but Peer's commitIndex[%d] and | %s\n", rf.me, rf.currentTerm, args.LastIncludedIndex,rf.commitIndex, time.Now().Format("15:04:05.000"))
return
}

if args.LastIncludedIndex>rf.lastIncludedIndex{
// update snapshot
var storeIndex = rf.getStoreIndex(args.LastIncludedIndex)
var lastIndex = rf.lastEntryLogIndex()
rf.lastIncludedIndex = args.LastIncludedIndex
rf.lastIncludedTerm = args.LastIncludedTerm
if args.LastIncludedIndex >= lastIndex{
// remove all local log
rf.log=[]LogEntry{{Term: rf.lastIncludedTerm}}

}else{
rf.log=append([]LogEntry{{Term: rf.lastIncludedTerm}},rf.log[storeIndex+1:]...)
}
// update lastApplied and commitIndex
rf.snapshot = args.Data
rf.lastApplied = rf.lastIncludedIndex
rf.commitIndex = rf.lastIncludedIndex

// apply snapshot
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.lastIncludedIndex)
e.Encode(rf.lastIncludedTerm)
data := w.Bytes()
rf.persister.SaveStateAndSnapshot(data,rf.snapshot)

var appMsg = ApplyMsg{
CommandValid: false,
Snapshot: args.Data,
SnapshotValid: true,
SnapshotIndex: args.LastIncludedIndex,
SnapshotTerm: args.LastIncludedTerm,
}
rf.mu.Unlock()
rf.applyCh<-appMsg
rf.mu.Lock()

DPrintf("[InstallSnapshot]:Peer[%d] in term[%d] receive and install a Snapshot[%d] from Leader[%d] in term[%d]| %s\n", rf.me, rf.currentTerm, args.LastIncludedIndex,args.LeaderId, args.Term, time.Now().Format("15:04:05.000"))
}else{
DPrintf("[InstallSnapshot]:Peer[%d] in term[%d] receive old Snapshot from Leader[%d] in term[%d]| %s\n", rf.me, rf.currentTerm, args.LeaderId, args.Term, time.Now().Format("15:04:05.000"))
return
}
}

// Send a InstallSnapshot RPC to a server.
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
return ok
}

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Test (2C): basic persistence ...
... Passed -- 2.8 3 112 29817 6
Test (2C): more persistence ...
... Passed -- 15.3 5 1682 352486 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 2.2 3 55 12935 4
Test (2C): Figure 8 ...
... Passed -- 31.9 5 2558 570007 70
Test (2C): unreliable agreement ...
... Passed -- 1.3 5 1327 453328 246
Test (2C): Figure 8 (unreliable) ...
... Passed -- 41.3 5 24456 102647293 326
Test (2C): churn ...
... Passed -- 16.3 5 16439 93039197 2731
Test (2C): unreliable churn ...
... Passed -- 16.1 5 12240 36402164 1943
PASS
ok 6.824/raft 127.230s

0 1 2 3 4
x x x x x len=5

0 1 2 len=3 , lastIndex = 2
x x x

bug有一个节点一直收不到心跳,死锁了?
out2D_12:为什么反复apply 10的信息
out2D_13:apply error: server 1 snapshot doesn’t match m.SnapshotIndex