阅读本文前请先仔细阅读Lab 1实验要求并熟悉基础代码,了解Go中的RPC通信。
Lab环境的配置过程中可能会出现若干问题,参照此处解决。
本文只提供相关实现思路,希望可以读者由此获得灵感。

实验说明

  • 完成分布式MapReduce系统设计,实现词频统计
  • Worker分别调用Map和Reduce Function来读取和处理文件
  • Master用来调度和处理失败的Worker
  • 主要修改src/mr文件夹中的master.go rpc.go worker.go
  • Lab中是所有Map任务完成后才进行Reduce任务
  • Worker和Master之间利用RPC进行通信

设计概述

大部分思路来自于https://github.com/yzongyue/6.824-golabs-2020,在此基础上进行修改,十分感谢!

结构说明

  • 调用mrmaster.go启动Master,监听是否有人向自身发送RPC请求。
  • 调用mrworker.go启动Worker,首先询问Master是否已经分配了全部任务,如果分配完成了就不新建Worker,直接退出,如果未完成,则新建Worker分担任务。
    • 此处询问是否分配完成设计是由测试脚本引发的设计,crash.go中在master分配全部任务之后会继续新建Worker向master请求,如果不进行相关设计会warning,但是不影响正确结果
  • Worker首先通过RPC通信向Master注册自身,再通过RPC通信向Master请求任务,执行任务,最后通过RPC向Master报告任务完成,此处共需要设计三种RPC请求回答格式。

架构图

master

master

worker

worker

数据流图

以nMap = n,nReduce=3为例,其中中间文件的数量为nMap*nReduce=3n个。
dataflow

具体实现

Master

Master的作用就是统筹Worker,分配任务,因此需要为任务构建一个结构体,使用RPC在Master和Worker之间传输,定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type TaskPhase int

const (
MapPhase TaskPhase = 0
ReducePhase TaskPhase = 1
WaitPhase TaskPhase = 2
ExitPhase TaskPhase = 3
)

type Task struct {
FileName string
NMap int
NReduce int
ID int
Phase TaskPhase
}

NMap变量由文件数决定,由于在Map阶段需要生成中间文件mr-MapID-BucketID,此处的NMap用作后面Reduce阶段遍历相关中间文件。
Phase变量有4个取值,分别代表当前系统是在哪个任务阶段(可能TaskPhase会造成误解),其中Wait阶段是Worker向Master发出请求,但是Master现在并没有任务发送给他,Worker需要等待,Exit阶段表示当前需要退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// used in TaskStat
const (
TaskReady TaskStatus = 0
TaskQueue TaskStatus = 1
TaskRunning TaskStatus = 2
TaskDone TaskStatus = 3
TaskError TaskStatus = 4
)

type TaskStat struct {
WorkerID int
Status TaskStatus
StartTime time.Time
}

type Master struct {
// Your definitions here.
files []string
NReduce int

mu sync.Mutex
NowWorkers int
TotalWorkers int
ExitWorkers int

Tstate []TaskStat
Phase TaskPhase

TaskCh chan Task
}

初始化过程中可以确定好files和NReduce,files是所需要处理的文件集合,NReduce是生成最后文件的数量,可以理解为桶的数量。
NowWorker为当前分配WorkerID的指针,每次有一个Worker发来一个注册RPC,为其分配一个WorkerID,然后NowWorker自增,TotalWorker自增。
TotalWorker为当前Master中存活的Worker,因为后续test脚本中有crash测试,因此当一个worker失联超过设置的时间后判定为crash,TotalWorker自减。
ExitWorker为当前Master中正常退出的Worker,当Worker发送退出RPC时(为了简化设计,将退出RPC和通知Master任务完成的RPC进行设计合并),ExitWorker自增。
如何判断当前任务全部完成,Master可以退出了( Done )?

  • 判断TotalWorkers和ExitWorkers是否相等即可。

TState是Master中较为关键的成员,通过Init函数进行设置。

1
2
3
4
5
6
7
8
9
func (m *Master) InitMap() {
m.Phase = MapPhase
m.Tstate = make([]TaskStat, len(m.files))
}

func (m *Master) InitReduce() {
m.Phase = ReducePhase
m.Tstate = make([]TaskStat, m.NReduce)
}

TState的长度等于当前需要处理的Task的数量,TState[i]的含义就是第i个task当前的状态,包括分配给的WorkerID,状态以及开始执行的时间。TaskCh是一段有buffer的channel,用作Goroutines之间的数据传输,后续解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (m *Master) InitMap() {}
func (m *Master) InitReduce() {}

//RPC通信,用于注册和退出Worker
func (m *Master) RegisterWorker(args *RegisterArgs, reply *RegisterReply) error {}
func (m *Master) ExitWorker(args *ReportArgs, reply *ReportReply) error {}

//当一个worker申请一个Task时,register该Task的TaskState
func (m *Master) RegisterTask(args *RequestArgs, task *Task) {}

//通过TaskID新建需要完成的Task并返回
func (m *Master) GetTask(NowTaskID int) Task {}

//调度
func (m *Master) Schedule() {}
func (m *Master) TickSchedule() {}

//RPC通信,用于Worker获取Task
func (m *Master) GetOneTask(args *RequestArgs, reply *RequestReply) error {}
//RPC通信,用于Master接受Worker发送过来的信息,更新对应的TState
func (m *Master) ReportTask_m(args *ReportArgs, reply *ReportReply) error {}
//RPC通信,用于Worker创建前询问Master是否完成全部的工作
func (m *Master) IsAllDone(args *ReportArgs, reply *ReportReply) error {}

func MakeMaster(files []string, nReduce int) *Master {
m := Master{}

m.mu = sync.Mutex{}
m.NReduce = nReduce
m.files = files

if nReduce > len(files) {
m.TaskCh = make(chan Task, nReduce)
} else {
m.TaskCh = make(chan Task, len(files))
}

m.InitMap()
go m.TickSchedule()

m.server()
return &m
}
  • 首先调用MakeMaster新建一个Master,并构建一个有buffer的TaskChannel,用作存放需要发送给Worker的Task。
  • 接着初始化Map,设置一下相关的参数
  • TickSchedule开始周期性调度全部任务,该函数主要实现是利用for循环,在不是ExitPhase的情况下周期性调用Schedule
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (m *Master) Schedule() {
m.mu.Lock()
defer m.mu.Unlock()

if m.Phase == ExitPhase {
return
}

if m.Phase == WaitPhase {
if m.ExitWorkers == m.TotalWorkers {
m.Phase = ExitPhase
}
return
}

AllDone := true
for idx, task := range m.Tstate {
switch task.Status {
case TaskReady:
AllDone = false
m.TaskCh <- m.GetTask(idx)
m.Tstate[idx].Status = TaskQueue
case TaskQueue:
//..
case TaskRunning:
//..
// deal with crash
case TaskDone:
//..
case TaskError:
//..
default:
}
}

if AllDone {
if m.Phase == MapPhase {
m.InitReduce()
} else {
m.Phase = WaitPhase
//
// fake task...
//
}
}
}

Schedule的大体框架如上,首先Master在调度之前判断一下自身的状态,wait状态是Master已经调度完全部任务了,向所有还存活的Worker发送一个伪任务,告诉他们可以退出了,其余还存活的Worker一直在不停的接受任务,这时收到Master发送过来的fake task后,向Master发送已退出的消息然后退出,Master接收到全部退出消息后,将Phase改为ExitPhase然后退出。
中间的for循环遍历TaskState,将准备好的任务发送到TaskChannel,来让Worker接收。

Worker

首先定义worker结构体

1
2
3
4
5
type worker struct {
workerID int
mapf func(string, string) []KeyValue
reducef func(string, []string) string
}

worker的构建主要是通过Worker function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

w := worker{
mapf: mapf,
reducef: reducef,
}

args := ReportArgs{}
reply := ReportReply{}

call("Master.IsAllDone", &args, &reply)
if reply.AllFinished {
fmt.Printf("all done, no more client\n")
return
}

w.Register()
w.run()
fmt.Printf("worker %d exit normally\n", w.workerID)
}

首先判断一下Master是否分配完了全部的工作,否则就进行worker的注册和执行,Register主要通过RPC通信来和Master通信获取可用的workerID,run用来执行构建好的worker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (w *worker) run() {
for {
task := w.RequestTask()
fmt.Printf("worker %d get task %d, TaskPhase==%d\n", w.workerID, task.ID, task.Phase)
if task.Phase == ExitPhase {
w.Exit()
break
} else if task.Phase == WaitPhase {
time.Sleep(1 * time.Second)
continue
}
w.doTask(task)

// 可以开多个窗口同时跑多个worker,下面的延时防止worker执行太快,
// 其他worker来不及执行
// time.Sleep(3 * time.Second)
}
}

run函数是worker中需要重点关注的函数,内部使用一个for循环来不断请求Master获得空闲的Task,此处的RequestTask也是通过RPC通信进行数据的获取。

  • 如果当前task处于ExitPhase,则调用Exit向Master进行RPC通信,告诉Master此Worker已退出。
  • 如果当前task处于WaitPhase,说明当前Master无任务分配,让Worker睡眠一段时间后重新请求。

如果当前任务可执行,则调用doTask进行任务的执行,doTask流程如下。

1
2
3
4
5
6
7
8
9
10
func (w *worker) doTask(task Task) {
switch task.Phase {
case MapPhase:
w.doMap(task)
case ReducePhase:
w.doReduce(task)
default:
panic("wrong phase")
}
}

RPC设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type RegisterArgs struct {
}
type RegisterReply struct {
WorkerID int
}

type RequestArgs struct {
WorkerID int
}
type RequestReply struct {
Task_ *Task
}

type ReportArgs struct {
Done bool
TaskID int
Phase TaskPhase
WorkerID int
}
type ReportReply struct {
AllFinished bool
}

对于Register RPC,不需要worker传递参数给master,只需要master把当前可用的workerID 回复给worker即可。
对于Request RPC,worker需要传递给master自己的workerID,让master可以更新自己对应的TState中的任务,明确该分配给了了哪个worker,并更新状态,master回复给worker对应的task即可。
对于Report RPC,合并了三种操作,一种是刚调用Worker时询问Master是否分配完了全部的任务,一种是完成一个Task后向Master汇报该任务已完成,最后一种是向Master汇报该worker已退出。

doMap

Master把收到的全部文件设置为一个个的Task,状态保存在TState中,每次每个worker调用doMap就从TaskCh中取出一个Task任务执行mapf(大概就是对一个具体的file做Map操作,返回的键值对仿照mrsequential.go暂时保存到kva中),遍历kva,利用ihash函数将kv.Key对应的Value哈希到对应的桶中(可以使用intermediate[]表示桶的集合,大小为nReduce)
最后遍历桶,将各个桶中的数据写入中间文件,中间文件的命名可以使用mr-mapID-bucketID,写入出错或者写入成功都需要向Master call RPC进行通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (w *worker) doMap(task Task) {
// get filename and content
kva := w.mapf(filename, string(content))

intermediate := make([][]KeyValue, task.NReduce)
for _, kv := range kva {
bucketID := ihash(kv.Key) % task.NReduce
intermediate[bucketID] = append(intermediate[bucketID], kv)
}

for bucketID, kva := range intermediate {
oname := Map2ReduceFileName(task.ID, bucketID)
ofile, err := os.Create(oname)
if err != nil {
w.ReportTask_w(task, false, err)
return
}
enc := json.NewEncoder(ofile)
for _, kv := range kva {
err := enc.Encode(&kv)
if err != nil {
w.ReportTask_w(task, false, err)
}
}

if err := ofile.Close(); err != nil {
w.ReportTask_w(task, false, err)
}
}
w.ReportTask_w(task, true, nil)
}

doReduce

一个具体的Reduce Task是将上面Map生成的中间全部文件mr-i-taskID( 0 \le i <\lt NMap ),集合并做reducef操作,保存到最终文件mr-out-taskID中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (w *worker) doReduce(task Task) {
maps := make(map[string][]string)
for idx := 0; idx < task.NMap; idx++ {
filename := Map2ReduceFileName(idx, task.ID)
file, err := os.Open(filename)
if err != nil {
w.ReportTask_w(task, false, err)
return
}

dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
if _, ok := maps[kv.Key]; !ok {
maps[kv.Key] = make([]string, 0, 100)
}
maps[kv.Key] = append(maps[kv.Key], kv.Value)
}
}

res := make([]string, 0, 100)
for key, values := range maps {
res = append(res, fmt.Sprintf("%v %v\n", key, w.reducef(key, values)))
}

if err := ioutil.WriteFile(MergeName(task.ID), []byte(strings.Join(res, "")), 0600); err != nil {
w.ReportTask_w(task, false, err)
return
}

w.ReportTask_w(task, true, nil)
}

测试

test