Lab 2.Raft
Lab 2 - Raft
介绍
这是一系列实验室中的第一个,您将在其中建立一个容错键/值存储系统。在本实验中,你将实现 Raft(一种复制状态机协议)。在下一个实验室中,你将在 Raft 的基础上构建一个键/值服务。然后,您将在多个复制状态机上 “分片 “您的服务,以获得更高的性能。
复制服务通过在多个复制服务器上存储其状态(即数据)的完整副本来实现容错。即使部分服务器出现故障(崩溃或网络中断或不稳定),复制也能使服务继续运行。所面临的挑战是,故障可能会导致副本持有不同的数据副本。
Raft 将客户端请求整理成一个序列(称为日志),并确保所有副本服务器都能看到相同的日志。每个副本按日志顺序执行客户端请求,并将这些请求应用到服务状态的本地副本中。由于所有实时副本都能看到相同的日志内容,因此它们都会以相同的顺序执行相同的请求,从而继续保持相同的服务状态。如果服务器出现故障,但随后又恢复了,Raft 会负责更新其日志。只要至少有大多数服务器还活着并能相互通信,Raft 就会继续运行。如果没有过半数的服务器存活,Raft 将不会取得任何进展,但只要有过半数的服务器能再次通信,Raft 就会继续运行。
在本实验室中,您将把 Raft 作为 Go 对象类型来实现,并提供相关方法,以便在更大的服务中作为一个模块使用。一组 Raft 实例通过 RPC 相互通信,以维护复制的日志。您的 Raft 接口将支持一连串编号不确定的命令,也称为日志条目。条目用索引号编号。具有给定索引的日志条目最终将被提交。此时,您的 Raft 应将日志条目发送给更大的服务,以便其执行。
您应该遵循拓展Raft论文中的设计,尤其要注意图 2。您将实现论文中的大部分内容,包括保存持久状态,以及在节点失效后重新启动时读取持久状态。您将不会实现集群成员变更(第 6 节)。您将在稍后的实验中实现日志压缩/快照(第 7 节)。
您可能会发现本指南以及有关锁和并发结构的建议非常有用。如需更广阔的视角,请参阅 Paxos、Chubby、Paxos Made Live、Spanner、Zookeeper、Harp、Viewstamped Replication 和 Bolosky et al.
本实验应分三部分完成。您必须在相应的到期日提交每一部分。
开始
我们将为您提供骨架代码 src/raft/raft.go。我们还提供了一组测试,您应使用这些测试来推动您的实施工作,我们将使用这些测试对您提交的实验室进行评分。这些测试位于 src/raft/test_test.go。
要启动并运行,请执行以下命令。别忘了使用 git pull 获取最新软件。
1 | $ cd src/raft |
任务
通过向 raft/raft.go
添加代码来实现 Raft。在该文件中,您将找到骨架代码,以及如何发送和接收 RPC 的示例。
您的实现必须支持以下接口,测试者和(最终)您的键/值服务器都将使用该接口。您可以在 raft.go
的注释中找到更多细节。
1 | // create a new Raft server instance: |
服务调用Make(peers,me,...)
创建一个 Raft 对等节点。peers 参数是一个 Raft 对等节点(包括此节点)的网络标识符数组,用于 RPC。me
参数是此对等节点在对等节点数组中的索引。Start(command)
要求 Raft 开始处理,将命令附加到复制的日志中。Start() 应立即返回,无需等待日志追加完成。服务希望您的实现能为每个新提交的日志条目向 Make()
的 applyCh 通道
参数发送 ApplyMsg
。
raft.go
包含发送 RPC(sendRequestVote()
)和处理传入 RPC(RequestVote()
)的示例代码。你的 Raft 对等体应该使用 labrpc Go 软件包(源码在 src/labrpc
)交换 RPC。测试人员可以告诉 labrpc
延迟 RPC、重新排序 RPC 和丢弃 RPC,以模拟各种网络故障。虽然你可以临时修改 labrpc
,但请确保你的 Raft 能在原始 labrpc
上运行,因为我们将用它来对你的实验室进行测试和评分。您的 Raft 实例必须只能与 RPC 进行交互;例如,它们不能使用共享的 Go 变量或文件进行通信。
后续的实验将建立在本实验的基础上,因此给自己足够的时间编写可靠的代码非常重要。
Part 2A
任务
实现 Raft 领导者选举和心跳(无日志条目的 AppendEntries
RPC)。第 2A 部分的目标是选出一个领导者,如果没有失败,领导者将继续担任领导者,如果旧的领导者失败,或者与旧的领导者之间的数据包丢失,新的领导者将接替领导者。运行 go test -run 2A
测试你的 2A 代码。
提示
- 要直接运行 Raft 实现并不容易,而应通过测试器运行,即
go test -run 2A
- 请按照本论文的图 2 进行操作。此时你关心的是发送和接收 RequestVote RPC、与选举有关的服务器规则以及与领导人选举有关的状态
- 在
raft.go
中的Raft
结构中添加图 2 中的领导者选举状态。您还需要定义一个结构来保存每个日志条目的信息。 - 填写
RequestVoteArgs
和RequestVoteReply
结构。修改Make()
,创建一个后台程序,该程序会在一段时间内没有收到其他同行的请求时,通过发送RequestVote
RPCs 来定期启动领导者选举。这样,如果已经有了领导者,一个同行就会知道谁是领导者,或者自己成为领导者。实现RequestVote()
RPC 处理程序,使服务器可以互相投票。 - 要实现心跳,请定义一个
AppendEntries
RPC 结构(虽然可能还不需要所有参数),并让领导者定期发送心跳。编写一个AppendEntries
RPC 处理器方法,重置选举超时,这样当一个服务器已经当选领导者时,其他服务器就不会站出来担任领导者。 - 确保不同对等节点的选举超时不会总是在同一时间触发,否则所有对等节点都只会为自己投票,没有人会成为领导者。
- 测试要求领导者每秒发送心跳 RPC 的次数不超过十次。
- 测试要求您的 Raft 在旧的领导者失效后五秒内选出新的领导者(如果大多数对等节点仍能通信)。但请记住,如果出现投票结果不一致的情况(如数据包丢失或候选人不走运地选择了相同的随机后退时间),则可能需要多轮领导者选举。你必须选择足够短的选举超时(以及心跳间隔),这样即使需要多轮选举,也很有可能在五秒内完成。
- 论文的第 5.2 节提到选举超时的范围是 150 到 300 毫秒。只有当领导者发送心跳的频率大大超过每 150 毫秒一次时,这样的范围才有意义。由于测试仪限制每秒只能发送 10 次心跳,因此你必须使用比论文中规定的 150 至 300 毫秒更大的选举超时,但也不能太大,因为这样你可能无法在五秒内选出领导者。
- 您可能会发现Go的rand package - math/rand - Go Packages很有用。
- 您需要编写代码,定期或延迟时间后执行操作。最简单的方法是创建一个带循环的 goroutine,调用
time.Sleep()
。不要使用 Go 的time.Timer
或time.Ticker
,它们很难正确使用。 - 阅读有关锁和并发结构的建议非常有用
- 如果你的代码无法通过测试,请再读一遍论文中的图 2;领导者选举的全部逻辑分布在图的多个部分。
- 不要忘记实现
GetState()
- 测试器在永久关闭实例时会调用 Raft 的
rf.Kill()
。您可以使用rf.killed()
检查 Kill() 是否已被调用。您可能希望在所有循环中都这样做,以避免死亡的 Raft 实例打印出混乱的信息。 - 调试代码的一个好方法是在对等程序发送或接收消息时插入打印语句,然后用
go test -run 2A > out
将输出收集到一个文件中。然后,通过研究out
文件中的消息轨迹,你可以找出你的实现与所需协议的偏差。你可能会发现util.go
中的DPrintf
对调试不同问题时打开或关闭打印很有用。 - Go RPC 只发送名称以大写字母开头的结构体字段。子结构的字段名也必须大写(例如数组中日志记录的字段)。
labgob
软件包会对此发出警告,请不要忽视这些警告。 - 使用 go test -race 检查代码,并修复它报告的任何竞争问题。
在提交第 2A 部分之前,请确保您通过了 2A 测试,这样您就会看到如下内容:
1 | $ go test -run 2A |
每行 “Passed”(通过)包含五个数字;它们分别是以秒为单位的测试时间、Raft 对等体的数量(通常为 3 或 5)、测试期间发送的 RPC 数量、RPC 消息的总字节数,以及 Raft 报告已提交的日志条目数量。您的数据将与此处显示的数据不同。您可以忽略这些数字,但它们可以帮助您理智地检查您的实现所发送的 RPC 数量。对于所有实验 2、3 和 4,如果所有测试花费的时间超过 600 秒(go test
),或者任何单个测试花费的时间超过 120 秒,评分脚本将使您的解决方案失败。
思路
2A整体任务只有两个,领导选举和发送心跳
其中每一个都可以细分为四个步骤,主要是需要明白每个步骤、不同角色、任期关系需要做什么。
经过调试,心跳间隔和选举超时设置为如下时间,比较合理。
1 | // 心跳间隔 |
领导选举
Raft需要启动一个后台ticker
线程用来实时监测超时(Follower)或触发心跳广播(Leader)。
因此选举的流程如下:
ticker
线程发现当前role
为follower
,并且已经发生选举超时。- 发起领导选举:任期加一,给自己投票,向其他节点发送投票请求并处理(另起线程)
- 处理投票结果:如果收到半数以上的选票,则选举成功;
- 选举成功:本实验中,只需要发送一次心跳即可。
startElection()
:触发领导选举,当Follower或Candidate选举超时时由ticker()触发触发SendRequestVote()
:发送请求选票RPC,不需要我们实现RequestVote()
:处理请求选票RPC的handler,也就是节点收到来自其他Candidate的选票请求时,如何处理handleVoteResult()
:Candidate收到选票请求回复时,如何处理becomeLeader()
:选举成功,成为Leader
后的操作1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// start a new election
// 开启一轮选举
func (rf *Raft) startElection() {
rf.mu.Lock()
defer rf.mu.Unlock()
DPrintf("[Election]:Peer[%d] start election for term[%d] | %s\n",rf.me,rf.currentTerm+1,time.Now().Format("15:04:05.000"))
//Add term,增加任期号
rf.currentTerm++
//Vote for self,先给自己投票
rf.votedFor = rf.me
rf.voteCount = 1
//Become a candidate,修改成为Candidate
rf.role = CANDIDATE
//Refresh election timeout,刷新选举超时时间
rf.refreshTimeout()
//Send RequestVote for all other peers,并行发送请求选票RPC
// a flag for becomeLeader
var becomeLeaderOnce sync.Once
var args = RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogTerm: rf.log[len(rf.log)-1].Term,
LastLogIndex: len(rf.log)-1,
}
for i:=0;i<len(rf.peers);i++{
if i==rf.me{
continue
}
//Open a separate goroutines to handle,开启一个单独的线程,发送RPC并等待回复回调
go func(server int,args *RequestVoteArgs) {
var reply = RequestVoteReply{}
DPrintf("[Send RequestVote]:Peer[%d] send RequestVote to Peer[%d] | %s\n",rf.me,server,time.Now().Format("15:04:05.000"))
ok:= rf.sendRequestVote(server,args,&reply)
if ok{
rf.handleVoteResult(&reply)
}
}(i,&args)
}
}
1 | //RequestVote RPC hanler |
1 | // handle vote result |
1 | // One peer electron success and become leader |
广播心跳
这里的心跳RPC复用了AppendEntries,将AppendEntriesArgs中的Entries赋nil,则代表这是一条心跳RPC,而不是附加日志的RPC(在后续实验会实现)
- broadcastAppendEntries():向其他节点广播RPC(心跳/附加日志)
- SendAppendEntries():发送附加日志RPC,不需要我们实现
- AppendEntries():处理RPC的handler,也就是节点收到来自Leader的附加日志时,如何处理
- handleAppendEntries:Leader收到回复时,如何处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// Broadcast AppendEntries RPC to other peers
// 广播心跳实验2A,后续实验会进行拓展
func (rf *Raft) broadcastAppendEntries() {
rf.mu.Lock()
defer rf.mu.Unlock()
for i:=0;i<len(rf.peers);i++{
if i==rf.me{
continue
}
var args = AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
Entries: nil,
}
var server = i
go func(server int, args *AppendEntriesArgs) {
var reply = AppendEntriesReply{}
ok:=rf.sendAppendEntries(server,args,&reply)
if ok{
rf.handleAppendEntries(&reply)
}
}(server,&args)
}
}
1 | // AppendEntries RPC hander |
1 | // Handle other peers's AppendEntries RPC reply |
1 | // The ticker go routine starts a new election if this peer hasn't received |
其他部分的代码
1 | const onceDuration = 10 |
细节
Follower什么时候才会投票
term < currentTerm
拒绝投票votedFor
不为空,说明已经投过票了,拒绝- 验证选举限制,简而言之就是只给日志信息更新更全的人投票
- lastLogTerm<当前最后一条日志的term,拒绝
- lastLogTerm=当前最后一条日志的term,且lastLogIndex<当前日志列表长度-1,拒绝
- 同意投票
来自leader的心跳信息需不需要添加到log
不需要
结果
本实验未使用chan,time.Timer等实现,仅简单实用锁、time.Sleep进行实现
通过并行次数4,总次数20次的测试
Part 2B
任务
实现Leader和Follower的日志追加功能,通过2B测试
提示
- 您的首要目标应该是通过 TestBasicAgree2B()。首先要实现 Start(),然后按照图 2 编写代码,通过 AppendEntries RPC 发送和接收新的日志条目。
- 你需要实现选举限制(论文章节5.4.1)
- 在早期 Lab 2B 测试中,一种无法达成一致的方法是反复举行选举,即使领导者还活着。查找选举计时器管理中的错误,或在赢得选举后不立即发送心跳的错误。
- 您的代码中可能有重复检查某些事件的循环。不要让这些循环不间断地执行而不暂停,因为这样会减慢实现速度,导致测试失败。请使用 Go 的条件变量,或在每个循环迭代中插入 time.Sleep(10 * time.Millisecond)。
- 帮自己一个忙,在今后的实验中编写(或重新编写)简洁明了的代码。您可以重新访问参考我们的structure, locking, 和 guide 页面指导。
如果您的代码运行太慢,即将进行的实验室测试可能会失败。您可以使用 time 命令检查解决方案占用了多少实时时间和 CPU 时间。以下是典型的输出结果:
1 | $ time go test -run 2B |
ok raft 58.142s
表示 Go 测得的 2B 测试时间为 58.142 秒的实际(挂钟)时间。user 0m2.477s
表示代码消耗了 2.477 秒的 CPU 时间,即实际执行指令(而不是等待或休眠)的时间。如果您的解决方案在 2B 测试中使用的实际时间远超过一分钟,或 CPU 时间远超过 5 秒,那么您以后可能会遇到麻烦。请注意睡眠或等待 RPC 超时所花费的时间、在不睡眠或等待条件或通道消息的情况下运行的循环,或发送的大量 RPC。
思路
按照论文中的实现,理解nextIndex
和matchIndex
的意思
BUG排查
Leader发送AppendEntries RPC后导致Follower超时。 - 已解决,broadcastAppendEntries死锁
1
22023/11/07 23:18:27 [Send AppendEntries(Start)]:Peer(Leader)[0] send ppendEntries | 23:18:27.962
Leader发送AppendEntries RPC后导致Follower超时。没有最新日志的节点超时后,请求选举,导致选举不上又不断超时发生选举
(2B): agreement after follower reconnects ... 1
2
3
4
5
6
7
8
9
10
11
122023/11/10 14:20:24 [Send AppendEntries(Start)]:Peer(Leader)[2] send AppendEntries | 14:20:24.097
2023/11/10 14:20:24 [Start Return (Start)]:Peer(Leader)[2] start return | 14:20:24.098
2023/11/10 14:20:24 [AppendEntries Send] Leader[2] send an AppendEntries to Peer[1] | 14:20:24.098
2023/11/10 14:20:24 [AppendEntries Send] Leader[2] send an AppendEntries to Peer[0] | 14:20:24.098
2023/11/10 14:20:24 [AppendEntries Received]:Peer[1] in term[1] receive AppendEntries from Leader[2] | 14:20:24.101
2023/11/10 14:20:24 [Add Entries]:Peer[1] in term[1] update AppendEntries from Leader[2] | 14:20:24.101
2023/11/10 14:20:24 [Send HeartBeat]:Peer(Leader)[2] send HeartBeat | 14:20:24.111
2023/11/10 14:20:24 [HeartBeat]:Peer[1] in term[1] receive HeartBeat from Leader[2] | 14:20:24.113 | nextTimeout[14:20:24.485]
2023/11/10 14:20:24 [Send HeartBeat]:Peer(Leader)[2] send HeartBeat | 14:20:24.189
2023/11/10 14:20:24 [HeartBeat]:Peer[1] in term[1] receive HeartBeat from Leader[2] | 14:20:24.192 | nextTimeout[14:20:24.515]
2023/11/10 14:20:24 [Send HeartBeat]:Peer(Leader)[2] send HeartBeat | 14:20:24.267
2023/11/10 14:20:24 [HeartBeat]:Peer[1] in term[1] receive HeartBeat from Leader[2] | 14:20:24.270 | nextTimeout[14:20:24.668]TestFailNoAgree2B偶尔失败
TestRejoin2B偶尔失败
测试结果
1 | Test (2B): basic agreement ... |
Part 2C
如果基于 Raft 的服务器重启,它应该从原来的位置恢复服务。这就要求 Raft 保持持久的状态,以便在重启后继续运行。本文的图 2 提到了哪些状态应该是持久的。
真正的实现会在 Raft 的持久化状态发生变化时将其写入磁盘,并在重启后从磁盘读取状态。您的实现不会使用磁盘;相反,它将从 Persister 对象(参见 persister.go)中保存和恢复持久化状态。无论谁调用 Raft.Make(),都要提供一个 Persister,它最初保存着 Raft 最近的持久化状态(如果有的话)。Raft 应从该 Persister 初始化其状态,并在每次状态改变时使用该 Persister 保存其持久化状态。请使用 Persister 的 ReadRaftState() 和 SaveRaftState() 方法。
任务
通过添加保存和恢复持久化状态的代码,完成 raft.go 中的函数 persist() 和 readPersist()。您需要将状态编码(或 “序列化”)为字节数组,以便将其传递给 Persister。使用 labgob 编码器;请参阅 persistence() 和 readPersist() 中的注释。labgob 类似 Go 的 gob 编码器,但如果尝试对字段名为小写的结构进行编码,则会打印错误信息。
在实现更改持久化状态的位置插入对 persist() 的调用。完成这些工作后,其余的测试就可以通过了。
提示
- 许多 2C 测试都涉及服务器故障和网络丢失 RPC 请求或回复。
- 你可能需要一次备份多个条目的 nextIndex 的优化。请阅读从第 7 页底部和第 8 页顶部开始的扩展 Raft 论文(用灰色线标出)。该论文对细节的描述比较含糊;您需要填补空白,或许可以借助 6.824 Raft 讲座。
- 实验 2 全套测试(2A+2B+2C)的合理耗时为 4 分钟实时时间和 1 分钟 CPU 时间。
您的代码应通过所有 2C 测试(如下所示)以及 2A 和 2B 测试。
1 | $ go test -run 2C |
思路
这一部分实际不难,只要按照persist 和 readPersist中的注释实现即可。并在所有需要持久化的数据部分调用persist持久化到磁盘即可。readPersist不需要我们调用,会在ticker中调。但Lab2C的测试比较严格,大概率会发现前面Lab2B中的很多问题,如需要加速日志对比过程,否则Figure 8 (unreliable)大概率过不了,还有超时时间和心跳频率是否合理。具体见lab2-one
Part 2D
就目前情况而言,重启的服务器会重播完整的 Raft 日志以恢复其状态。然而,对于一个长期运行的服务来说,永远记住完整的 Raft 日志是不现实的。取而代之的是,你需要修改 Raft,以便与那些不时持续存储其状态 “快照 “的服务合作,此时 Raft 会丢弃快照之前的日志条目。这样做的结果是持久化数据量更小,重启速度更快。然而,现在追随者有可能落后太多,以至于领导者丢弃了追赶所需的日志条目;这时领导者必须发送快照和从快照时间开始的日志。Raft 扩展论文的第 7 节概述了这一方案,具体细节需要您自行设计。
请参考 Raft 交互示意图,以了解复制服务和 Raft 的通信方式。
您的 Raft 必须提供以下函数,以便服务可以调用其状态的序列化快照:
1 | Snapshot(index int, snapshot []byte) |
在Lab 2D中,测试工具定期调用Snapshot()
,在实验3中你会实现一个key/value服务来调用Snapshot()
,这个快照会包含整个key/value对的表。服务层会在每个对等体(而不仅仅是领导者)上调用Snapshot()
。
index
参数表示快照中反映的最高日志条目。Raft 应丢弃该点之前的日志条目。您需要修改 Raft 代码,以便在只存储日志尾部的情况下运行。
您需要实现论文中讨论的InstallSnapshot RPC
,它允许 Raft 领导者告诉落后的 Raft 对等节点用快照替换其状态。您可能需要考虑 InstallSnapshot 应如何与图 2 中的状态和规则交互。
当追随者的 Raft 代码接收到 InstallSnapshot RPC
时,它可以使用 applyCh
将快照发送到 ApplyMsg
服务中。ApplyMsg
结构定义已经包含了您需要的字段(也是测试人员所期望的)。请注意,这些快照只会推进服务的状态,而不会导致服务后退。
如果服务器崩溃,它必须从持久化数据重新启动。您的 Raft 应同时持久化 Raft 状态和相应的快照。请使用 persister.SaveStateAndSnapshot()
,它为 Raft 状态和相应的快照分别接收不同的参数。如果没有快照,则传递 nil
作为snapshot
参数。
当服务器重新启动时,应用层会读取持久化快照并恢复其保存的状态。
以前,本实验室曾建议您实现一个名为 CondInstallSnapshot
的函数,以避免快照和应用重启时发送的日志条目相互协调的要求。这个残留的 API 接口仍然存在,但我们不建议您实现它:相反,我们建议您只需让它返回 true 即可。
任务
实现 Snapshot() 和 InstallSnapshot RPC,并对 Raft 进行修改以支持这些操作(例如,使用修剪过的日志进行操作)。当您的解决方案通过 2D 测试(以及之前的所有 Lab 2 测试)时,它就完成了。
提示
- 一个好的开始是你可以修改你的代码以便于能够只存储从某个索引
X
开始的日志部分。初始你可以设置X
为0然后运行2B/2C测试,然后使得Snapshot(index)
丢弃在index
之前的日志,并设置X
为index,如果这些一切顺利,那么就可以通过第一个2D测试。 - 您不能将日志存储在 Go slice 中,也不能将 Go slice 索引与 Raft 日志索引互换使用;您需要对 slice 进行索引,以便考虑到日志中被丢弃的部分。
- 下一步:如果Leader没有更新追随者所需的日志条目,就让领导者发送 InstallSnapshot RPC。
- 在单个 InstallSnapshot RPC 中发送整个快照。不要执行图 13 中用于分割快照的偏移机制。
- Raft 必须以允许 Go 垃圾回收器释放并重新使用内存的方式丢弃旧日志条目;这就要求丢弃的日志条目不存在可触及的引用(指针)。
- 即使日志被修剪,您的实现仍需要在 AppendEntries RPC 中的新条目之前正确发送条目的Term和Index;这可能需要保存和引用最新快照的 lastIncludedTerm/lastIncludedIndex (考虑是否应持久化)。
- Lab 2 全套测试(2A+2B+2C+2D)在不使用 -race 时的合理耗时为 6 分钟实时时间和 1 分钟 CPU 时间。使用 -race 时,实际耗时约为 10 分钟,CPU 耗时约为 2 分钟。
1 | $ go test -run 2D |
思路
Follower多久进行一次快照
- 论文中提到当日志达到固定大小时,进行快照
Snapshot由谁触发
- 由上层应用进行触发,当上层应用认为可以将已经提交的日志进行压缩,那么就会调用节点的
Snapshot()
的函数。 - 快照中包含了上层应用的最新状态,意味着raft中的日志已经成功地应用到了上层应用中,这些日志已经不再需要存储了。
需要做一个index的转换
storeIndex
:是日志存储在log中的实际索引,storeIndex=logIndex-lastIncludedIndex
logIndex
:是日志的逻辑索引,logIndex=storeIndex+lastIncludedIndex
对log切片操作使用storeIndex
,其他情况使用logIndex
1 | // get log entry by logical index |
Snapshot:接收上层应用发来的快照,index
表示上层应用已经应用的条目index
,snapshot
表示在应用了该index
为止的条目下上层应用的最新数据。
- 判断
index
是否合法。 - 更新
lastIncludedTerm
、lastIncludedIndex
、snapshot
- 压缩日志,更新
commitIndex
和lastApplied
。因为既然上层应用已经应用了所有index
之前的日志。 - 持久化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
// replace log with snapshot
DPrintf("[Snapshot start][Snapshot()]:Peer[%d] start save Snapshot with index:[%d] and lastIndex:[%d] log:[%d]| %s\n",rf.me,index,rf.lastIncludedIndex,len(rf.log),time.Now().Format("15:04:05.000"))
var logIndex = rf.getStoreIndex(index)
if logIndex<=0 || logIndex >=len(rf.log){
return
}
DPrintf("[Snapshot Doing][Snapshot()]:Peer[%d] is saving Snapshot with index:[%d]| %s\n",rf.me,index,time.Now().Format("15:04:05.000"))
rf.lastIncludedTerm = rf.getEntry(index).Term
rf.lastIncludedIndex = index
rf.snapshot = snapshot
rf.log = append([]LogEntry{{Term: rf.lastIncludedTerm}}, rf.log[logIndex+1:]...)
// todo: check the following code if necessary
if rf.commitIndex < rf.lastIncludedIndex{
rf.commitIndex = rf.lastIncludedIndex
}
if rf.lastApplied < rf.lastIncludedIndex{
rf.lastApplied = rf.lastIncludedIndex
}
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.lastIncludedIndex)
e.Encode(rf.lastIncludedTerm)
data := w.Bytes()
rf.persister.SaveStateAndSnapshot(data,rf.snapshot)
DPrintf("[Snapshot success][Snapshot()]:Peer[%d] SaveStateAndSnapshot [index:%d | term:%d] log:%v| %s\n",rf.me,rf.lastIncludedIndex,rf.lastIncludedTerm,rf.log,time.Now().Format("15:04:05.000"))
}
InstallSnapshot:安装快照,落后太多的节点需要通过安装快照来快速跟进。
1 | type InstallSnapshotArgs struct { |
测试结果
1 | Test (2C): basic persistence ... |
0 1 2 3 4
x x x x x len=5
0 1 2 len=3 , lastIndex = 2
x x x
bug有一个节点一直收不到心跳,死锁了?
out2D_12:为什么反复apply 10的信息
out2D_13:apply error: server 1 snapshot doesn’t match m.SnapshotIndex