Lab 1.MapReduce
Lab 1 - MapReduce
介绍
在本实验中,您将构建一个 MapReduce 系统。您将实现一个调用应用程序 Map 和 Reduce 函数并处理文件读写的 Worker 进程,以及一个将任务分派给 Worker 并处理失败 Worker 的主进程。您将构建与 MapReduce 论文类似的内容。
开始
我们在 src/main/mrsequential.go
中为您提供了一个简单的顺序 mapreduce 实现。它在一个进程中一次运行一个映射和还原。我们还为您提供了几个 MapReduce 应用程序:mrapps/wc.go
中的 word-count
和mrapps/indexer.go
中的文本索引器。您可以按如下顺序运行 word count:
1 | $ cd ~/6.824 |
请随意借用 mrsequential.go 中的代码。你还应该看看 mrapps/wc.go,看看 MapReduce 应用程序的代码是什么样的。
你的任务
你的任务是实现一个分布式 MapReduce,它由两个程序组成,即主进程和工作进程。其中只有一个主进程和一个或多个并行执行的工作进程。在真实系统中,工作进程会运行在多台不同的机器上,但在本实验室中,你将在一台机器上运行所有工作进程。工作进程将通过 RPC 与主进程对话。每个 Worker 进程都会向主进程请求任务,从一个或多个文件中读取任务输入,执行任务,并将任务输出写入一个或多个文件。如果某个 Worker 在合理的时间内(在本实验室中为 10 秒)没有完成任务,主进程就会注意到,并将相同的任务交给另一个 Worker。
我们已经给了你一些代码作为起步。Master和Worker的主要代码在main/mrmaster.go
和 main/mrworker.go
中,请不要改变这些文件。你应该将你的实现置于mr/master.go
, mr/worker.go
, 和mr/rpc.go
中。
下面介绍如何在 word-count MapReduce 应用程序上运行代码。首先,确保 word-count 插件是全新构建的:
1 | $ go build -buildmode=plugin ../mrapps/wc.go |
在main目录,运行master进程
1 | $ rm mr-out* |
mrmaster.go 的 pg-*.txt
参数是输入文件;每个文件对应一个 “分割”,是一个 Map 任务的输入。
在一个或多个其他窗口中,运行一些workers进程
1 | $ go run mrworker.go wc.so |
当master和workers完成后,查看 mr-out-* 中的输出。完成实验后,输出文件的排序联合应与顺序输出一致,就像这样:
1 | $ cat mr-out-* | sort | more |
我们为您提供了一个测试脚本(main/test-mr.sh
)。这些测试将检查 wc
和indexer
MapReduce 应用程序在输入 pg-xxx.txt 文件时是否产生正确的输出。测试还会检查你的实现是否并行运行 Map 和 Reduce 任务,以及你的实现是否能从运行任务时崩溃的 Worker 中恢复。
如果您现在运行测试脚本,它就会挂起,因为主程序永远不会结束:
1 | $ cd ~/6.824/src/main |
您可以将 mr/master.go
中 Done 函数中的 ret := false 改为 true,这样主控程序就会立即退出。然后:
1 | $ sh ./test-mr.sh |
测试脚本希望看到名为mr-out-X
的文件输出,每个reduce任务一个文件。mr/master.go
和 mr/worker.go
的空实现不会生成这些文件(也不会做其他任何事情),因此测试失败。
完成后,测试脚本输出应该如下所示:
1 | $ sh ./test-mr.sh |
还会看到一些来自 Go RPC 软件包的错误,这些错误看起来像:忽略这些信息即可。
1 | 2019/12/16 13:27:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three |
几条规则
- Map阶段应将中间键分成多个桶,用于执行
nReduce
缩减任务,其中nReduce
是main/mrmaster.go
传递给 MakeMaster() 的参数。 - Worker的实现中应该将第X个Reduce任务的输出放在
mr-out-X
的文件中。 - mr-out-X 文件应包含每个 Reduce 函数输出的一行。这一行应该以 Go 的”%v %v “格式生成,并以键和值调用。请查看
main/mrsequential.go
中注释为 “这是正确格式 “的一行。如果您的实现与此格式偏差过大,测试脚本就会失败。 - 您可以修改
mr/worker.go
、mr/master.go
和mr/rpc.go
。您可以临时修改其他文件进行测试,但请确保您的代码能在原始版本下运行;我们将使用原始版本进行测试。 - Worker 应将中间的 Map 输出放到当前目录下的文件中,这样 Worker 就能将其作为 Reduce 任务的输入进行读取。
main/mrmaster.go
希望mr/master.go
实现一个 Done() 方法,当 MapReduce 作业完全完成时返回 true;此时,mrmaster.go
将退出。- 任务完全完成后,工作进程应退出。实现这一点的简单方法是使用 call() 的返回值:如果工作者无法联系到主进程,它可以认为主进程已经退出,因为工作已经完成,因此工作者也可以终止。根据您的设计,您可能还会发现,主控程序可以给工人一个 “请退出 “的伪任务,这也是很有帮助的。
提示
- 一种入门方法是修改
mr/worker.go
的 Worker() 向主程序发送 RPC,请求执行任务。然后修改 master,使其响应一个尚未启动的 Map 任务的文件名。然后修改 Worker,读取该文件并调用应用程序的 Map 函数,就像mrsequential.go
中那样。 - 应用程序的 Map 和 Reduce 功能是在运行时使用 Go 插件包从名称以 .so 结尾的文件中加载的。
- 如果更改了 mr/目录中的任何内容,很可能需要重新构建使用的 MapReduce 插件,方法如下:go build -buildmode=plugin ../mrapps/wc.go
- 本实验室依赖于workers共享一个文件系统。当所有workers都在同一台机器上运行时,共享文件系统很简单,但如果工作者运行在不同的机器上,就需要一个类似 GFS 的全局文件系统。
- 中间文件的合理命名约定是 mr-X-Y,其中 X 是映射任务编号,Y 是还原任务编号。
- Worker 的 map 任务代码需要一种方法,将中间键/值对存储在文件中,以便在 reduce 任务中正确读回。其中一种方法是使用 Go 的编码/json 包。将键/值对写入 JSON 文件:并读回这样的文件:
1
2
3enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)1
2
3
4
5
6
7
8dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
} - Worker 的 map 部分可以使用 ihash(key) 函数(worker.go 中)为给定 key 挑选还原任务。
- 你可以从
mrsequential.go
中窃取一些代码,用于读取 Map 输入文件、在 Map 和 Reduce 之间对键值对进行排序,以及将 Reduce 输出存储到文件中。 - Master作为 RPC 服务器,将是并发的;不要忘记锁定共享数据。
- 使用 go 的竞赛检测器,即 go build -race 和 go run -race。test-mr.sh 的注释说明了如何为测试启用竞赛检测器。go 的竞态检测机制 (race)_go race-CSDN博客
- worker进程有时需要等待,例如,在上一个map任务完成之前,reduce任务无法启动。一种方法是让worker进程定期向master请求任务,在每次请求之间使用 time.Sleep() 休眠。另一种方法是让主程序中的相关 RPC 处理程序循环等待,可以使用 time.Sleep() 或 sync.Cond。Go 在自己的线程中运行每个 RPC 的处理程序,因此一个处理程序正在等待不会妨碍主程序处理其他 RPC。
- master程序无法可靠地区分崩溃的worker、存活但因故停滞的worker,以及正在执行但速度太慢而无用的worker。你能做的最好办法就是让主程序等待一段时间,然后放弃,将任务重新分配给不同的worker。在本实验中,让主程序等待十秒钟;十秒钟后,主程序应假定 worker 已死亡(当然,也可能没有死亡)。
- 要测试崩溃恢复,可以使用
mrapps/crash.go
应用程序插件。它会在 Map 和 Reduce 功能中随机退出。 - 为了确保没有人会在崩溃时观察到部分写入的文件,MapReduce 论文提到了使用临时文件并在完全写入后原子重命名的技巧。您可以使用 ioutil.TempFile 创建临时文件,并使用 os.Rename 原子重命名临时文件。
- test-mr.sh 运行子目录 mr-tmp 中的所有进程,因此如果出了问题,需要查看中间文件或输出文件,请到那里查看。
思路
- 一共M个map任务,对应M个输入文章;N个reduce任务,N是由输入程序的参数决定
- Worker.go负责获取任务-执行map/reduce任务
- Coordinator.go可以理解为调度中心,负责存储任务-分发任务-响应worker
- Rpc.go存储用于PRC的参数
Worker.go
- Map函数过程:
- 打开给定文件
- 调用map函数,生成如
[{"word1","1"},{"word2","1"},{}]
的kv列表 - 根据key排序,计算ihash(key)%N=Y,记录到mr-X-Y中。(json形式)这边需要原子重命名
- RPC通知完成
- Reduce函数过程:
- 读入所有mr-i-Y的文件,获取键值对
- 排序
- 遍历相同的key,生成
key到string[]
的形式 - 调用reduce函数
- 写入mr-out-Y文件。(json形式)需要原子重命名
- RPC通知完成
Coordinator.go
- 如何存储map和reduce任务的信息
- 如何分发任务:worker不断问coordinator要任务,coordinator根据当前阶段map/reduce,发放对应未被完成的任务,worker完成后,通知coordinator标记任务状态。
- 当所有任务完成后,执行Done
- 当worker宕机后,如何重分配任务
Coordinator结构(初步,后续根据测试结果进行修改):
1 | type TaskType int |
RPC.go
- 定义RPC的数据结构
1 | //定义rpc的结构 |
实现
worker.go
1 | package mr |
coordinator.go
1 | package mr |
rpc.go
1 | package mr |
技巧:可以单独写一个testx-mr.sh进行独立测试,如仅测试crash:
1 |
|
运行bash ./test-mr.sh
,发现early_exit和crash测试是失败的
early_exit测试:其实是测试有没有worker/coordinator提前退出了,一般coordinator不会提前退出,我原本的实现中,并未设置KEEPWAITING任务,导致在
所有任务已被分配,但还有任务在执行中
这种情况下,worker没有收到任务,直接退出了,导致测试失败。实验要求当所有任务完成后,worker/coordinator才能退出。
crash测试:随机让部分worker在执行
map/reduce
的时候宕机(os.Exit或超时(time.Sleep)来测试你的程序是否有恢复机制。我后来是使用了checkTasks
函数,定期检查任务状态,通过对比当前时间和任务开始时间,如果发现状态为DOING
的任务的开始时间<当前时间-10s,则认为负责这个任务的workder超时了,于是重新将任务的状态改为WAITING
,等待其他worker请求任务时,再重新分配。(这么做感觉是有点naive,如果那个宕机的worker又活过来了,则又执行了一遍map/reduce过程,这其实是没必要的)
断断续续写了一个月,加起来差不多4天理解论文和整理思路,code+debug三天,其他时间都在摸鱼☺。(GoLand一个月试用期刚好到期乐)
总结
- 踩坑记录:Ubuntu指令$ cat mr-out-* | sort | more会忽略大小写进行排序,导致程序输出结果和正确的结果不一致,测试失败。(不过后来发现这个不会导致测试失败,因为测试程序会再次对mr-out-x文件调用sort,使得期望结果和你程序的输出结果都按同一规则排序)
- 踩坑记录:在执行中的任务不能被再次分配给其他worker,不然会导致并发访问。后来多加了一个
DOING
状态表示任务正在执行中 - 本实验并没有用天生并发安全的
chan
实现,而是通过锁机制控制并发访问,一定要避免死锁和数据竞争。使用go run -race
检测 - 由于初学go,部分锁使用的时机可能不正确,导致性能低。(为了省事,从函数开始锁到函数结束)
- 本实现代码只保证能通过所有测试,但可能并不完善且由于刚学
go
代码写的不够简洁,部分异常没有处理(如RPC调用失败等)