Lab 1 - MapReduce

介绍

在本实验中,您将构建一个 MapReduce 系统。您将实现一个调用应用程序 Map 和 Reduce 函数并处理文件读写的 Worker 进程,以及一个将任务分派给 Worker 并处理失败 Worker 的主进程。您将构建与 MapReduce 论文类似的内容。

开始

我们在 src/main/mrsequential.go 中为您提供了一个简单的顺序 mapreduce 实现。它在一个进程中一次运行一个映射和还原。我们还为您提供了几个 MapReduce 应用程序:mrapps/wc.go 中的 word-countmrapps/indexer.go 中的文本索引器。您可以按如下顺序运行 word count:

1
2
3
4
5
6
7
8
9
10
$ cd ~/6.824
$ cd src/main
$ go build -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run mrsequential.go wc.so pg*.txt
$ more mr-out-0
A 509
ABOUT 2
ACT 8
...

请随意借用 mrsequential.go 中的代码。你还应该看看 mrapps/wc.go,看看 MapReduce 应用程序的代码是什么样的。

你的任务

你的任务是实现一个分布式 MapReduce,它由两个程序组成,即主进程和工作进程。其中只有一个主进程和一个或多个并行执行的工作进程。在真实系统中,工作进程会运行在多台不同的机器上,但在本实验室中,你将在一台机器上运行所有工作进程。工作进程将通过 RPC 与主进程对话。每个 Worker 进程都会向主进程请求任务,从一个或多个文件中读取任务输入,执行任务,并将任务输出写入一个或多个文件。如果某个 Worker 在合理的时间内(在本实验室中为 10 秒)没有完成任务,主进程就会注意到,并将相同的任务交给另一个 Worker。

我们已经给了你一些代码作为起步。Master和Worker的主要代码在main/mrmaster.go main/mrworker.go中,请不要改变这些文件。你应该将你的实现置于mr/master.gomr/worker.go, 和mr/rpc.go中。

下面介绍如何在 word-count MapReduce 应用程序上运行代码。首先,确保 word-count 插件是全新构建的:

1
$ go build -buildmode=plugin ../mrapps/wc.go

在main目录,运行master进程

1
2
$ rm mr-out*
$ go run mrmaster.go pg-*.txt

mrmaster.go 的 pg-*.txt 参数是输入文件;每个文件对应一个 “分割”,是一个 Map 任务的输入。

在一个或多个其他窗口中,运行一些workers进程

1
$ go run mrworker.go wc.so

当master和workers完成后,查看 mr-out-* 中的输出。完成实验后,输出文件的排序联合应与顺序输出一致,就像这样:

1
2
3
4
$ cat mr-out-* | sort | more
A 509
ABOUT 2
ACT

我们为您提供了一个测试脚本(main/test-mr.sh)。这些测试将检查 wcindexer MapReduce 应用程序在输入 pg-xxx.txt 文件时是否产生正确的输出。测试还会检查你的实现是否并行运行 Map 和 Reduce 任务,以及你的实现是否能从运行任务时崩溃的 Worker 中恢复。
如果您现在运行测试脚本,它就会挂起,因为主程序永远不会结束:

1
2
3
$ cd ~/6.824/src/main
$ sh test-mr.sh
*** Starting wc test.

您可以将 mr/master.go 中 Done 函数中的 ret := false 改为 true,这样主控程序就会立即退出。然后:

1
2
3
4
5
6
7
$ sh ./test-mr.sh
*** Starting wc test.
sort: No such file or directory
cmp: EOF on mr-wc-all
--- wc output is not the same as mr-correct-wc.txt
--- wc test: FAIL
$

测试脚本希望看到名为mr-out-X 的文件输出,每个reduce任务一个文件。mr/master.gomr/worker.go 的空实现不会生成这些文件(也不会做其他任何事情),因此测试失败。

完成后,测试脚本输出应该如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ sh ./test-mr.sh
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
$

还会看到一些来自 Go RPC 软件包的错误,这些错误看起来像:忽略这些信息即可。

1
2019/12/16 13:27:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three

几条规则

  • Map阶段应将中间键分成多个桶,用于执行 nReduce 缩减任务,其中 nReducemain/mrmaster.go 传递给 MakeMaster() 的参数。
  • Worker的实现中应该将第X个Reduce任务的输出放在mr-out-X的文件中。
  • mr-out-X 文件应包含每个 Reduce 函数输出的一行。这一行应该以 Go 的”%v %v “格式生成,并以键和值调用。请查看 main/mrsequential.go 中注释为 “这是正确格式 “的一行。如果您的实现与此格式偏差过大,测试脚本就会失败。
  • 您可以修改 mr/worker.gomr/master.gomr/rpc.go。您可以临时修改其他文件进行测试,但请确保您的代码能在原始版本下运行;我们将使用原始版本进行测试。
  • Worker 应将中间的 Map 输出放到当前目录下的文件中,这样 Worker 就能将其作为 Reduce 任务的输入进行读取。
  • main/mrmaster.go 希望 mr/master.go 实现一个 Done() 方法,当 MapReduce 作业完全完成时返回 true;此时,mrmaster.go 将退出。
  • 任务完全完成后,工作进程应退出。实现这一点的简单方法是使用 call() 的返回值:如果工作者无法联系到主进程,它可以认为主进程已经退出,因为工作已经完成,因此工作者也可以终止。根据您的设计,您可能还会发现,主控程序可以给工人一个 “请退出 “的伪任务,这也是很有帮助的。

提示

  • 一种入门方法是修改 mr/worker.go 的 Worker() 向主程序发送 RPC,请求执行任务。然后修改 master,使其响应一个尚未启动的 Map 任务的文件名。然后修改 Worker,读取该文件并调用应用程序的 Map 函数,就像 mrsequential.go 中那样。
  • 应用程序的 Map 和 Reduce 功能是在运行时使用 Go 插件包从名称以 .so 结尾的文件中加载的。
  • 如果更改了 mr/目录中的任何内容,很可能需要重新构建使用的 MapReduce 插件,方法如下:go build -buildmode=plugin ../mrapps/wc.go
  • 本实验室依赖于workers共享一个文件系统。当所有workers都在同一台机器上运行时,共享文件系统很简单,但如果工作者运行在不同的机器上,就需要一个类似 GFS 的全局文件系统。
  • 中间文件的合理命名约定是 mr-X-Y,其中 X 是映射任务编号,Y 是还原任务编号。
  • Worker 的 map 任务代码需要一种方法,将中间键/值对存储在文件中,以便在 reduce 任务中正确读回。其中一种方法是使用 Go 的编码/json 包。将键/值对写入 JSON 文件:
    1
    2
    3
    enc := json.NewEncoder(file)
    for _, kv := ... {
    err := enc.Encode(&kv)
    并读回这样的文件:
    1
    2
    3
    4
    5
    6
    7
    8
    dec := json.NewDecoder(file)
    for {
    var kv KeyValue
    if err := dec.Decode(&kv); err != nil {
    break
    }
    kva = append(kva, kv)
    }
  • Worker 的 map 部分可以使用 ihash(key) 函数(worker.go 中)为给定 key 挑选还原任务。
  • 你可以从 mrsequential.go 中窃取一些代码,用于读取 Map 输入文件、在 Map 和 Reduce 之间对键值对进行排序,以及将 Reduce 输出存储到文件中。
  • Master作为 RPC 服务器,将是并发的;不要忘记锁定共享数据。
  • 使用 go 的竞赛检测器,即 go build -race 和 go run -race。test-mr.sh 的注释说明了如何为测试启用竞赛检测器。go 的竞态检测机制 (race)_go race-CSDN博客
  • worker进程有时需要等待,例如,在上一个map任务完成之前,reduce任务无法启动。一种方法是让worker进程定期向master请求任务,在每次请求之间使用 time.Sleep() 休眠。另一种方法是让主程序中的相关 RPC 处理程序循环等待,可以使用 time.Sleep() 或 sync.Cond。Go 在自己的线程中运行每个 RPC 的处理程序,因此一个处理程序正在等待不会妨碍主程序处理其他 RPC。
  • master程序无法可靠地区分崩溃的worker、存活但因故停滞的worker,以及正在执行但速度太慢而无用的worker。你能做的最好办法就是让主程序等待一段时间,然后放弃,将任务重新分配给不同的worker。在本实验中,让主程序等待十秒钟;十秒钟后,主程序应假定 worker 已死亡(当然,也可能没有死亡)。
  • 要测试崩溃恢复,可以使用 mrapps/crash.go 应用程序插件。它会在 Map 和 Reduce 功能中随机退出。
  • 为了确保没有人会在崩溃时观察到部分写入的文件,MapReduce 论文提到了使用临时文件并在完全写入后原子重命名的技巧。您可以使用 ioutil.TempFile 创建临时文件,并使用 os.Rename 原子重命名临时文件。
  • test-mr.sh 运行子目录 mr-tmp 中的所有进程,因此如果出了问题,需要查看中间文件或输出文件,请到那里查看。

思路

  • 一共M个map任务,对应M个输入文章;N个reduce任务,N是由输入程序的参数决定
  • Worker.go负责获取任务-执行map/reduce任务
  • Coordinator.go可以理解为调度中心,负责存储任务-分发任务-响应worker
  • Rpc.go存储用于PRC的参数

Worker.go

  • Map函数过程:
    • 打开给定文件
    • 调用map函数,生成如[{"word1","1"},{"word2","1"},{}]的kv列表
    • 根据key排序,计算ihash(key)%N=Y,记录到mr-X-Y中。(json形式)这边需要原子重命名
    • RPC通知完成
  • Reduce函数过程:
    • 读入所有mr-i-Y的文件,获取键值对
    • 排序
    • 遍历相同的key,生成 key到string[]的形式
    • 调用reduce函数
    • 写入mr-out-Y文件。(json形式)需要原子重命名
    • RPC通知完成

Coordinator.go

  • 如何存储map和reduce任务的信息
  • 如何分发任务:worker不断问coordinator要任务,coordinator根据当前阶段map/reduce,发放对应未被完成的任务,worker完成后,通知coordinator标记任务状态。
  • 当所有任务完成后,执行Done
  • 当worker宕机后,如何重分配任务

Coordinator结构(初步,后续根据测试结果进行修改):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
type TaskType int
type State int

const (
MAP TaskType = iota
REDUCE
)
const (
WAITING State = iota
DONE
)
//计数器-用于产生全局唯一任务编号
type Counter struct {
lock sync.Mutex
value int
}
func (c *Counter) addValue(){
c.lock.Lock()
defer c.lock.Unlock()
c.value++
}

func (c *Counter) getValue() int{
c.lock.Lock()
defer c.lock.Unlock()
return c.value
}

func (c *Counter) resetValue(){
c.lock.Lock()
defer c.lock.Unlock()
c.value=1
}

type Coordinator struct {
// Your definitions here.
mMap int //map任务数量
nReduce int //reduce任务数量
mDone int //map已完成数量
nDone int //reduce已完成数量
mapTasks []Task //map任务列表
reduceTasks []Task //reduce任务列表
lock sync.Mutex //锁
state State //当前状态 map或reduce阶段
}

type Task struct {
id int //任务编号
filename string //任务名,reduce任务没有这个
isDone bool //是否完成
taskType TaskType //任务类型 map or reduce
startTime time.Time //开始时间
}

//rpc分配任务
//AskTask()
//1.加锁
//2.判断当前State MAPPING/REDUCING
//3.选出未被分配的任务 设置任务开始时间
//3.1没有任务了,返回nil,worker接到nil后退出
//4.返回给worker
//5.解锁

//rpc设置任务状态
//TaskDone()
//1.加锁
//2.设置任务已完成 设置mDone / nDone
//3.检查一下是否全部mDone已完成,是-改变State
//4.返回给worker
//5.解锁


RPC.go

  • 定义RPC的数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//定义rpc的结构

//AskTask
type AskTaskArgs struct{
//似乎不用
}

type AskTaskReply struct{
task Task
mMap int
nReduce int
}

//AskTask
type TaskDoneArgs struct{
task Task
}

type TaskDoneReply struct{
//似乎不用
}

实现

worker.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package mr

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"time"
)
import "log"
import "net/rpc"
import "hash/fnv"

//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
Key string
Value string
}
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

// Your worker implementation here.
for {
//下述代码应该循环执行
//1.rpc请求任务-获取到任务Task信息
askTaskArgs := AskTaskArgs{}
askTaskReply := AskTaskReply{}
//CallAskTask()
askTaskRPC := call("Coordinator.AskTask", &askTaskArgs, &askTaskReply)
if askTaskRPC {
//fmt.Printf("call AskTask success!\n")
} else {
//fmt.Printf("call AskTask failed!\n")
//这里其实可以进行休眠一段时间后,重试
continue
}
var taskType = askTaskReply.TaskType

//2.map/reduce

taskDoneArgs := TaskDoneArgs{}
taskDoneReply := TaskDoneReply{}
taskDoneArgs.TaskID = taskId
taskDoneArgs.TaskType = taskType
if taskType == MAP {
taskId := askTaskReply.TaskID
taskFilename := askTaskReply.Filename
nReduce := askTaskReply.NReduce
//执行map任务
ok := doMapTask(taskId, taskFilename, nReduce, mapf)



var doneTaskRPC = false
if ok {
doneTaskRPC = call("Coordinator.TaskDone", &taskDoneArgs, &taskDoneReply)
}
// todo:如果服务器未收到任务完成的RPC,worker应该进行有限次重试
if doneTaskRPC {
//fmt.Println("Task Done, coordinator has received!")
}

} else if taskType == REDUCE {
taskId := askTaskReply.TaskID

var doneTaskRPC = false
ok := doReduceTask(taskId, reducef)
//ok := true
if ok {
doneTaskRPC = call("Coordinator.TaskDone", &taskDoneArgs, &taskDoneReply)
}
if doneTaskRPC {
//fmt.Println("Task Done, coordinator has received!")
}
} else if taskType == KEEPWAITING {
//Coordinator请求等待,当有map/reduce任务在执行中,但没有多余任务待分配的时候
time.Sleep(100 * time.Millisecond)
} else if taskType == NONE {
//Coordinator请求Worker退出,当所有任务都被完成
break
}

}
// uncomment to send the Example RPC to the coordinator.
// CallExample()

}

// do map task,if no err: return true else: return false
func doMapTask(taskID int, filename string, nReduce int, mapf func(string, string) []KeyValue) bool {
//1.打开文件(任务编号-X)
fmt.Println(filename)
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
//2.调用mapf
kva := mapf(filename, string(content))
//3.哈希 Y-Slice映射 调用ihash(k)%nReduce=Y
kvmap := make(map[int][]KeyValue)
for _, kv := range kva {
var key = kv.Key
var hashY = ihash(key) % nReduce
if _, exists := kvmap[hashY]; !exists {
kvmap[hashY] = make([]KeyValue, 0)
}
kvmap[hashY] = append(kvmap[hashY], kv)
}
//4.存储至中间文件mr-X-Y
for hashY, kvs := range kvmap {
//fmt.Println(kvs)
//临时文件命名为: mr-tmp-X-Y
newFileName := fmt.Sprintf("mr-tmp-%d-%d", taskID, hashY)
// create temp file
tempFile, err := ioutil.TempFile("./", "temp-"+newFileName)

if err != nil {
log.Fatalf("cannot create %v", newFileName)
return false
}
// 创建JSON编码器
encoder := json.NewEncoder(tempFile)
// 使用编码器将数据写入文件
err = encoder.Encode(kvs)
if err != nil {
log.Fatalf("error:%v when writing %v", err, newFileName)
tempFile.Close()
return false
}
//5.原子重命名
err = os.Rename(tempFile.Name(), newFileName)
if err != nil {
log.Fatalf("error:%v when rename tempfile: %v", err, newFileName)
tempFile.Close()
return false
}
tempFile.Close()
}
return true
}

// do reduce task,if no err: return true else: return false
func doReduceTask(taskID int, reducef func(string, []string) string) bool {
//1.读取文件(任务编号-Y)
fmt.Println(taskID)
var hashY = taskID
intermediate := []KeyValue{}
directory := "./"
// 使用 filepath.Glob 查找匹配的文件,匹配所有mr-tmp-*-Y的文件
filenameReg := fmt.Sprintf("mr-tmp-*-%d", hashY)
files, err := filepath.Glob(filepath.Join(directory, filenameReg))
if err != nil {
log.Fatalf("error:%v when find files with Y: %v", err, hashY)
return false
}
// 2.存储kv
for _, filename := range files {
file, err := os.Open(filename)
if err != nil {
file.Close()
log.Fatalf("cannot open %v", filename)
return false
}
decoder := json.NewDecoder(file)
var kvs []KeyValue
if err := decoder.Decode(&kvs); err != nil {
file.Close()
log.Fatalf("error:%v when read mr files with Y: %v", err, hashY)
return false
}
intermediate = append(intermediate, kvs...)
file.Close()
}
//3.排序kv,目的是让相同的word {"word","1"}排到一起
sort.Sort(ByKey(intermediate))

oname := fmt.Sprintf("mr-out-%d", hashY)
tempFile, err := ioutil.TempFile("./", "temp-"+oname)
//4.调用reducef
//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)

// this is the correct format for each line of Reduce output.
_, err := fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
if err != nil {
tempFile.Close()
log.Fatalf("error :%v when writing to %v", oname)
return false
}
i = j
}
//5.写入mr-out-Y文件,原子重命名
err = os.Rename(tempFile.Name(), oname)
if err != nil {
log.Fatalf("error:%v when rename tempfile: %v", err, oname)
tempFile.Close()
return false
}
tempFile.Close()
return true

}

//
// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {

// declare an argument structure.
args := ExampleArgs{}

// fill in the argument(s).
args.X = 99

// declare a reply structure.
reply := ExampleReply{}

// send the RPC request, wait for the reply.
// the "Coordinator.Example" tells the
// receiving server that we'd like to call
// the Example() method of struct Coordinator.
ok := call("Coordinator.Example", &args, &reply)
if ok {
// reply.Y should be 100.
fmt.Printf("reply.Y %v\n", reply.Y)
} else {
fmt.Printf("call failed!\n")
}
}

//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()

err = c.Call(rpcname, args, reply)
if err == nil {
return true
}

fmt.Println(err)
return false
}

coordinator.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package mr

import (
"fmt"
"log"
"sync"
"time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"

type TaskType int
type State int
type TaskState int

//定义任务类型枚举
const (
MAP TaskType = iota //Map任务
REDUCE //REDUCE任务
KEEPWAITING //待命任务(虚拟的)
NONE //空任务(虚拟的)
)
//定义Coordinator当前阶段
const (
MAPPING State = iota //执行Map任务阶段
REDUCING //执行Reduce任务阶段
FINISHED //所有任务都完成阶段
)
//定义任务状态
const (
WAITING TaskState = iota //待分配
DOING //进行中
DONE //已完成
)
//定义任务Task结构体
type Task struct {
Id int //任务编号
Filename string //任务名,用于map任务
State TaskState //是否完成
TaskType TaskType //任务类型 map or reduce or keepwaiting or none
StartTime time.Time //开始时间
}
//定义Coordinator结构体
type Coordinator struct {
// Your definitions here.
mMap int //map任务数量
nReduce int //reduce任务数量
mDone int //map已完成数量
nDone int //reduce已完成数量
mapTasks []*Task //map任务列表
reduceTasks []*Task //reduce任务列表
lock sync.Mutex //锁
state State //当前状态 MAP或REDUCE或FINISHED
}

// Your code here -- RPC handlers for the worker to call.
//分发worker任务
func (c *Coordinator) AskTask(args *AskTaskArgs, reply *AskTaskReply) error {
//1.加锁
c.lock.Lock()
defer c.lock.Unlock()
//2.判断当前State MAPPING/REDUCING/FINISHED

var task *Task
if c.state == MAPPING {
//3.选出未被分配的任务 设置任务开始时间
for _, mapTask := range c.mapTasks {
if mapTask.State == WAITING {
mapTask.StartTime = time.Now()
mapTask.State = DOING
task = mapTask
//fmt.Println("coordinator has assigned a map task!")
break
}
}
//3.1有待分配map任务,返回
//3.2没有任务了,返回KEEPWAITING任务,让worker等待
if task != nil {
reply.TaskID = task.Id
reply.TaskType = task.TaskType
reply.Filename = task.Filename
} else {
reply.TaskType = KEEPWAITING
}

} else if c.state == REDUCING {
//3.选出未被分配的任务 设置任务开始时间
for _, reduceTask := range c.reduceTasks {
if reduceTask.State == WAITING {
reduceTask.StartTime = time.Now()
reduceTask.State = DOING
task = reduceTask
//fmt.Println("coordinator has assigned a reduce task!")
break
}
}
//3.1有待分配reduce任务,返回
//3.2没有任务了,返回KEEPWAITING任务,让worker等待
if task != nil {
reply.TaskID = task.Id
reply.TaskType = task.TaskType
} else {
reply.TaskType = KEEPWAITING
}
} else if c.state == FINISHED {
// 所有任务已完成,返回虚拟的空任务,用于worker退出
reply.TaskType = NONE
}
reply.MMap = c.mMap
reply.NReduce = c.nReduce
//no use
if task == nil {
//fmt.Println("There are no tasks to be assigned!")
}
//fmt.Printf("send task:%p\n", task)
//4.返回给worker
return nil
//5.解锁
}
//任务确认完成
func (c *Coordinator) TaskDone(args *TaskDoneArgs, reply *TaskDoneReply) error {
//1.加锁
c.lock.Lock()
defer c.lock.Unlock()
//2.设置任务已完成 设置mDone / nDone

taskID := args.TaskID
taskType := args.TaskType
//fmt.Printf("receive &task:%p\n", task)
//fmt.Println("receive task:", task)
//3.检查一下是否全部mDone已完成,是-改变State
//这边c.state == MAPPING条件是避免有被认为宕机的worker重启后,重复发送taskDone,导致异常
if taskType == MAP && c.state == MAPPING {
fmt.Printf("mapTask(taskid:%d) has done\n", taskID)
c.mapTasks[taskID].State = DONE
c.mDone++
if c.mDone == c.mMap {
c.state = REDUCING
fmt.Println("------------Reduce------------")
}
} else if taskType == REDUCE && c.state == REDUCING {
fmt.Printf("reduceTask(taskid:%d) has done\n", taskID)
c.reduceTasks[taskID].State = DONE
c.nDone++
if c.nDone == c.nReduce {
c.state = FINISHED
fmt.Println("------------Finished------------")
}
}
return nil
//4.返回给worker
//5.解锁
}

// 定期检查是否有任务超时未完成(10s)
func (c *Coordinator) checkTasks() {
for {
c.lock.Lock()
currentTime := time.Now()
tenSecondsAgo := currentTime.Add(-10 * time.Second)
if c.state == FINISHED {
c.lock.Unlock()
break
} else if c.state == MAPPING {
for _, mapTask := range c.mapTasks {
if mapTask.State == DOING && mapTask.StartTime.Before(tenSecondsAgo) {
fmt.Printf("mapTask(taskid:%d) has restarted\n", mapTask.Id)
mapTask.State = WAITING
}
}
} else if c.state == REDUCING {
for _, reduceTask := range c.reduceTasks {
if reduceTask.State == DOING && reduceTask.StartTime.Before(tenSecondsAgo) {
fmt.Printf("reduceTask(taskid:%d) has restarted\n", reduceTask.Id)
reduceTask.State = WAITING
}
}
}
c.lock.Unlock()
// 每2s执行一次,避免一直占用锁,导致死锁
time.Sleep(2 * time.Second)
}
}

//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1
return nil
}

//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}

// 检查是否所有任务已完成
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
ret := false

// Your code here.
c.lock.Lock()
if c.state != FINISHED {
c.lock.Unlock()
return ret
}

fmt.Println("All tasks have done, coordinator return true!")
ret = true
c.lock.Unlock()
// 这边等待一秒,以便于让所有keepwaiting的worker收到NONE任务后可以退出
// 不然Coordinator先退出了,worker会调用rpc失败
// 不过也可以调用rpc失败时,worker自动退出
time.Sleep(time.Second)
fmt.Println("All worker has exited!")
return ret
}

//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
c.lock.Lock()
defer c.lock.Unlock()
// Your code here.
//Init coordinator and tasks.
c.mMap = len(files)
c.nReduce = nReduce
c.state = MAPPING
c.mapTasks = make([]*Task, c.mMap)
c.reduceTasks = make([]*Task, c.nReduce)
for i := 0; i < c.mMap; i++ {
newTask := Task{Id: i, Filename: files[i], State: WAITING, TaskType: MAP}
c.mapTasks[i] = &newTask
}
for i := 0; i < c.nReduce; i++ {
newTask := Task{Id: i, State: WAITING, TaskType: REDUCE}
c.reduceTasks[i] = &newTask
}
//fmt.Printf("start .task:%p\n", c.mapTasks[0])
// 开启任务检查协程
go c.checkTasks()
c.server()
return &c
}

rpc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

//
// example to show how to declare the arguments
// and reply for an RPC.
//

type ExampleArgs struct {
X int
}

type ExampleReply struct {
Y int
}

// Add your RPC definitions here.
//AskTask
type AskTaskArgs struct {
//似乎不用
}

//这边我不认为可以用*Task取代,虽然本实验是在同一台主机上进行的,共享内存地址
//但考虑实际分布式系统,如果返回任务指针的话,另一台主机应该不能通过*Task拿到Task
type AskTaskReply struct {
TaskID int //任务ID
TaskType TaskType //任务类型
Filename string //文件名(Map任务)
MMap int //map任务数量
NReduce int //reduce任务数量
}

//AskTask
type TaskDoneArgs struct {
TaskID int //任务ID
TaskType TaskType //任务类型
}

type TaskDoneReply struct {
//似乎不用
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
s := "/var/tmp/824-mr-"
s += strconv.Itoa(os.Getuid())
return s
}

技巧:可以单独写一个testx-mr.sh进行独立测试,如仅测试crash:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/env bash

#
# map-reduce tests
#

# comment this out to run the tests without the Go race detector.
RACE=-race

if [[ "$OSTYPE" = "darwin"* ]]
then
if go version | grep 'go1.17.[012345]'
then
# -race with plug-ins on x86 MacOS 12 with
# go1.17 before 1.17.6 sometimes crash.
RACE=
echo '*** Turning off -race since it may not work on a Mac'
echo ' with ' `go version`
fi
fi

TIMEOUT=timeout
if timeout 2s sleep 1 > /dev/null 2>&1
then
:
else
if gtimeout 2s sleep 1 > /dev/null 2>&1
then
TIMEOUT=gtimeout
else
# no timeout command
TIMEOUT=
echo '*** Cannot find timeout command; proceeding without timeouts.'
fi
fi
if [ "$TIMEOUT" != "" ]
then
TIMEOUT+=" -k 2s 180s "
fi

# run the test in a fresh sub-directory.
rm -rf mr-tmp
mkdir mr-tmp || exit 1
cd mr-tmp || exit 1
rm -f mr-*

# make sure software is freshly built.
(cd ../../mrapps && go clean)
(cd .. && go clean)
(cd ../../mrapps && go build $RACE -buildmode=plugin wc.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin indexer.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin mtiming.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin rtiming.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin jobcount.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin early_exit.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin crash.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin nocrash.go) || exit 1
(cd .. && go build $RACE mrcoordinator.go) || exit 1
(cd .. && go build $RACE mrworker.go) || exit 1
(cd .. && go build $RACE mrsequential.go) || exit 1

failed_any=0



#########################################################
echo '***' Starting crash test.

# generate the correct output
../mrsequential ../../mrapps/nocrash.so ../pg*txt || exit 1
sort mr-out-0 > mr-correct-crash.txt
rm -f mr-out*

rm -f mr-done
($TIMEOUT ../mrcoordinator ../pg*txt ; touch mr-done ) &
sleep 1

# start multiple workers
$TIMEOUT ../mrworker ../../mrapps/crash.so &

# mimic rpc.go's coordinatorSock()
SOCKNAME=/var/tmp/824-mr-`id -u`

( while [ -e $SOCKNAME -a ! -f mr-done ]
do
$TIMEOUT ../mrworker ../../mrapps/crash.so
sleep 1
done ) &

( while [ -e $SOCKNAME -a ! -f mr-done ]
do
$TIMEOUT ../mrworker ../../mrapps/crash.so
sleep 1
done ) &

while [ -e $SOCKNAME -a ! -f mr-done ]
do
$TIMEOUT ../mrworker ../../mrapps/crash.so
sleep 1
done

wait

rm $SOCKNAME
sort mr-out* | grep . > mr-crash-all
if cmp mr-crash-all mr-correct-crash.txt
then
echo '---' crash test: PASS
else
echo '---' crash output is not the same as mr-correct-crash.txt
echo '---' crash test: FAIL
failed_any=1
fi


运行bash ./test-mr.sh,发现early_exit和crash测试是失败的

early_exit测试:其实是测试有没有worker/coordinator提前退出了,一般coordinator不会提前退出,我原本的实现中,并未设置KEEPWAITING任务,导致在所有任务已被分配,但还有任务在执行中这种情况下,worker没有收到任务,直接退出了,导致测试失败。实验要求当所有任务完成后,worker/coordinator才能退出。

crash测试:随机让部分worker在执行map/reduce的时候宕机(os.Exit或超时(time.Sleep)来测试你的程序是否有恢复机制。我后来是使用了checkTasks函数,定期检查任务状态,通过对比当前时间和任务开始时间,如果发现状态为DOING的任务的开始时间<当前时间-10s,则认为负责这个任务的workder超时了,于是重新将任务的状态改为WAITING,等待其他worker请求任务时,再重新分配。(这么做感觉是有点naive,如果那个宕机的worker又活过来了,则又执行了一遍map/reduce过程,这其实是没必要的)

断断续续写了一个月,加起来差不多4天理解论文和整理思路,code+debug三天,其他时间都在摸鱼☺。(GoLand一个月试用期刚好到期乐)

总结

  • 踩坑记录:Ubuntu指令$ cat mr-out-* | sort | more会忽略大小写进行排序,导致程序输出结果和正确的结果不一致,测试失败。(不过后来发现这个不会导致测试失败,因为测试程序会再次对mr-out-x文件调用sort,使得期望结果和你程序的输出结果都按同一规则排序)
  • 踩坑记录:在执行中的任务不能被再次分配给其他worker,不然会导致并发访问。后来多加了一个DOING状态表示任务正在执行中
  • 本实验并没有用天生并发安全的chan实现,而是通过锁机制控制并发访问,一定要避免死锁和数据竞争。使用go run -race检测
  • 由于初学go,部分锁使用的时机可能不正确,导致性能低。(为了省事,从函数开始锁到函数结束)
  • 本实现代码只保证能通过所有测试,但可能并不完善且由于刚学go代码写的不够简洁,部分异常没有处理(如RPC调用失败等)